use core::{cmp::Ordering, iter, mem, ops::Range, time::Duration};
use bevy::{ecs::change_detection::Tick, prelude::*};
use log::trace;
use postcard::experimental::{max_size::MaxSize, serialized_size};
use super::{entity_ranges::EntityRanges, serialized_data::SerializedData};
use crate::{
postcard_utils,
prelude::*,
server::ClientPools,
shared::{
backend::channels::ServerChannel,
replication::{
client_ticks::{ClientTicks, MutateInfo},
mutate_index::MutateIndex,
registry::component_mask::ComponentMask,
},
},
};
/// Component that accumulates entity mutations until [`Mutations::send`] packs them
/// into messages.
///
/// Entities are written one at a time: [`Mutations::start_entity`], then
/// [`Mutations::add_entity`], then any number of [`Mutations::add_component`] calls.
#[derive(Component, Default)]
pub(crate) struct Mutations {
/// Mutations of entities added with a graph index, one inner vector per graph.
///
/// The outer length is managed by [`Mutations::resize_related`].
related: Vec<Vec<EntityMutations>>,
/// Mutations of entities added without a graph index.
standalone: Vec<EntityMutations>,
/// Where the entity added since the last [`Mutations::start_entity`] call was stored,
/// or `None` if no entity has been added since then.
entity_location: Option<EntityLocation>,
}
impl Mutations {
/// Marks the beginning of a new entity by forgetting where the previous one was stored.
///
/// After this call [`Self::entity_added`] returns `false` until [`Self::add_entity`]
/// is called again.
pub(crate) fn start_entity(&mut self) {
self.entity_location = None;
}
/// Returns `true` if a storage location is currently recorded for the entity, i.e.
/// [`Self::add_entity`] was called after the last [`Self::start_entity`] and the entry
/// was not removed by [`Self::pop`].
pub(crate) fn entity_added(&mut self) -> bool {
self.entity_location.is_some()
}
/// Starts a new mutations entry for `entity` and remembers where it was stored.
///
/// The entry is appended to the graph at `graph_index` when one is given and to the
/// standalone list otherwise. `entity_range` is stored as the entity's range; the
/// component range list and the component mask are taken from `pools`.
///
/// # Panics
///
/// Panics if `graph_index` is out of bounds for the related graphs
/// (see [`Self::resize_related`]).
pub(crate) fn add_entity(
    &mut self,
    pools: &mut ClientPools,
    entity: Entity,
    graph_index: Option<usize>,
    entity_range: Range<usize>,
) {
    // Take from the pools in the same order as the fields are declared:
    // ranges first, then components.
    let ranges = EntityRanges {
        entity: entity_range,
        data: pools.take_ranges(),
    };
    let components = pools.take_components();
    let entry = EntityMutations {
        entity,
        ranges,
        components,
    };

    if let Some(index) = graph_index {
        self.related[index].push(entry);
        self.entity_location = Some(EntityLocation::Related { index });
    } else {
        self.standalone.push(entry);
        self.entity_location = Some(EntityLocation::Standalone);
    }
}
/// Appends a component data range to the entity most recently added with
/// [`Self::add_entity`].
///
/// # Panics
///
/// Panics if no entity was added since the last [`Self::start_entity`] call,
/// or if the recorded location no longer holds an entry.
pub(crate) fn add_component(&mut self, component: Range<usize>) {
    let current = match self.entity_location {
        Some(EntityLocation::Related { index }) => self.related[index].last_mut(),
        Some(EntityLocation::Standalone) => self.standalone.last_mut(),
        None => None,
    };

    current
        .expect("entity should be written before adding components")
        .ranges
        .add_data(component);
}
/// Removes and returns the entry of the entity most recently added with
/// [`Self::add_entity`], clearing the remembered location.
///
/// Returns `None` if no entity was added since the last [`Self::start_entity`]
/// (or the entry was already popped).
pub(super) fn pop(&mut self) -> Option<EntityMutations> {
    // `take` resets the location even when the list turns out to be empty.
    match self.entity_location.take()? {
        EntityLocation::Related { index } => self.related[index].pop(),
        EntityLocation::Standalone => self.standalone.pop(),
    }
}
/// Returns `true` if no entity mutations are buffered.
///
/// `related` keeps one (possibly empty) vector per graph: [`Self::resize_related`]
/// sizes the outer vector to the graph count and [`Self::clear`] only drains the inner
/// vectors. The outer length therefore says nothing about buffered mutations, so every
/// inner vector is checked instead.
pub(crate) fn is_empty(&self) -> bool {
    self.standalone.is_empty() && self.related.iter().all(Vec::is_empty)
}
/// Packs the buffered mutations into one or more messages and sends them to `client`
/// over [`ServerChannel::Mutations`].
///
/// Every message is laid out as:
/// 1. the serialized `ticks.update_tick`;
/// 2. the bytes of `serialized[server_tick_range]`;
/// 3. the total number of messages (only when `track_mutate_messages` is `true`);
/// 4. the message's mutate index;
/// 5. for each entity: its entity bytes, the size of its data and the data itself.
///
/// Chunks (one per related graph, then one per standalone entity) are never divided
/// between messages. A new message is started only when the current body is non-empty
/// and [`can_pack`] rejects both combining orders for `max_size`.
///
/// Each planned message is registered with `ticks.register_mutate_message` together with
/// the entities it carries; the component masks are moved out of the buffered entries in
/// the process. When `track_mutate_messages` is `true`, a message is produced even if
/// there are no chunks at all.
///
/// `split_buffer` is scratch space for the planned messages and is cleared before
/// returning. NOTE(review): it is presumably expected to be empty on entry, since the
/// message count is taken from its length — confirm against the caller.
///
/// Returns the number of messages sent.
///
/// # Errors
///
/// Propagates errors from serialization and from computing entity sizes.
pub(crate) fn send(
&mut self,
messages: &mut ServerMessages,
client: Entity,
ticks: &mut ClientTicks,
split_buffer: &mut Vec<MutationsSplit>,
pools: &mut ClientPools,
serialized: &SerializedData,
track_mutate_messages: bool,
server_tick_range: Range<usize>,
server_tick: RepliconTick,
system_tick: Tick,
timestamp: Duration,
max_size: usize,
) -> Result<usize> {
// The number of messages is unknown while planning, so its maximum encoded size is
// reserved first and corrected once the real count is known.
const MAX_COUNT_SIZE: usize = usize::POSTCARD_MAX_SIZE;
let mut tick_buffer = [0; RepliconTick::POSTCARD_MAX_SIZE];
let update_tick = postcard::to_slice(&ticks.update_tick, &mut tick_buffer)?;
// Bytes shared by every message regardless of its mutate index.
let mut metadata_size = update_tick.len() + server_tick_range.len();
if track_mutate_messages {
metadata_size += MAX_COUNT_SIZE;
}
let mut mutate_info = MutateInfo {
server_tick,
system_tick,
timestamp,
entities: pools.take_entities(),
};
let mut mutate_index = ticks.next_mutate_index();
let mut chunks = EntityChunks::new(&mut self.related, &mut self.standalone);
let mut header_size = metadata_size + serialized_size(&mutate_index)?;
let mut body_size = 0;
// Range of chunk indices that belong to the message currently being planned.
let mut chunks_range = Range::<usize>::default();
for chunk in chunks.iter_mut() {
let mut mutations_size = 0;
for mutations in &mut *chunk {
mutations_size += mutations.ranges.size()?;
}
// Start a new message only if the chunk fits neither after the current message
// nor before the current body.
if body_size != 0
&& !can_pack(header_size + body_size, mutations_size, max_size)
&& !can_pack(header_size + mutations_size, body_size, max_size)
{
split_buffer.push(MutationsSplit {
mutate_index,
message_size: body_size + header_size,
chunks_range: chunks_range.clone(),
});
ticks.register_mutate_message(mutate_index, mutate_info);
mutate_index = ticks.next_mutate_index();
mutate_info = MutateInfo {
server_tick,
system_tick,
timestamp,
entities: pools.take_entities(),
};
chunks_range.start = chunks_range.end;
// The header is recomputed because the new mutate index may encode to a different size.
header_size = metadata_size + serialized_size(&mutate_index)?; body_size = 0;
}
// Move each entity's component mask into the message info, leaving a default mask behind.
mutate_info.entities.extend(
chunk
.iter_mut()
.map(|mutations| (mutations.entity, mem::take(&mut mutations.components))),
);
chunks_range.end += 1;
body_size += mutations_size;
}
// Flush the leftovers. With tracking enabled a message is emitted even without chunks.
if !chunks_range.is_empty() || track_mutate_messages {
split_buffer.push(MutationsSplit {
mutate_index,
message_size: body_size + header_size,
chunks_range,
});
ticks.register_mutate_message(mutate_index, mutate_info);
}
if split_buffer.len() > 1 {
trace!(
"splitting into {} messages for client `{client}`",
split_buffer.len()
);
}
for split in &*split_buffer {
let mut message_size = split.message_size;
if track_mutate_messages {
// Replace the reserved maximum count size with the size of the actual count.
message_size -= MAX_COUNT_SIZE - serialized_size(&split_buffer.len())?;
}
let mut message = Vec::with_capacity(message_size);
message.extend_from_slice(update_tick);
message.extend_from_slice(&serialized[server_tick_range.clone()]);
if track_mutate_messages {
postcard_utils::to_extend_mut(&split_buffer.len(), &mut message)?;
}
postcard_utils::to_extend_mut(&split.mutate_index, &mut message)?;
for mutations in chunks.iter_flatten(split.chunks_range.clone()) {
message.extend_from_slice(&serialized[mutations.ranges.entity.clone()]);
postcard_utils::to_extend_mut(&mutations.ranges.data_size(), &mut message)?;
for component in &mutations.ranges.data {
message.extend_from_slice(&serialized[component.clone()]);
}
}
// The planned size must match what was actually written.
debug_assert_eq!(message.len(), message_size);
messages.send(client, ServerChannel::Mutations, message);
}
let len = split_buffer.len();
split_buffer.clear();
Ok(len)
}
pub(crate) fn clear(&mut self, pools: &mut ClientPools) {
for entities in self
.related
.iter_mut()
.chain(iter::once(&mut self.standalone))
{
pools.recycle_ranges(entities.drain(..).map(|m| m.ranges.data));
}
}
pub(crate) fn resize_related(&mut self, pools: &mut ClientPools, graphs_count: usize) {
match self.related.len().cmp(&graphs_count) {
Ordering::Less => self
.related
.resize_with(graphs_count, || pools.take_mutations()),
Ordering::Greater => pools.recycle_mutations(self.related.drain(graphs_count..)),
Ordering::Equal => (),
}
}
}
/// Buffered mutations of a single entity.
pub(crate) struct EntityMutations {
/// The entity these mutations belong to.
entity: Entity,
/// Ranges into the serialized data: the entity's own range plus the component data
/// ranges added through `add_data`.
pub(super) ranges: EntityRanges,
/// Component mask taken from the pool when the entry is created; it is moved into the
/// message info when the entry is sent.
/// NOTE(review): presumably marks which components were mutated — confirm where it is filled.
pub(super) components: ComponentMask,
}
/// Which list inside [`Mutations`] holds the most recently added entity.
#[derive(Clone, Copy)]
enum EntityLocation {
/// The entity is the last element of `related[index]`.
Related { index: usize },
/// The entity is the last element of `standalone`.
Standalone,
}
/// View over related and standalone mutations as one sequence of chunks:
/// first one chunk per related graph, then one chunk per standalone entity.
struct EntityChunks<'a> {
/// One slice element per graph; each graph forms a single chunk.
related: &'a mut [Vec<EntityMutations>],
/// Entities without a graph; each entity forms its own chunk.
standalone: &'a mut [EntityMutations],
}
impl<'a> EntityChunks<'a> {
fn new(related: &'a mut [Vec<EntityMutations>], standalone: &'a mut [EntityMutations]) -> Self {
Self {
related,
standalone,
}
}
fn iter_mut(&mut self) -> impl Iterator<Item = &mut [EntityMutations]> {
self.related
.iter_mut()
.map(Vec::as_mut_slice)
.chain(self.standalone.chunks_mut(1))
}
fn iter_flatten(&self, range: Range<usize>) -> impl Iterator<Item = &EntityMutations> {
let total_len = self.related.len() + self.standalone.len();
debug_assert!(range.start <= total_len);
debug_assert!(range.end <= total_len);
let split_point = self.related.len();
let related_start = range.start.min(split_point);
let related_end = range.end.min(split_point);
let standalone_start = range.start.saturating_sub(split_point);
let standalone_end = range.end.saturating_sub(split_point);
let related_range = related_start..related_end;
let standalone_range = standalone_start..standalone_end;
self.related[related_range]
.iter()
.flatten()
.chain(&self.standalone[standalone_range])
}
}
/// Plan for a single mutate message produced while splitting in [`Mutations::send`].
pub(crate) struct MutationsSplit {
/// Index under which the message was registered.
mutate_index: MutateIndex,
/// Planned size of the message in bytes. When mutate messages are tracked, this includes
/// the maximum encoded size of the message count, which is corrected before writing.
message_size: usize,
/// Indices of the chunks (graphs first, then standalone entities) packed into the message.
chunks_range: Range<usize>,
}
/// Returns `true` if `add` more bytes fit into the last, partially filled `mtu`-sized
/// part of a message that is `message_size` bytes long.
///
/// A message whose size is an exact multiple of `mtu` (including zero) has no
/// partially filled part, so nothing can be packed into it.
///
/// # Panics
///
/// Panics if `mtu` is zero.
fn can_pack(message_size: usize, add: usize, mtu: usize) -> bool {
    match message_size % mtu {
        0 => false,
        dangling => dangling + add <= mtu,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Size limit passed to `can_pack` and `Mutations::send` in every case below.
    const MAX_SIZE: usize = 1200;

    #[test]
    fn packing() {
        // (message_size, add, expected)
        let cases = [
            (10, 5, true),
            (10, 1190, true),
            (10, 1191, false),
            (10, 3000, false),
            (1500, 500, true),
            (1500, 900, true),
            (1500, 1000, false),
            (1199, 1, true),
            (1200, 0, false),
            (1200, 1, false),
            (1200, 3000, false),
        ];

        for (message_size, add, expected) in cases {
            assert_eq!(
                can_pack(message_size, add, MAX_SIZE),
                expected,
                "can_pack({message_size}, {add}, {MAX_SIZE})"
            );
        }
    }

    #[test]
    fn splitting() {
        // Standalone entities only.
        assert_eq!(send([], [], false), 0);
        assert_eq!(send([], [10], false), 1);
        assert_eq!(send([], [1300], false), 1);
        assert_eq!(send([], [20, 20], false), 1);
        assert_eq!(send([], [700, 700], false), 2);
        assert_eq!(send([], [1300, 700], false), 1);
        assert_eq!(send([], [1300, 1300], false), 2);

        // A single graph: its entities are never split apart.
        assert_eq!(send([&[10]], [], false), 1);
        assert_eq!(send([&[1300]], [], false), 1);
        assert_eq!(send([&[20, 20]], [], false), 1);
        assert_eq!(send([&[700, 700]], [], false), 1);
        assert_eq!(send([&[1300, 1300]], [], false), 1);

        // Several graphs.
        assert_eq!(send([&[20], &[20]], [], false), 1);
        assert_eq!(send([&[700], &[700]], [], false), 2);
        assert_eq!(send([&[1300], &[1300]], [], false), 2);

        // Graphs mixed with standalone entities.
        assert_eq!(send([&[10]], [10], false), 1);
        assert_eq!(send([&[1300]], [1300], false), 2);
        assert_eq!(send([&[20, 20]], [20, 20], false), 1);
        assert_eq!(send([&[700, 700]], [700, 700], false), 2);
        assert_eq!(send([&[1300, 1300]], [1300, 1300], false), 3);
        assert_eq!(send([&[20], &[20]], [20], false), 1);
        assert_eq!(send([&[700], &[700]], [700], false), 3);
        assert_eq!(send([&[1300], &[1300]], [1300], false), 3);

        // With mutate message tracking a message is sent even without mutations.
        assert_eq!(send([], [], true), 1);
        assert_eq!(send([], [10], true), 1);
        assert_eq!(send([&[10]], [], true), 1);
        assert_eq!(send([&[10]], [10], true), 1);
        assert_eq!(send([], [1194], true), 1);
    }

    /// Fills a fresh [`Mutations`] with entities of the given serialized sizes and
    /// returns how many messages [`Mutations::send`] reports.
    ///
    /// `related` holds one slice of entity sizes per graph, `standalone` holds the sizes
    /// of entities without a graph.
    fn send<const N: usize, const M: usize>(
        related: [&[usize]; N],
        standalone: [usize; M],
        track_mutate_messages: bool,
    ) -> usize {
        let mut serialized = SerializedData::default();
        let mut messages = ServerMessages::default();
        let mut mutations = Mutations::default();
        let mut pools = ClientPools::default();

        mutations.resize_related(&mut pools, N);

        for (graph_index, sizes) in related.iter().enumerate() {
            for &size in sizes.iter() {
                write_entity(
                    &mut mutations,
                    &mut serialized,
                    &mut pools,
                    Some(graph_index),
                    size,
                );
            }
        }

        for size in standalone {
            write_entity(&mut mutations, &mut serialized, &mut pools, None, size);
        }

        let sent = mutations.send(
            &mut messages,
            Entity::PLACEHOLDER,
            &mut Default::default(),
            &mut Default::default(),
            &mut Default::default(),
            &serialized,
            track_mutate_messages,
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
            MAX_SIZE,
        );

        sent.unwrap()
    }

    /// Appends `mutations_size` zero bytes to `serialized` and registers them as one
    /// entity: the first 4 bytes as the entity range, the rest as a single component.
    fn write_entity(
        mutations: &mut Mutations,
        serialized: &mut SerializedData,
        pools: &mut ClientPools,
        graph_index: Option<usize>,
        mutations_size: usize,
    ) {
        assert!(mutations_size > 4);

        let entity_start = serialized.len();
        let data_start = entity_start + 4;
        serialized.resize(entity_start + mutations_size, 0);

        mutations.start_entity();
        mutations.add_entity(
            pools,
            Entity::PLACEHOLDER,
            graph_index,
            entity_start..data_start,
        );
        mutations.add_component(data_start..serialized.len());
    }
}