bevy_replicon 0.39.2

A server-authoritative replication crate for Bevy
Documentation
use core::{iter, mem, ops::Range};

use bevy::prelude::*;
use postcard::experimental::serialized_size;

use super::{entity_ranges::EntityRanges, mutations::Mutations, serialized_data::SerializedData};
use crate::{
    postcard_utils,
    prelude::*,
    server::ClientPools,
    shared::{
        backend::channels::ServerChannel,
        replication::{
            registry::{ComponentIndex, component_mask::ComponentMask},
            update_message_flags::UpdateMessageFlags,
        },
    },
};

/// Entity updates for the current tick.
///
/// The data is serialized manually and stored in the form of ranges
/// from [`SerializedData`].
///
/// Can be packed into a message using [`Self::send`].
#[derive(Component, Default)]
pub(crate) struct Updates {
    /// Entity mappings for newly visible server entities and their hashes calculated from the [`Signature`] component.
    ///
    /// Since clients may see different entities, it's serialized as multiple chunks of mappings.
    ///
    /// Mappings should be processed first, so all referenced entities after it will behave correctly.
    mappings: Vec<Range<usize>>,

    /// Number of entity mappings.
    ///
    /// May not be equal to the length of [`Self::mappings`] since adjacent ranges are merged together.
    mappings_len: usize,

    /// Despawns that happened in this tick.
    ///
    /// Since clients may see different entities, it's serialized as multiple chunks of entities.
    /// I.e. serialized server despawns may have holes for clients due to visibility differences.
    despawns: Vec<Range<usize>>,

    /// Number of depspawned entities.
    ///
    /// May not be equal to the length of [`Self::despawns`] since adjacent ranges are merged together.
    despawns_len: usize,

    /// Component removals that happened in this tick.
    ///
    /// Serialized as a list of pairs of entity chunk and a multiple chunks with
    /// [`FnsId`](crate::shared::replication::registry::FnsId).
    removals: Vec<EntityRanges>,

    /// Indicates that an entity has been written since the
    /// last call of [`Self::start_entity_removals`].
    removals_entity_added: bool,

    /// Component insertions or mutations that happened in this tick.
    ///
    /// Serialized as a list of pairs of entity chunk and multiple chunks with changed components.
    /// Components are stored in multiple chunks because newly connected clients may need to serialize all components,
    /// while previously connected clients only need the components spawned during this tick.
    ///
    /// Usually mutations are stored in [`MutateMessage`], but if an entity has any insertions or removal,
    /// or the entity just became visible for a client, we serialize it as part of the update message to keep entity updates atomic.
    changes: Vec<EntityRanges>,

    /// Components written in [`Self::changes`].
    changed_components: ComponentMask,

    /// Indicates that an entity has been written since the
    /// last call of [`Self::start_entity_changes`].
    changed_entity_added: bool,
}

impl Updates {
    pub(crate) fn add_mapping(&mut self, mappings: Range<usize>) {
        self.mappings_len += 1;
        if let Some(last) = self.mappings.last_mut() {
            // Append to previous range if possible.
            if last.end == mappings.start {
                last.end = mappings.end;
                return;
            }
        }
        self.mappings.push(mappings);
    }

    pub(crate) fn add_despawn(&mut self, entity: Range<usize>) {
        self.despawns_len += 1;
        if let Some(last) = self.despawns.last_mut() {
            // Append to previous range if possible.
            if last.end == entity.start {
                last.end = entity.end;
                return;
            }
        }
        self.despawns.push(entity);
    }

    /// Updates internal state to start writing removed components for an entity.
    ///
    /// Entities and their removals are written lazily during the iteration.
    /// See [`Self::add_removals_entity`] and [`Self::add_removal`].
    pub(crate) fn start_entity_removals(&mut self) {
        self.removals_entity_added = false;
    }

    /// Returns `true` if [`Self::add_removals_entity`] was called since the last
    /// call of [`Self::start_entity_removals`].
    pub(crate) fn removals_entity_added(&mut self) -> bool {
        self.removals_entity_added
    }

    /// Adds an entity chunk for removals.
    pub(crate) fn add_removals_entity(&mut self, pools: &mut ClientPools, entity: Range<usize>) {
        self.removals.push(EntityRanges {
            entity,
            data: pools.take_ranges(),
        });
        self.removals_entity_added = true;
    }

    /// Adds a chunk with removal to the last added entity from [`Self::add_removals_entity`].
    pub(crate) fn add_removal(&mut self, fns_id: Range<usize>) {
        debug_assert!(self.removals_entity_added);
        let removals = self
            .removals
            .last_mut()
            .expect("entity should be written before adding removals");

        removals.add_data(fns_id);
    }

    /// Updates internal state to start writing changed components for an entity.
    ///
    /// Entities and their data are written lazily during the iteration.
    /// See [`Self::add_changed_entity`] and [`Self::add_inserted_component`].
    pub(crate) fn start_entity_changes(&mut self) {
        debug_assert!(
            self.changed_components.is_empty(),
            "changed components should be taken before next entity is written"
        );
        self.changed_entity_added = false;
    }

    /// Returns `true` if [`Self::add_changed_entity`] were called since the last
    /// call of [`Self::start_entity_changes`].
    pub(crate) fn changed_entity_added(&mut self) -> bool {
        self.changed_entity_added
    }

    /// Adds an entity chunk for insertions and mutations.
    pub(crate) fn add_changed_entity(&mut self, pools: &mut ClientPools, entity: Range<usize>) {
        self.changes.push(EntityRanges {
            entity,
            data: pools.take_ranges(),
        });
        self.changed_entity_added = true;
    }

    /// Adds a component chunk to the last added entity from [`Self::add_changed_entity`].
    pub(crate) fn add_inserted_component(
        &mut self,
        component: Range<usize>,
        index: ComponentIndex,
    ) {
        debug_assert!(self.changed_entity_added);
        let changes = self
            .changes
            .last_mut()
            .expect("entity should be written before adding insertions");

        changes.add_data(component);
        self.changed_components.insert(index);
    }

    /// Takes last mutated entity with its component chunks from the mutate message.
    pub(crate) fn take_added_entity(&mut self, pools: &mut ClientPools, mutations: &mut Mutations) {
        debug_assert!(mutations.entity_added());
        let entity_mutations = mutations.pop().expect("entity should be written");

        if !self.changed_entity_added {
            self.changes.push(entity_mutations.ranges);
        } else {
            let changes = self.changes.last_mut().expect("entity should be written");
            debug_assert_eq!(entity_mutations.ranges.entity, changes.entity);
            changes.extend(&entity_mutations.ranges);
            pools.recycle_ranges(iter::once(entity_mutations.ranges.data));

            self.changed_components |= &entity_mutations.components;
            pools.recycle_components(entity_mutations.components);
        }
    }

    /// Takes all changed components for the last changed entity that was written.
    pub(crate) fn take_changed_components(&mut self) -> ComponentMask {
        mem::take(&mut self.changed_components)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.changes.is_empty()
            && self.despawns.is_empty()
            && self.removals.is_empty()
            && self.mappings.is_empty()
    }

    /// Packs updates into a message.
    ///
    /// Contains tick, mappings, insertions, removals, and despawns that
    /// happened in this tick.
    ///
    /// Sent over [`ServerChannel::Updates`] channel.
    ///
    /// Some data is optional, and their presence is encoded in the [`UpdateMessageFlags`] bitset.
    ///
    /// To know how much data array takes, we serialize it's length. We use `usize`,
    /// but we use variable integer encoding, so they are correctly deserialized even
    /// on a client with a different pointer size. However, if the server sends a value
    /// larger than what a client can fit into `usize` (which is very unlikely), the client will panic.
    /// This is expected, as the client can't have an array of such a size anyway.
    ///
    /// Additionally, we don't serialize the size for the last array and
    /// on deserialization just consume all remaining bytes.
    pub(crate) fn send(
        &self,
        messages: &mut ServerMessages,
        client: Entity,
        serialized: &SerializedData,
        server_tick_range: Range<usize>,
    ) -> Result<()> {
        let flags = self.flags();
        let last_flag = flags.last();

        // Precalculate size first to avoid extra allocations.
        let mut message_size = size_of::<UpdateMessageFlags>() + server_tick_range.len();
        for (_, flag) in flags.iter_names() {
            match flag {
                UpdateMessageFlags::MAPPINGS => {
                    if flag != last_flag {
                        message_size += serialized_size(&self.mappings_len)?;
                    }
                    message_size += self.mappings.iter().map(Range::len).sum::<usize>();
                }
                UpdateMessageFlags::DESPAWNS => {
                    if flag != last_flag {
                        message_size += serialized_size(&self.despawns_len)?;
                    }
                    message_size += self.despawns.iter().map(Range::len).sum::<usize>();
                }
                UpdateMessageFlags::REMOVALS => {
                    if flag != last_flag {
                        message_size += serialized_size(&self.removals.len())?;
                    }
                    message_size += self
                        .removals
                        .iter()
                        .map(EntityRanges::size)
                        .sum::<Result<usize>>()?;
                }
                UpdateMessageFlags::CHANGES => {
                    debug_assert_eq!(flag, last_flag);
                    message_size += self
                        .changes
                        .iter()
                        .map(EntityRanges::size)
                        .sum::<Result<usize>>()?;
                }
                _ => unreachable!("iteration should yield only named flags"),
            }
        }

        let mut message = Vec::with_capacity(message_size);
        postcard_utils::to_extend_mut(&flags, &mut message)?;
        message.extend_from_slice(&serialized[server_tick_range]);
        for (_, flag) in flags.iter_names() {
            match flag {
                UpdateMessageFlags::MAPPINGS => {
                    if flag != last_flag {
                        postcard_utils::to_extend_mut(&self.mappings_len, &mut message)?;
                    }
                    for range in &self.mappings {
                        message.extend_from_slice(&serialized[range.clone()]);
                    }
                }
                UpdateMessageFlags::DESPAWNS => {
                    if flag != last_flag {
                        postcard_utils::to_extend_mut(&self.despawns_len, &mut message)?;
                    }
                    for range in &self.despawns {
                        message.extend_from_slice(&serialized[range.clone()]);
                    }
                }
                UpdateMessageFlags::REMOVALS => {
                    if flag != last_flag {
                        postcard_utils::to_extend_mut(&self.removals.len(), &mut message)?;
                    }
                    for removals in &self.removals {
                        message.extend_from_slice(&serialized[removals.entity.clone()]);
                        postcard_utils::to_extend_mut(&removals.data_size(), &mut message)?;
                        for fns_id in &removals.data {
                            message.extend_from_slice(&serialized[fns_id.clone()]);
                        }
                    }
                }
                UpdateMessageFlags::CHANGES => {
                    // Changes are always last, don't write len for it.
                    for changes in &self.changes {
                        message.extend_from_slice(&serialized[changes.entity.clone()]);
                        postcard_utils::to_extend_mut(&changes.data_size(), &mut message)?;
                        for component in &changes.data {
                            message.extend_from_slice(&serialized[component.clone()]);
                        }
                    }
                }
                _ => unreachable!("iteration should yield only named flags"),
            }
        }

        debug_assert_eq!(message.len(), message_size);

        messages.send(client, ServerChannel::Updates, message);

        Ok(())
    }

    fn flags(&self) -> UpdateMessageFlags {
        let mut flags = UpdateMessageFlags::default();

        if !self.mappings.is_empty() {
            flags |= UpdateMessageFlags::MAPPINGS;
        }
        if !self.despawns.is_empty() {
            flags |= UpdateMessageFlags::DESPAWNS;
        }
        if !self.removals.is_empty() {
            flags |= UpdateMessageFlags::REMOVALS;
        }
        if !self.changes.is_empty() {
            flags |= UpdateMessageFlags::CHANGES;
        }

        flags
    }

    /// Clears all chunks.
    ///
    /// Keeps allocated memory for reuse.
    pub(crate) fn clear(&mut self, pools: &mut ClientPools) {
        self.mappings = Default::default();
        self.mappings_len = 0;
        self.despawns.clear();
        self.despawns_len = 0;

        pools.recycle_ranges(self.changes.drain(..).map(|c| c.data));
        pools.recycle_ranges(self.removals.drain(..).map(|c| c.data));
    }
}