1use std::time::Instant;
2
3use anyhow::{anyhow, Result};
4use bevy_ecs::prelude::World;
5use codec::{
6 decode_session_packet, encode_delta_from_changes, CodecLimits, SessionEncoder, SessionState,
7};
8use wire::{decode_packet, Limits as WireLimits};
9
10use crate::apply::apply_changes;
11use crate::extract::{extract_changes_with_scratch, BevyChangeSet, ExtractScratch};
12use crate::mapping::EntityMap;
13use crate::metrics::{EncodeMetrics, MetricsSink};
14use crate::schema::BevySchema;
15
16pub struct BevyReplicator {
17 schema: BevySchema,
18 limits: CodecLimits,
19 wire_limits: WireLimits,
20 entities: EntityMap,
21 session: Option<SessionState>,
22 metrics: Option<Box<dyn MetricsSink>>,
23 change_set: BevyChangeSet,
24 extract_scratch: ExtractScratch,
25}
26
27impl BevyReplicator {
28 #[must_use]
29 pub fn new(schema: BevySchema) -> Self {
30 Self {
31 schema,
32 limits: CodecLimits::default(),
33 wire_limits: WireLimits::default(),
34 entities: EntityMap::new(),
35 session: None,
36 metrics: None,
37 change_set: BevyChangeSet::default(),
38 extract_scratch: ExtractScratch::default(),
39 }
40 }
41
42 pub fn with_limits(mut self, limits: CodecLimits, wire_limits: WireLimits) -> Self {
43 self.limits = limits;
44 self.wire_limits = wire_limits;
45 self
46 }
47
48 pub fn set_metrics_sink(&mut self, sink: Box<dyn MetricsSink>) {
49 self.metrics = Some(sink);
50 }
51
52 pub fn update_session(&mut self, session: SessionState) {
53 self.session = Some(session);
54 }
55
56 pub fn encode_frame(
57 &mut self,
58 world: &mut World,
59 tick: codec::SnapshotTick,
60 baseline_tick: codec::SnapshotTick,
61 out: &mut [u8],
62 ) -> Result<usize> {
63 extract_changes_with_scratch(
64 &self.schema,
65 world,
66 &mut self.entities,
67 &mut self.extract_scratch,
68 &mut self.change_set,
69 );
70 let mut encoder = SessionEncoder::new(self.schema.schema(), &self.limits);
71 let start = Instant::now();
72 let bytes = encode_delta_from_changes(
73 &mut encoder,
74 tick,
75 baseline_tick,
76 &self.change_set.creates,
77 &self.change_set.destroys,
78 &self.change_set.updates,
79 out,
80 )?;
81 if let Some(metrics) = self.metrics.as_mut() {
82 metrics.record_encode(EncodeMetrics {
83 bytes,
84 encode_time: start.elapsed(),
85 });
86 }
87 Ok(bytes)
88 }
89
90 pub fn apply_frame(&mut self, world: &mut World, bytes: &[u8]) -> Result<()> {
91 let packet = decode_packet(bytes, &self.wire_limits)?;
92 let decoded = codec::decode_delta_packet(self.schema.schema(), &packet, &self.limits)?;
93 apply_changes(
94 &self.schema,
95 world,
96 &mut self.entities,
97 &decoded.creates,
98 &decoded.destroys,
99 &decoded.updates,
100 )?;
101 Ok(())
102 }
103
104 pub fn apply_compact_frame(&mut self, world: &mut World, bytes: &[u8]) -> Result<()> {
105 let Some(session) = self.session.as_mut() else {
106 return Err(anyhow!("session state missing; call update_session first"));
107 };
108 let packet =
109 decode_session_packet(self.schema.schema(), session, bytes, &self.wire_limits)?;
110 let decoded = codec::decode_delta_packet(self.schema.schema(), &packet, &self.limits)?;
111 apply_changes(
112 &self.schema,
113 world,
114 &mut self.entities,
115 &decoded.creates,
116 &decoded.destroys,
117 &decoded.updates,
118 )?;
119 Ok(())
120 }
121}