use core::{mem, ops::Range, time::Duration};
use bevy::{ecs::change_detection::Tick, prelude::*};
use log::trace;
use postcard::experimental::{max_size::MaxSize, serialized_size};
use super::{entity_ranges::EntityRanges, serialized_data::SerializedData};
use crate::{
postcard_utils,
prelude::*,
shared::{
backend::channels::ServerChannel,
replication::{
client_ticks::{ClientTicks, DiffCursors, MutateInfo, MutatedEntityInfo},
mutate_index::MutateIndex,
registry::{ComponentIndex, component_mask::ComponentMask},
},
},
};
/// Mutation records collected entity by entity and later packed into messages by [`Self::send`].
///
/// Only byte ranges are stored here; the bytes themselves are read from the
/// [`SerializedData`] passed to [`Self::send`].
#[derive(Component, Default)]
pub(crate) struct Mutations {
/// Mutations grouped by graph index: `related[i]` holds the entities that were passed to
/// [`Self::add_entity`] with `graph_index == Some(i)`.
///
/// [`Self::send`] treats each inner list as a single chunk that is never divided
/// between messages.
related: Vec<Vec<EntityMutations>>,
/// Mutations for entities added with `graph_index == None`.
///
/// [`Self::send`] treats every entity here as its own chunk.
standalone: Vec<EntityMutations>,
/// Where the most recently added entity was stored.
///
/// Cleared by [`Self::start_entity`] and [`Self::pop`], set by [`Self::add_entity`].
entity_location: Option<EntityLocation>,
}
impl Mutations {
/// Begins processing of a new entity by forgetting where the previous one was stored.
///
/// Until [`Self::add_entity`] is called again, [`Self::entity_added`] returns `false`.
pub(crate) fn start_entity(&mut self) {
    self.entity_location = Default::default();
}
/// Returns `true` if [`Self::add_entity`] was called since the last [`Self::start_entity`]
/// (and the entity wasn't removed with [`Self::pop`]).
pub(crate) fn entity_added(&mut self) -> bool {
    let location = self.entity_location;
    location.is_some()
}
/// Registers a new entity whose serialized representation lives at `entity_range`.
///
/// With `graph_index == Some(i)` the entity is appended to the `i`-th related list,
/// otherwise it is appended to the standalone list. The entity starts with no
/// component data and no diff cursors; its location is remembered so that
/// [`Self::add_component`] and [`Self::add_diff_cursor`] apply to it.
///
/// # Panics
///
/// Panics if `graph_index` is out of bounds of the related lists
/// (their number is set by [`Self::reset`]).
pub(crate) fn add_entity(
    &mut self,
    entity: Entity,
    graph_index: Option<usize>,
    entity_range: Range<usize>,
) {
    let ranges = EntityRanges {
        entity: entity_range,
        data: Default::default(),
    };
    let mutations = EntityMutations {
        entity,
        ranges,
        components: Default::default(),
        diff_cursors: Default::default(),
    };

    if let Some(index) = graph_index {
        // Push first: an out-of-bounds index must panic before the location is updated.
        self.related[index].push(mutations);
        self.entity_location = Some(EntityLocation::Related { index });
    } else {
        self.standalone.push(mutations);
        self.entity_location = Some(EntityLocation::Standalone);
    }
}
/// Appends the byte range of a serialized component to the most recently added entity.
///
/// # Panics
///
/// Panics if no entity is currently tracked, i.e. [`Self::add_entity`] wasn't called
/// after the last [`Self::start_entity`].
pub(crate) fn add_component(&mut self, component: Range<usize>) {
    let current = match self.entity_location {
        Some(EntityLocation::Related { index }) => self.related[index].last_mut(),
        Some(EntityLocation::Standalone) => self.standalone.last_mut(),
        None => None,
    };

    current
        .expect("entity should be written before adding components")
        .ranges
        .add_data(component);
}
/// Records a diff cursor for `component` on the most recently added entity.
///
/// # Panics
///
/// Panics if no entity is currently tracked, i.e. [`Self::add_entity`] wasn't called
/// after the last [`Self::start_entity`].
pub(crate) fn add_diff_cursor(&mut self, component: ComponentIndex, cursor: DiffIndex) {
    let current = match self.entity_location {
        Some(EntityLocation::Related { index }) => self.related[index].last_mut(),
        Some(EntityLocation::Standalone) => self.standalone.last_mut(),
        None => None,
    };

    current
        .expect("entity should be written before adding diff cursors")
        .diff_cursors
        .push((component, cursor));
}
/// Removes and returns the most recently added entity, if one is currently tracked.
///
/// The tracked location is cleared in any case, so a second call without a new
/// [`Self::add_entity`] returns `None`.
pub(super) fn pop(&mut self) -> Option<EntityMutations> {
    let location = self.entity_location.take()?;
    match location {
        EntityLocation::Related { index } => self.related[index].pop(),
        EntityLocation::Standalone => self.standalone.pop(),
    }
}
/// Returns `true` if no entity mutations are stored.
///
/// [`Self::reset`] keeps one (possibly empty) list per graph inside `related`, so the
/// length of the outer vector says nothing about whether any mutations exist.
/// Every inner list has to be checked instead; internal allocations may still be present
/// when this returns `true`.
pub(crate) fn is_empty(&self) -> bool {
    self.standalone.is_empty() && self.related.iter().all(Vec::is_empty)
}
/// Packs the collected mutations into one or more messages and sends them to `client`
/// over [`ServerChannel::Mutations`].
///
/// Chunks are visited in order: every related list first, then every standalone entity.
/// A new message is started when the next chunk can be packed neither after the
/// accumulated body nor in front of it (see [`can_pack`] with `max_size`).
///
/// Each message is laid out as:
/// 1. the postcard-encoded `ticks.update_tick`,
/// 2. the bytes of `serialized` at `server_tick_range`,
/// 3. the total number of messages (only when `track_mutate_messages` is set),
/// 4. the mutate index of the message,
/// 5. for every entity: its entity bytes, the size of its data and its component bytes.
///
/// Every message is registered in `ticks` together with a [`MutateInfo`] describing the
/// entities it contains; the component masks and diff cursors are moved out of the stored
/// mutations for that purpose.
///
/// When `track_mutate_messages` is set, a message is produced even if there are no chunks.
///
/// `split_buffer` is scratch space: its length is written as the message count, so it is
/// expected to be empty on entry. It is cleared before a successful return.
///
/// Returns the number of messages sent.
///
/// # Errors
///
/// Propagates errors from serialization and from size calculations.
pub(crate) fn send(
&mut self,
messages: &mut ServerMessages,
client: Entity,
ticks: &mut ClientTicks,
split_buffer: &mut Vec<MutationsSplit>,
serialized: &SerializedData,
track_mutate_messages: bool,
server_tick_range: Range<usize>,
server_tick: RepliconTick,
system_tick: Tick,
timestamp: Duration,
max_size: usize,
) -> Result<usize> {
// The number of messages isn't known until splitting is done,
// so the maximum encoded size of `usize` is reserved for it up front.
const MAX_COUNT_SIZE: usize = usize::POSTCARD_MAX_SIZE;
let mut tick_buffer = [0; RepliconTick::POSTCARD_MAX_SIZE];
let update_tick = postcard::to_slice(&ticks.update_tick, &mut tick_buffer)?;
// Part of the header that is identical for all messages.
let mut metadata_size = update_tick.len() + server_tick_range.len();
if track_mutate_messages {
metadata_size += MAX_COUNT_SIZE;
}
let mut mutate_info = MutateInfo {
server_tick,
system_tick,
timestamp,
entities: Default::default(),
};
let mut mutate_index = ticks.next_mutate_index();
let mut chunks = EntityChunks::new(&mut self.related, &mut self.standalone);
let mut header_size = metadata_size + serialized_size(&mutate_index)?;
let mut body_size = 0;
// Chunk indices (in `EntityChunks` order) that go into the message being built.
let mut chunks_range = Range::<usize>::default();
for chunk in chunks.iter_mut() {
let mut mutations_size = 0;
for mutations in &mut *chunk {
mutations_size += mutations.ranges.size()?;
}
// Try to pack the chunk after the current body first, then in front of it.
// If neither works, finish the current message and start a new one.
if body_size != 0
&& !can_pack(header_size + body_size, mutations_size, max_size)
&& !can_pack(header_size + mutations_size, body_size, max_size)
{
split_buffer.push(MutationsSplit {
mutate_index,
message_size: body_size + header_size,
chunks_range: chunks_range.clone(),
});
ticks.register_mutate_message(mutate_index, mutate_info);
mutate_index = ticks.next_mutate_index();
mutate_info = MutateInfo {
server_tick,
system_tick,
timestamp,
entities: Default::default(),
};
chunks_range.start = chunks_range.end;
// The header is recalculated because the mutate index changed.
header_size = metadata_size + serialized_size(&mutate_index)?; body_size = 0;
}
// Move per-entity tracking data into the info registered for this message.
mutate_info
.entities
.extend(chunk.iter_mut().map(|mutations| MutatedEntityInfo {
entity: mutations.entity,
components: mem::take(&mut mutations.components),
diff_cursors: mem::take(&mut mutations.diff_cursors),
}));
chunks_range.end += 1;
body_size += mutations_size;
}
// Pack the leftovers into the last message,
// or create an empty message if mutate messages are tracked.
if !chunks_range.is_empty() || track_mutate_messages {
split_buffer.push(MutationsSplit {
mutate_index,
message_size: body_size + header_size,
chunks_range,
});
ticks.register_mutate_message(mutate_index, mutate_info);
}
if split_buffer.len() > 1 {
trace!(
"splitting into {} messages for client `{client}`",
split_buffer.len()
);
}
for split in &*split_buffer {
let mut message_size = split.message_size;
if track_mutate_messages {
// The size was computed with `MAX_COUNT_SIZE` reserved for the count;
// replace the reservation with the actual encoded size.
message_size -= MAX_COUNT_SIZE - serialized_size(&split_buffer.len())?;
}
let mut message = Vec::with_capacity(message_size);
message.extend_from_slice(update_tick);
message.extend_from_slice(&serialized[server_tick_range.clone()]);
if track_mutate_messages {
postcard_utils::to_extend_mut(&split_buffer.len(), &mut message)?;
}
postcard_utils::to_extend_mut(&split.mutate_index, &mut message)?;
for mutations in chunks.iter_flatten(split.chunks_range.clone()) {
message.extend_from_slice(&serialized[mutations.ranges.entity.clone()]);
postcard_utils::to_extend_mut(&mutations.ranges.data_size(), &mut message)?;
for component in &mutations.ranges.data {
message.extend_from_slice(&serialized[component.clone()]);
}
}
// Verifies the size accounting done while splitting.
debug_assert_eq!(message.len(), message_size);
messages.send(client, ServerChannel::Mutations, message);
}
let len = split_buffer.len();
split_buffer.clear();
Ok(len)
}
/// Removes all stored mutations and prepares storage for `graphs_count` graphs.
///
/// Afterwards `related` contains exactly `graphs_count` empty lists and
/// `standalone` is empty. Lists that survive keep their allocations.
pub(crate) fn reset(&mut self, graphs_count: usize) {
    // Drop the lists that are no longer needed, empty the ones that stay
    // and then add fresh lists if the number of graphs grew.
    self.related.truncate(graphs_count);
    self.related.iter_mut().for_each(Vec::clear);
    self.related.resize_with(graphs_count, Vec::new);

    self.standalone.clear();
}
}
/// Mutation record for a single entity.
pub(crate) struct EntityMutations {
/// The entity the mutations belong to.
entity: Entity,
/// Byte ranges of the entity and of its component data inside the serialized data.
pub(super) ranges: EntityRanges,
/// Component mask that `Mutations::send` moves into [`MutatedEntityInfo`]
/// when the message containing this entity is registered.
pub(super) components: ComponentMask,
/// Diff cursors recorded via `Mutations::add_diff_cursor`;
/// also moved into [`MutatedEntityInfo`] by `Mutations::send`.
pub(super) diff_cursors: DiffCursors,
}
/// Identifies the list in which the most recently added entity was stored.
#[derive(Clone, Copy)]
enum EntityLocation {
/// The entity is the last element of `Mutations::related[index]`.
Related { index: usize },
/// The entity is the last element of `Mutations::standalone`.
Standalone,
}
/// View over related and standalone mutations as a single sequence of chunks.
///
/// Chunk indices enumerate all related lists first (one chunk per list),
/// followed by all standalone entities (one chunk per entity).
struct EntityChunks<'a> {
/// Related lists; each list is one chunk.
related: &'a mut [Vec<EntityMutations>],
/// Standalone entities; each entity is one chunk.
standalone: &'a mut [EntityMutations],
}
impl<'a> EntityChunks<'a> {
    /// Creates a chunked view over the given related lists and standalone entities.
    fn new(related: &'a mut [Vec<EntityMutations>], standalone: &'a mut [EntityMutations]) -> Self {
        Self { related, standalone }
    }

    /// Iterates over all chunks mutably: every related list as a whole,
    /// then every standalone entity as a slice of length one.
    fn iter_mut(&mut self) -> impl Iterator<Item = &mut [EntityMutations]> {
        let related_chunks = self
            .related
            .iter_mut()
            .map(|entities| entities.as_mut_slice());
        let standalone_chunks = self.standalone.chunks_mut(1);

        related_chunks.chain(standalone_chunks)
    }

    /// Iterates over the entities of all chunks whose indices fall into `range`.
    ///
    /// Indices below the number of related lists select those lists; the remaining
    /// indices, shifted by that number, select standalone entities.
    fn iter_flatten(&self, range: Range<usize>) -> impl Iterator<Item = &EntityMutations> {
        let related_len = self.related.len();
        let chunks_count = related_len + self.standalone.len();
        debug_assert!(range.start <= chunks_count);
        debug_assert!(range.end <= chunks_count);

        // Clamp the range to the related part and shift the rest into standalone indices.
        let related = &self.related[range.start.min(related_len)..range.end.min(related_len)];
        let standalone = &self.standalone
            [range.start.saturating_sub(related_len)..range.end.saturating_sub(related_len)];

        related.iter().flatten().chain(standalone)
    }
}
/// Describes a single message produced while splitting mutations in `Mutations::send`.
pub(crate) struct MutationsSplit {
/// Mutate index written into the message.
mutate_index: MutateIndex,
/// Header plus body size in bytes as accounted during splitting.
///
/// When mutate messages are tracked, this includes the maximum size reserved
/// for the message count, which is corrected right before the message is built.
message_size: usize,
/// Range of chunk indices (in `EntityChunks` order) included in the message.
chunks_range: Range<usize>,
}
/// Returns `true` if `add` more bytes fit into the last, partially filled packet
/// of a message that is `message_size` bytes long when packets hold `mtu` bytes.
///
/// A message whose size is an exact multiple of `mtu` has no partially filled
/// packet, so nothing can be packed into it and `false` is returned.
///
/// # Panics
///
/// Panics if `mtu` is zero.
fn can_pack(message_size: usize, add: usize, mtu: usize) -> bool {
    let dangling = message_size % mtu;
    // `dangling < mtu` always holds, so the subtraction can't underflow.
    // Comparing against the free space instead of computing `dangling + add`
    // avoids an overflow for very large `add`.
    dangling > 0 && add <= mtu - dangling
}
#[cfg(test)]
mod tests {
use super::*;
const MAX_SIZE: usize = 1200;
/// Checks [`can_pack`] against a table of `(message_size, add, expected)` cases.
#[test]
fn packing() {
    let cases = [
        // Message smaller than a single packet.
        (10, 5, true),
        (10, 1190, true),
        (10, 1191, false),
        (10, 3000, false),
        // Message that spills into a second packet.
        (1500, 500, true),
        (1500, 900, true),
        (1500, 1000, false),
        // Packet boundaries.
        (1199, 1, true),
        (1200, 0, false),
        (1200, 1, false),
        (1200, 3000, false),
    ];

    for (message_size, add, expected) in cases {
        assert_eq!(
            can_pack(message_size, add, MAX_SIZE),
            expected,
            "can_pack({message_size}, {add}, MAX_SIZE)"
        );
    }
}
/// Checks how many messages are produced for various combinations of entity sizes.
///
/// The first argument of the `send` helper lists related graphs (each with the sizes of
/// its entities), the second lists sizes of standalone entities and the third toggles
/// tracking of mutate messages. The helper returns the number of messages.
#[test]
fn splitting() {
// Standalone entities only: each entity may go into a separate message.
assert_eq!(send([], [], false), 0);
assert_eq!(send([], [10], false), 1);
assert_eq!(send([], [1300], false), 1);
assert_eq!(send([], [20, 20], false), 1);
assert_eq!(send([], [700, 700], false), 2);
assert_eq!(send([], [1300, 700], false), 1);
assert_eq!(send([], [1300, 1300], false), 2);
// A single graph: its entities always stay in one message.
assert_eq!(send([&[10]], [], false), 1);
assert_eq!(send([&[1300]], [], false), 1);
assert_eq!(send([&[20, 20]], [], false), 1);
assert_eq!(send([&[700, 700]], [], false), 1);
assert_eq!(send([&[1300, 1300]], [], false), 1);
// Multiple graphs: splitting may happen only between graphs.
assert_eq!(send([&[20], &[20]], [], false), 1);
assert_eq!(send([&[700], &[700]], [], false), 2);
assert_eq!(send([&[1300], &[1300]], [], false), 2);
// Graphs mixed with standalone entities.
assert_eq!(send([&[10]], [10], false), 1);
assert_eq!(send([&[1300]], [1300], false), 2);
assert_eq!(send([&[20, 20]], [20, 20], false), 1);
assert_eq!(send([&[700, 700]], [700, 700], false), 2);
assert_eq!(send([&[1300, 1300]], [1300, 1300], false), 3);
assert_eq!(send([&[20], &[20]], [20], false), 1);
assert_eq!(send([&[700], &[700]], [700], false), 3);
assert_eq!(send([&[1300], &[1300]], [1300], false), 3);
// With tracking enabled a message is sent even when there are no mutations.
assert_eq!(send([], [], true), 1);
assert_eq!(send([], [10], true), 1);
assert_eq!(send([&[10]], [], true), 1);
assert_eq!(send([&[10]], [10], true), 1);
// A single entity close to the maximum size still results in one message with tracking enabled.
assert_eq!(send([], [1194], true), 1);
}
fn send<const N: usize, const M: usize>(
related: [&[usize]; N],
standalone: [usize; M],
track_mutate_messages: bool,
) -> usize {
let mut serialized = SerializedData::default();
let mut messages = ServerMessages::default();
let mut mutations = Mutations::default();
mutations.reset(related.len());
for (index, &entities) in related.iter().enumerate() {
for &mutations_size in entities {
write_entity(&mut mutations, &mut serialized, Some(index), mutations_size);
}
}
for &mutations_size in &standalone {
write_entity(&mut mutations, &mut serialized, None, mutations_size);
}
mutations
.send(
&mut messages,
Entity::PLACEHOLDER,
&mut Default::default(),
&mut Default::default(),
&serialized,
track_mutate_messages,
Default::default(),
Default::default(),
Default::default(),
Default::default(),
MAX_SIZE,
)
.unwrap()
}
/// Reserves `mutations_size` zeroed bytes in `serialized` and registers them as an entity:
/// the first 4 bytes as the entity itself and the rest as a single component.
///
/// # Panics
///
/// Panics if `mutations_size` is not greater than 4.
fn write_entity(
    mutations: &mut Mutations,
    serialized: &mut SerializedData,
    graph_index: Option<usize>,
    mutations_size: usize,
) {
    assert!(mutations_size > 4);

    let entity_start = serialized.len();
    let components_start = entity_start + 4;
    serialized.resize(entity_start + mutations_size, 0);
    let components_end = serialized.len();

    mutations.start_entity();
    mutations.add_entity(
        Entity::PLACEHOLDER,
        graph_index,
        entity_start..components_start,
    );
    mutations.add_component(components_start..components_end);
}
}