bevy_replicon 0.39.2

A server-authoritative replication crate for Bevy
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
use core::{cmp::Ordering, iter, mem, ops::Range, time::Duration};

use bevy::{ecs::change_detection::Tick, prelude::*};
use log::trace;
use postcard::experimental::{max_size::MaxSize, serialized_size};

use super::{entity_ranges::EntityRanges, serialized_data::SerializedData};
use crate::{
    postcard_utils,
    prelude::*,
    server::ClientPools,
    shared::{
        backend::channels::ServerChannel,
        replication::{
            client_ticks::{ClientTicks, MutateInfo},
            mutate_index::MutateIndex,
            registry::component_mask::ComponentMask,
        },
    },
};

/// Component mutations for the current tick.
///
/// The data is serialized manually and stored in the form of ranges
/// from [`SerializedData`].
///
/// Can be packed into messages using [`Self::send`].
#[derive(Component, Default)]
pub(crate) struct Mutations {
    /// Entities that are related to each other and should be replicated in sync.
    ///
    /// Like [`Self::standalone`], but grouped into arrays based on their relation graph indices.
    /// These entities are guaranteed to be included in a single message.
    related: Vec<Vec<EntityMutations>>,

    /// Component mutations that happened in this tick.
    ///
    /// These mutations are not related to any others and can be replicated independently.
    standalone: Vec<EntityMutations>,

    /// Location of the last written entity since the last call of [`Self::start_entity`].
    ///
    /// `None` if no entity has been added since then, or if the added entity
    /// was removed via [`Self::pop`].
    entity_location: Option<EntityLocation>,
}

impl Mutations {
    /// Updates internal state to start writing mutated components for an entity.
    ///
    /// Entities and their data are written lazily during the iteration.
    /// See [`Self::add_entity`] and [`Self::add_component`].
    pub(crate) fn start_entity(&mut self) {
        // Forget the previously written entity so that `entity_added` reports `false`
        // until `add_entity` is called again.
        self.entity_location = None;
    }

    /// Returns `true` if [`Self::add_entity`] was called since the last
    /// call of [`Self::start_entity`].
    ///
    /// Returns `false` again once the added entity is removed via [`Self::pop`].
    ///
    /// This is a read-only query, so a shared reference is sufficient.
    pub(crate) fn entity_added(&self) -> bool {
        self.entity_location.is_some()
    }

    /// Registers an entity together with its serialized entity chunk.
    ///
    /// The entity is stored in the related group at `graph_index` when one is
    /// specified, or among the standalone entities otherwise. Storage for its
    /// component ranges and component mask is taken from `pools`.
    ///
    /// Component chunks can be attached afterwards via [`Self::add_component`].
    pub(crate) fn add_entity(
        &mut self,
        pools: &mut ClientPools,
        entity: Entity,
        graph_index: Option<usize>,
        entity_range: Range<usize>,
    ) {
        let ranges = EntityRanges {
            entity: entity_range,
            data: pools.take_ranges(),
        };
        let components = pools.take_components();
        let added = EntityMutations {
            entity,
            ranges,
            components,
        };

        // Pick the destination first, then store the entity and remember where it went.
        let (target, location) = match graph_index {
            Some(index) => (&mut self.related[index], EntityLocation::Related { index }),
            None => (&mut self.standalone, EntityLocation::Standalone),
        };
        target.push(added);
        self.entity_location = Some(location);
    }

    /// Appends a component chunk to the entity most recently added via [`Self::add_entity`].
    ///
    /// # Panics
    ///
    /// Panics if no entity has been added since the last [`Self::start_entity`] call.
    pub(crate) fn add_component(&mut self, component: Range<usize>) {
        let last_added = match self.entity_location {
            Some(EntityLocation::Related { index }) => self.related[index].last_mut(),
            Some(EntityLocation::Standalone) => self.standalone.last_mut(),
            None => None,
        };

        last_added
            .expect("entity should be written before adding components")
            .ranges
            .add_data(component);
    }

    /// Takes back the entity most recently added via [`Self::add_entity`].
    ///
    /// Returns `None` if no entity has been added since the last
    /// [`Self::start_entity`] call. The stored location is reset either way.
    pub(super) fn pop(&mut self) -> Option<EntityMutations> {
        match self.entity_location.take()? {
            EntityLocation::Related { index } => self.related[index].pop(),
            EntityLocation::Standalone => self.standalone.pop(),
        }
    }

    /// Returns `true` if no entity mutations are stored.
    ///
    /// [`Self::related`] keeps one (possibly empty) group per relation graph
    /// between ticks, so every group is inspected rather than the outer array,
    /// which stays non-empty for as long as any graph exists.
    pub(crate) fn is_empty(&self) -> bool {
        self.standalone.is_empty() && self.related.iter().all(Vec::is_empty)
    }

    /// Packs mutations into messages.
    ///
    /// Contains update tick, current tick, mutate index and component mutations since
    /// the last acknowledged tick for each entity.
    /// If `track_mutate_messages` is enabled, the total number of messages is also written
    /// right after the current tick.
    ///
    /// Cannot be applied on the client until the update message matching this message's update tick
    /// has been applied to the client world.
    /// The message will be manually split into packets up to max size, and each packet will be applied
    /// independently on the client.
    /// Message splits only happen per-entity to avoid weird behavior from partial entity mutations.
    ///
    /// Sent over the [`ServerChannel::Mutations`] channel. If the message gets lost, we try to resend it manually,
    /// using the last up-to-date mutations to avoid re-sending old values.
    ///
    /// `server_tick_range` points to the already serialized current tick inside `serialized`.
    /// `split_buffer` is scratch space that is cleared before returning successfully.
    /// NOTE(review): it's presumably expected to be empty on entry, since its length
    /// is written as the message count - confirm against the caller.
    ///
    /// Returns the number of sent messages.
    pub(crate) fn send(
        &mut self,
        messages: &mut ServerMessages,
        client: Entity,
        ticks: &mut ClientTicks,
        split_buffer: &mut Vec<MutationsSplit>,
        pools: &mut ClientPools,
        serialized: &SerializedData,
        track_mutate_messages: bool,
        server_tick_range: Range<usize>,
        server_tick: RepliconTick,
        system_tick: Tick,
        timestamp: Duration,
        max_size: usize,
    ) -> Result<usize> {
        // The number of messages is unknown until splitting is done,
        // so its maximum possible size is reserved for now.
        const MAX_COUNT_SIZE: usize = usize::POSTCARD_MAX_SIZE;
        let mut tick_buffer = [0; RepliconTick::POSTCARD_MAX_SIZE];
        let update_tick = postcard::to_slice(&ticks.update_tick, &mut tick_buffer)?;
        // Part of the header that is identical for every message.
        let mut metadata_size = update_tick.len() + server_tick_range.len();
        if track_mutate_messages {
            metadata_size += MAX_COUNT_SIZE;
        }

        let mut mutate_info = MutateInfo {
            server_tick,
            system_tick,
            timestamp,
            entities: pools.take_entities(),
        };
        let mut mutate_index = ticks.next_mutate_index();
        let mut chunks = EntityChunks::new(&mut self.related, &mut self.standalone);
        let mut header_size = metadata_size + serialized_size(&mutate_index)?;
        let mut body_size = 0;
        let mut chunks_range = Range::<usize>::default();
        // First pass: only decide which chunks go into which message.
        // A chunk is either a whole group of related entities or a single standalone entity.
        // Note that an empty related group still counts as a chunk.
        for chunk in chunks.iter_mut() {
            let mut mutations_size = 0;
            for mutations in &mut *chunk {
                mutations_size += mutations.ranges.size()?;
            }

            // Try to pack back first, then try to pack forward.
            if body_size != 0
                && !can_pack(header_size + body_size, mutations_size, max_size)
                && !can_pack(header_size + mutations_size, body_size, max_size)
            {
                // The chunk doesn't fit: finish the current message and start a new one.
                split_buffer.push(MutationsSplit {
                    mutate_index,
                    message_size: body_size + header_size,
                    chunks_range: chunks_range.clone(),
                });

                ticks.register_mutate_message(mutate_index, mutate_info);
                mutate_index = ticks.next_mutate_index();
                mutate_info = MutateInfo {
                    server_tick,
                    system_tick,
                    timestamp,
                    entities: pools.take_entities(),
                };
                chunks_range.start = chunks_range.end;
                header_size = metadata_size + serialized_size(&mutate_index)?; // Recalculate since the mutate index changed.
                body_size = 0;
            }

            // Remember which entities and components belong to the current message.
            // Component masks are moved out of the mutations, leaving defaults behind.
            mutate_info.entities.extend(
                chunk
                    .iter_mut()
                    .map(|mutations| (mutations.entity, mem::take(&mut mutations.components))),
            );
            chunks_range.end += 1;
            body_size += mutations_size;
        }
        if !chunks_range.is_empty() || track_mutate_messages {
            // When the loop ends, pack all leftovers into a message.
            // Or create an empty message if tracking mutate messages is enabled.
            split_buffer.push(MutationsSplit {
                mutate_index,
                message_size: body_size + header_size,
                chunks_range,
            });
            ticks.register_mutate_message(mutate_index, mutate_info);
        }

        if split_buffer.len() > 1 {
            trace!(
                "splitting into {} messages for client `{client}`",
                split_buffer.len()
            );
        }

        // Second pass: the number of messages is known now, so write and send them.
        for split in &*split_buffer {
            let mut message_size = split.message_size;
            if track_mutate_messages {
                // Update message counter size based on actual value.
                message_size -= MAX_COUNT_SIZE - serialized_size(&split_buffer.len())?;
            }
            let mut message = Vec::with_capacity(message_size);

            message.extend_from_slice(update_tick);
            message.extend_from_slice(&serialized[server_tick_range.clone()]);
            if track_mutate_messages {
                postcard_utils::to_extend_mut(&split_buffer.len(), &mut message)?;
            }
            postcard_utils::to_extend_mut(&split.mutate_index, &mut message)?;
            for mutations in chunks.iter_flatten(split.chunks_range.clone()) {
                // Each entity is written as its entity chunk, the size of its data
                // and then all its component chunks.
                message.extend_from_slice(&serialized[mutations.ranges.entity.clone()]);
                postcard_utils::to_extend_mut(&mutations.ranges.data_size(), &mut message)?;
                for component in &mutations.ranges.data {
                    message.extend_from_slice(&serialized[component.clone()]);
                }
            }

            // The size calculated during splitting should match the written bytes exactly.
            debug_assert_eq!(message.len(), message_size);

            messages.send(client, ServerChannel::Mutations, message);
        }

        let len = split_buffer.len();
        split_buffer.clear();
        Ok(len)
    }

    /// Removes every stored entity mutation and returns their range buffers to `pools`.
    ///
    /// Allocated memory is kept for reuse.
    ///
    /// Only the inner groups of [`Self::related`] are emptied; the outer array
    /// keeps its length and should be adjusted via [`Self::resize_related`]
    /// before collecting new changes.
    pub(crate) fn clear(&mut self, pools: &mut ClientPools) {
        // Component masks are not recycled here: sending moves them into `MutateInfo`.
        let groups = self.related.iter_mut().chain(iter::once(&mut self.standalone));
        for group in groups {
            let ranges = group.drain(..).map(|mutations| mutations.ranges.data);
            pools.recycle_ranges(ranges);
        }
    }

    /// Adjusts the number of groups in [`Self::related`] to `graphs_count`,
    /// so related entities can be split by graph index.
    ///
    /// Missing groups are taken from `pools` and surplus groups are returned to it,
    /// keeping allocated memory for reuse.
    pub(crate) fn resize_related(&mut self, pools: &mut ClientPools, graphs_count: usize) {
        match graphs_count.cmp(&self.related.len()) {
            Ordering::Greater => {
                self.related
                    .resize_with(graphs_count, || pools.take_mutations());
            }
            Ordering::Less => {
                let surplus = self.related.drain(graphs_count..);
                pools.recycle_mutations(surplus);
            }
            Ordering::Equal => {}
        }
    }
}

/// Mutations data for [`Mutations::related`] and [`Mutations::standalone`].
///
/// Describes a single entity and the serialized chunks written for it.
pub(crate) struct EntityMutations {
    /// Associated entity.
    ///
    /// Used to associate entities with the mutate message index that the client
    /// needs to acknowledge to consider entity mutations received.
    entity: Entity,

    /// Component mutations that happened in this tick.
    ///
    /// Serialized as a list of pairs of entity chunk and multiple chunks with mutated components.
    /// Components are stored in multiple chunks because some clients may acknowledge mutations,
    /// while others may not.
    pub(super) ranges: EntityRanges,

    /// Components written in [`Self::ranges`].
    ///
    /// Like [`Self::entity`], used for later component acknowledgement.
    /// Moved out (leaving a default mask) when the entity is packed by [`Mutations::send`].
    pub(super) components: ComponentMask,
}

/// Storage that holds the entity most recently added via [`Mutations::add_entity`].
///
/// The entity itself is always the last element of the referenced storage.
#[derive(Clone, Copy)]
enum EntityLocation {
    /// The entity is in the group at `index` inside [`Mutations::related`].
    Related { index: usize },
    /// The entity is in [`Mutations::standalone`].
    Standalone,
}

/// Treats related and standalone entity mutations as a single continuous buffer,
/// with related entities first, followed by standalone ones.
///
/// The buffer is addressed in chunks: each group of related entities is one chunk
/// and each standalone entity is a chunk of its own.
struct EntityChunks<'a> {
    /// Groups of related entities. Each group forms a single chunk.
    related: &'a mut [Vec<EntityMutations>],
    /// Standalone entities. Each entity forms its own chunk.
    standalone: &'a mut [EntityMutations],
}

impl<'a> EntityChunks<'a> {
    /// Creates a chunked view over the passed related groups and standalone entities.
    fn new(related: &'a mut [Vec<EntityMutations>], standalone: &'a mut [EntityMutations]) -> Self {
        Self {
            related,
            standalone,
        }
    }

    /// Iterates over all chunks as mutable slices.
    ///
    /// Every group of related entities is yielded as one slice first,
    /// then each standalone entity follows as a single-element slice.
    fn iter_mut(&mut self) -> impl Iterator<Item = &mut [EntityMutations]> {
        let related_chunks = self.related.iter_mut().map(Vec::as_mut_slice);
        // With a chunk size of 1 every chunk is exactly one entity.
        let standalone_chunks = self.standalone.chunks_mut(1);
        related_chunks.chain(standalone_chunks)
    }

    /// Returns an iterator over flattened slices of entity mutations within the specified range.
    ///
    /// The range indexes chunk numbers (not individual elements).
    fn iter_flatten(&self, range: Range<usize>) -> impl Iterator<Item = &EntityMutations> {
        let total_len = self.related.len() + self.standalone.len();
        debug_assert!(range.start <= total_len);
        debug_assert!(range.end <= total_len);

        let split_point = self.related.len();

        let related_start = range.start.min(split_point);
        let related_end = range.end.min(split_point);
        let standalone_start = range.start.saturating_sub(split_point);
        let standalone_end = range.end.saturating_sub(split_point);

        let related_range = related_start..related_end;
        let standalone_range = standalone_start..standalone_end;

        self.related[related_range]
            .iter()
            .flatten()
            .chain(&self.standalone[standalone_range])
    }
}

/// Information about mutations that are split into a message.
///
/// We split mutations into messages first in order to know their count in advance.
pub(crate) struct MutationsSplit {
    /// Mutate index that is written into the message.
    mutate_index: MutateIndex,
    /// Size of the message in bytes as calculated during splitting.
    ///
    /// If mutate messages are tracked, the message count is included
    /// with its maximum possible size.
    message_size: usize,
    /// Indices in [`EntityChunks`].
    chunks_range: Range<usize>,
}

/// Returns `true` if the additional data fits within the remaining space
/// of the current packet tail.
///
/// When the message already exceeds the MTU, more data can be packed
/// as long as it fits within the last partial packet without causing
/// an additional packet to be created.
///
/// Returns `false` if the message ends exactly on a packet boundary
/// (which includes an empty message) or if `mtu` is zero.
fn can_pack(message_size: usize, add: usize, mtu: usize) -> bool {
    // `checked_rem` yields `None` for a zero MTU instead of panicking.
    let Some(dangling) = message_size.checked_rem(mtu) else {
        return false;
    };

    // `dangling < mtu`, so the subtraction can't underflow, and comparing against
    // the free space avoids overflowing on `dangling + add` for huge additions.
    dangling > 0 && add <= mtu - dangling
}

#[cfg(test)]
mod tests {
    use super::*;

    const MAX_SIZE: usize = 1200;

    /// Checks [`can_pack`] against a table of message sizes and additions.
    #[test]
    fn packing() {
        // (current message size, additional bytes, whether the addition should fit)
        const CASES: &[(usize, usize, bool)] = &[
            // The message is smaller than a packet.
            (10, 5, true),
            (10, 1190, true),
            (10, 1191, false),
            (10, 3000, false),
            // The message already spans more than one packet.
            (1500, 500, true),
            (1500, 900, true),
            (1500, 1000, false),
            // The message ends right before or exactly on a packet boundary.
            (1199, 1, true),
            (1200, 0, false),
            (1200, 1, false),
            (1200, 3000, false),
        ];

        for &(message_size, add, fits) in CASES {
            assert_eq!(
                can_pack(message_size, add, MAX_SIZE),
                fits,
                "unexpected result for message size {message_size} and addition {add}"
            );
        }
    }

    /// Checks the number of messages produced for different entity layouts.
    ///
    /// The first argument lists sizes of related entities per graph, the second
    /// lists sizes of standalone entities (see the `send` helper below).
    #[test]
    fn splitting() {
        // Only standalone entities: each of them can go into a separate message.
        assert_eq!(send([], [], false), 0);
        assert_eq!(send([], [10], false), 1);
        assert_eq!(send([], [1300], false), 1);
        assert_eq!(send([], [20, 20], false), 1);
        assert_eq!(send([], [700, 700], false), 2);
        assert_eq!(send([], [1300, 700], false), 1);
        assert_eq!(send([], [1300, 1300], false), 2);

        // Only related entities: entities from the same graph always share a message,
        // while different graphs can be split.
        assert_eq!(send([&[10]], [], false), 1);
        assert_eq!(send([&[1300]], [], false), 1);
        assert_eq!(send([&[20, 20]], [], false), 1);
        assert_eq!(send([&[700, 700]], [], false), 1);
        assert_eq!(send([&[1300, 1300]], [], false), 1);
        assert_eq!(send([&[20], &[20]], [], false), 1);
        assert_eq!(send([&[700], &[700]], [], false), 2);
        assert_eq!(send([&[1300], &[1300]], [], false), 2);

        // Related and standalone entities together.
        assert_eq!(send([&[10]], [10], false), 1);
        assert_eq!(send([&[1300]], [1300], false), 2);
        assert_eq!(send([&[20, 20]], [20, 20], false), 1);
        assert_eq!(send([&[700, 700]], [700, 700], false), 2);
        assert_eq!(send([&[1300, 1300]], [1300, 1300], false), 3);
        assert_eq!(send([&[20], &[20]], [20], false), 1);
        assert_eq!(send([&[700], &[700]], [700], false), 3);
        assert_eq!(send([&[1300], &[1300]], [1300], false), 3);

        // With mutate message tracking a message is sent even without any mutations.
        assert_eq!(send([], [], true), 1);
        assert_eq!(send([], [10], true), 1);
        assert_eq!(send([&[10]], [], true), 1);
        assert_eq!(send([&[10]], [10], true), 1);
        // NOTE(review): presumably sized to land close to `MAX_SIZE` once the header
        // is added, exercising the message count size adjustment - confirm.
        assert_eq!(send([], [1194], true), 1);
    }

    /// Mocks message sending with specified data sizes.
    ///
    /// `related` and `standalone` specify sizes for entities and their mutations.
    /// See also [`write_entity`].
    fn send<const N: usize, const M: usize>(
        related: [&[usize]; N],
        standalone: [usize; M],
        track_mutate_messages: bool,
    ) -> usize {
        let mut serialized = SerializedData::default();
        let mut messages = ServerMessages::default();
        let mut mutations = Mutations::default();
        let mut pools = ClientPools::default();

        mutations.resize_related(&mut pools, related.len());

        for (index, &entities) in related.iter().enumerate() {
            for &mutations_size in entities {
                write_entity(
                    &mut mutations,
                    &mut serialized,
                    &mut pools,
                    Some(index),
                    mutations_size,
                );
            }
        }

        for &mutations_size in &standalone {
            write_entity(
                &mut mutations,
                &mut serialized,
                &mut pools,
                None,
                mutations_size,
            );
        }

        mutations
            .send(
                &mut messages,
                Entity::PLACEHOLDER,
                &mut Default::default(),
                &mut Default::default(),
                &mut Default::default(),
                &serialized,
                track_mutate_messages,
                Default::default(),
                Default::default(),
                Default::default(),
                Default::default(),
                MAX_SIZE,
            )
            .unwrap()
    }

    /// Mocks writing an entity with a single mutated component.
    ///
    /// `mutations_size` zero bytes are appended to `serialized`: the first 4 bytes
    /// are registered as the entity chunk and the rest as the component chunk.
    fn write_entity(
        mutations: &mut Mutations,
        serialized: &mut SerializedData,
        pools: &mut ClientPools,
        graph_index: Option<usize>,
        mutations_size: usize,
    ) {
        const ENTITY_SIZE: usize = 4;
        assert!(mutations_size > 4);

        let entity_start = serialized.len();
        let component_start = entity_start + ENTITY_SIZE;
        serialized.resize(entity_start + mutations_size, 0);

        mutations.start_entity();
        mutations.add_entity(
            pools,
            Entity::PLACEHOLDER,
            graph_index,
            entity_start..component_start,
        );
        mutations.add_component(component_start..serialized.len());
    }
}