Skip to main content

sdec_bevy/
replicator.rs

1use std::time::Instant;
2
3use anyhow::{anyhow, Result};
4use bevy_ecs::prelude::World;
5use codec::{
6    decode_session_packet, encode_delta_from_changes, CodecLimits, SessionEncoder, SessionState,
7};
8use wire::{decode_packet, Limits as WireLimits};
9
10use crate::apply::apply_changes;
11use crate::extract::{extract_changes_with_scratch, BevyChangeSet, ExtractScratch};
12use crate::mapping::EntityMap;
13use crate::metrics::{EncodeMetrics, MetricsSink};
14use crate::schema::BevySchema;
15
16pub struct BevyReplicator {
17    schema: BevySchema,
18    limits: CodecLimits,
19    wire_limits: WireLimits,
20    entities: EntityMap,
21    session: Option<SessionState>,
22    metrics: Option<Box<dyn MetricsSink>>,
23    change_set: BevyChangeSet,
24    extract_scratch: ExtractScratch,
25}
26
27impl BevyReplicator {
28    #[must_use]
29    pub fn new(schema: BevySchema) -> Self {
30        Self {
31            schema,
32            limits: CodecLimits::default(),
33            wire_limits: WireLimits::default(),
34            entities: EntityMap::new(),
35            session: None,
36            metrics: None,
37            change_set: BevyChangeSet::default(),
38            extract_scratch: ExtractScratch::default(),
39        }
40    }
41
42    pub fn with_limits(mut self, limits: CodecLimits, wire_limits: WireLimits) -> Self {
43        self.limits = limits;
44        self.wire_limits = wire_limits;
45        self
46    }
47
48    pub fn set_metrics_sink(&mut self, sink: Box<dyn MetricsSink>) {
49        self.metrics = Some(sink);
50    }
51
52    pub fn update_session(&mut self, session: SessionState) {
53        self.session = Some(session);
54    }
55
56    pub fn encode_frame(
57        &mut self,
58        world: &mut World,
59        tick: codec::SnapshotTick,
60        baseline_tick: codec::SnapshotTick,
61        out: &mut [u8],
62    ) -> Result<usize> {
63        extract_changes_with_scratch(
64            &self.schema,
65            world,
66            &mut self.entities,
67            &mut self.extract_scratch,
68            &mut self.change_set,
69        );
70        let mut encoder = SessionEncoder::new(self.schema.schema(), &self.limits);
71        let start = Instant::now();
72        let bytes = encode_delta_from_changes(
73            &mut encoder,
74            tick,
75            baseline_tick,
76            &self.change_set.creates,
77            &self.change_set.destroys,
78            &self.change_set.updates,
79            out,
80        )?;
81        if let Some(metrics) = self.metrics.as_mut() {
82            metrics.record_encode(EncodeMetrics {
83                bytes,
84                encode_time: start.elapsed(),
85            });
86        }
87        Ok(bytes)
88    }
89
90    pub fn apply_frame(&mut self, world: &mut World, bytes: &[u8]) -> Result<()> {
91        let packet = decode_packet(bytes, &self.wire_limits)?;
92        let decoded = codec::decode_delta_packet(self.schema.schema(), &packet, &self.limits)?;
93        apply_changes(
94            &self.schema,
95            world,
96            &mut self.entities,
97            &decoded.creates,
98            &decoded.destroys,
99            &decoded.updates,
100        )?;
101        Ok(())
102    }
103
104    pub fn apply_compact_frame(&mut self, world: &mut World, bytes: &[u8]) -> Result<()> {
105        let Some(session) = self.session.as_mut() else {
106            return Err(anyhow!("session state missing; call update_session first"));
107        };
108        let packet =
109            decode_session_packet(self.schema.schema(), session, bytes, &self.wire_limits)?;
110        let decoded = codec::decode_delta_packet(self.schema.schema(), &packet, &self.limits)?;
111        apply_changes(
112            &self.schema,
113            world,
114            &mut self.entities,
115            &decoded.creates,
116            &decoded.destroys,
117            &decoded.updates,
118        )?;
119        Ok(())
120    }
121}