tx2_link/
sync.rs

1use crate::error::{LinkError, Result};
2use crate::protocol::*;
3use crate::serialization::{WorldSnapshot, Delta};
4use crate::transport::Transport;
5use crate::compression::DeltaCompressor;
6use crate::rate_limit::{RateLimiter, RateLimitConfig};
7use crate::schema::{SchemaRegistry, SchemaVersion};
8use std::time::{Duration, Instant};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum SyncMode {
12    Full,
13    Delta,
14    Manual,
15}
16
17#[derive(Debug, Clone)]
18pub struct SyncConfig {
19    pub mode: SyncMode,
20    pub sync_interval: Duration,
21    pub enable_rate_limiting: bool,
22    pub rate_limit_config: RateLimitConfig,
23    pub enable_field_compression: bool,
24    pub auto_reconnect: bool,
25    pub max_reconnect_attempts: u32,
26    pub reconnect_delay: Duration,
27}
28
29impl Default for SyncConfig {
30    fn default() -> Self {
31        Self {
32            mode: SyncMode::Delta,
33            sync_interval: Duration::from_millis(100),
34            enable_rate_limiting: true,
35            rate_limit_config: RateLimitConfig::default(),
36            enable_field_compression: true,
37            auto_reconnect: false,
38            max_reconnect_attempts: 3,
39            reconnect_delay: Duration::from_secs(1),
40        }
41    }
42}
43
44impl SyncConfig {
45    pub fn new() -> Self {
46        Self::default()
47    }
48
49    pub fn with_mode(mut self, mode: SyncMode) -> Self {
50        self.mode = mode;
51        self
52    }
53
54    pub fn with_sync_interval(mut self, interval: Duration) -> Self {
55        self.sync_interval = interval;
56        self
57    }
58
59    pub fn with_rate_limiting(mut self, enabled: bool) -> Self {
60        self.enable_rate_limiting = enabled;
61        self
62    }
63
64    pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
65        self.rate_limit_config = config;
66        self
67    }
68
69    pub fn with_field_compression(mut self, enabled: bool) -> Self {
70        self.enable_field_compression = enabled;
71        self
72    }
73
74    pub fn with_auto_reconnect(mut self, enabled: bool, max_attempts: u32) -> Self {
75        self.auto_reconnect = enabled;
76        self.max_reconnect_attempts = max_attempts;
77        self
78    }
79}
80
81pub struct SyncManager<T: Transport> {
82    transport: T,
83    config: SyncConfig,
84    delta_compressor: DeltaCompressor,
85    rate_limiter: Option<RateLimiter>,
86    schema_registry: SchemaRegistry,
87    last_sync: Option<Instant>,
88    sync_count: u64,
89    error_count: u64,
90    reconnect_attempts: u32,
91    schema_version: SchemaVersion,
92}
93
94impl<T: Transport> SyncManager<T> {
95    pub fn new(transport: T, config: SyncConfig) -> Self {
96        let delta_compressor = DeltaCompressor::with_field_compression(config.enable_field_compression);
97        let rate_limiter = if config.enable_rate_limiting {
98            Some(RateLimiter::new(config.rate_limit_config.clone()))
99        } else {
100            None
101        };
102
103        Self {
104            transport,
105            config,
106            delta_compressor,
107            rate_limiter,
108            schema_registry: SchemaRegistry::new(),
109            last_sync: None,
110            sync_count: 0,
111            error_count: 0,
112            reconnect_attempts: 0,
113            schema_version: 1,
114        }
115    }
116
117    pub fn send_snapshot(&mut self, snapshot: WorldSnapshot) -> Result<()> {
118        if !self.transport.is_connected() {
119            if self.config.auto_reconnect && self.reconnect_attempts < self.config.max_reconnect_attempts {
120                self.reconnect_attempts += 1;
121                return Err(LinkError::ConnectionClosed);
122            } else {
123                return Err(LinkError::ConnectionClosed);
124            }
125        }
126
127        let schema_version = self.schema_version;
128        let message = Message::snapshot(
129            snapshot.entities,
130            snapshot.timestamp,
131            schema_version,
132        );
133
134        let estimated_size = 1024u64;
135        if let Some(limiter) = &mut self.rate_limiter {
136            limiter.check_and_record(estimated_size)?;
137        }
138
139        self.transport.send(&message)?;
140
141        self.last_sync = Some(Instant::now());
142        self.sync_count += 1;
143        self.reconnect_attempts = 0;
144
145        Ok(())
146    }
147
148    pub fn send_delta(&mut self, snapshot: WorldSnapshot) -> Result<()> {
149        if !self.transport.is_connected() {
150            if self.config.auto_reconnect && self.reconnect_attempts < self.config.max_reconnect_attempts {
151                self.reconnect_attempts += 1;
152                return Err(LinkError::ConnectionClosed);
153            } else {
154                return Err(LinkError::ConnectionClosed);
155            }
156        }
157
158        let delta = self.delta_compressor.create_delta(snapshot);
159
160        if delta.changes.is_empty() {
161            return Ok(());
162        }
163
164        let base_timestamp = (delta.base_timestamp * 1000.0) as u64;
165        let schema_version = self.schema_version;
166        let message = Message::delta(delta.changes, base_timestamp, schema_version);
167
168        let estimated_size = 1024u64;
169        if let Some(limiter) = &mut self.rate_limiter {
170            limiter.check_and_record(estimated_size)?;
171        }
172
173        self.transport.send(&message)?;
174
175        self.last_sync = Some(Instant::now());
176        self.sync_count += 1;
177        self.reconnect_attempts = 0;
178
179        Ok(())
180    }
181
182    pub fn send(&mut self, snapshot: WorldSnapshot) -> Result<()> {
183        match self.config.mode {
184            SyncMode::Full => self.send_snapshot(snapshot),
185            SyncMode::Delta => self.send_delta(snapshot),
186            SyncMode::Manual => Ok(()),
187        }
188    }
189
190    pub fn receive(&mut self) -> Result<Option<SyncEvent>> {
191        if !self.transport.is_connected() {
192            return Err(LinkError::ConnectionClosed);
193        }
194
195        match self.transport.receive()? {
196            Some(message) => {
197                let event = self.process_message(message)?;
198                Ok(Some(event))
199            }
200            None => Ok(None),
201        }
202    }
203
204    fn process_message(&mut self, message: Message) -> Result<SyncEvent> {
205        match message.payload {
206            MessagePayload::Snapshot(payload) => {
207                let snapshot = WorldSnapshot {
208                    entities: payload.entities,
209                    timestamp: payload.metadata.world_time,
210                    version: "1.0.0".to_string(),
211                };
212
213                self.delta_compressor.reset();
214
215                Ok(SyncEvent::Snapshot(snapshot))
216            }
217            MessagePayload::Delta(payload) => {
218                let delta = Delta {
219                    changes: payload.changes,
220                    timestamp: message.header.timestamp as f64 / 1000.0,
221                    base_timestamp: payload.base_timestamp as f64 / 1000.0,
222                };
223
224                Ok(SyncEvent::Delta(delta))
225            }
226            MessagePayload::RequestSnapshot => {
227                Ok(SyncEvent::SnapshotRequested)
228            }
229            MessagePayload::Ack { ack_id } => {
230                Ok(SyncEvent::Ack(ack_id))
231            }
232            MessagePayload::Ping => {
233                let pong = Message::pong(self.schema_version);
234                self.transport.send(&pong)?;
235                Ok(SyncEvent::Ping)
236            }
237            MessagePayload::Pong => {
238                Ok(SyncEvent::Pong)
239            }
240            MessagePayload::SchemaSync(payload) => {
241                Ok(SyncEvent::SchemaSync(payload.schemas))
242            }
243            MessagePayload::Error { code, message: error_message } => {
244                self.error_count += 1;
245                Ok(SyncEvent::Error { code, message: error_message })
246            }
247        }
248    }
249
250    pub fn request_snapshot(&mut self) -> Result<()> {
251        let message = Message::request_snapshot(self.schema_version);
252        self.transport.send(&message)?;
253        Ok(())
254    }
255
256    pub fn send_ack(&mut self, message_id: u64) -> Result<()> {
257        let message = Message::ack(message_id, self.schema_version);
258        self.transport.send(&message)?;
259        Ok(())
260    }
261
262    pub fn ping(&mut self) -> Result<()> {
263        let message = Message::ping(self.schema_version);
264        self.transport.send(&message)?;
265        Ok(())
266    }
267
268    pub fn should_sync(&self) -> bool {
269        if self.config.mode == SyncMode::Manual {
270            return false;
271        }
272
273        if let Some(last_sync) = self.last_sync {
274            last_sync.elapsed() >= self.config.sync_interval
275        } else {
276            true
277        }
278    }
279
280    pub fn get_stats(&self) -> SyncStats {
281        let rate_limiter_stats = self.rate_limiter.as_ref().map(|l| l.get_stats());
282
283        SyncStats {
284            sync_count: self.sync_count,
285            error_count: self.error_count,
286            last_sync: self.last_sync,
287            rate_limiter_stats,
288            reconnect_attempts: self.reconnect_attempts,
289        }
290    }
291
292    pub fn get_schema_registry(&self) -> &SchemaRegistry {
293        &self.schema_registry
294    }
295
296    pub fn get_schema_registry_mut(&mut self) -> &mut SchemaRegistry {
297        &mut self.schema_registry
298    }
299
300    pub fn set_schema_version(&mut self, version: SchemaVersion) {
301        self.schema_version = version;
302    }
303
304    pub fn get_schema_version(&self) -> SchemaVersion {
305        self.schema_version
306    }
307
308    pub fn reset_delta_compressor(&mut self) {
309        self.delta_compressor.reset();
310    }
311
312    pub fn is_connected(&self) -> bool {
313        self.transport.is_connected()
314    }
315
316    pub fn close(&mut self) -> Result<()> {
317        self.transport.close()
318    }
319
320    fn estimate_message_size(&self, _message: &Message) -> u64 {
321        1024
322    }
323}
324
325#[derive(Debug, Clone)]
326pub struct SyncStats {
327    pub sync_count: u64,
328    pub error_count: u64,
329    pub last_sync: Option<Instant>,
330    pub rate_limiter_stats: Option<crate::rate_limit::RateLimitStats>,
331    pub reconnect_attempts: u32,
332}
333
334#[derive(Debug)]
335pub enum SyncEvent {
336    Snapshot(WorldSnapshot),
337    Delta(Delta),
338    SnapshotRequested,
339    Ack(u64),
340    Ping,
341    Pong,
342    SchemaSync(Vec<ComponentSchemaInfo>),
343    Error { code: u32, message: String },
344}
345
346#[cfg(test)]
347mod tests {
348    use super::*;
349    use crate::transport::MemoryTransport;
350    use crate::serialization::BinaryFormat;
351
352    #[test]
353    fn test_sync_manager_snapshot() {
354        let transport = MemoryTransport::new(BinaryFormat::MessagePack);
355        let config = SyncConfig::new().with_mode(SyncMode::Full);
356        let mut manager = SyncManager::new(transport, config);
357
358        let snapshot = WorldSnapshot {
359            entities: vec![],
360            timestamp: 100.0,
361            version: "1.0.0".to_string(),
362        };
363
364        assert!(manager.send_snapshot(snapshot).is_ok());
365        assert_eq!(manager.get_stats().sync_count, 1);
366    }
367
368    #[test]
369    fn test_sync_manager_delta() {
370        use crate::protocol::{SerializedEntity, SerializedComponent, ComponentData};
371
372        let transport = MemoryTransport::new(BinaryFormat::MessagePack);
373        let config = SyncConfig::new().with_mode(SyncMode::Delta);
374        let mut manager = SyncManager::new(transport, config);
375
376        let snapshot1 = WorldSnapshot {
377            entities: vec![],
378            timestamp: 100.0,
379            version: "1.0.0".to_string(),
380        };
381
382        assert!(manager.send_delta(snapshot1).is_ok());
383
384        let snapshot2 = WorldSnapshot {
385            entities: vec![
386                SerializedEntity {
387                    id: 1,
388                    components: vec![
389                        SerializedComponent {
390                            id: "Position".to_string(),
391                            data: ComponentData::from_json_value(serde_json::json!({"x": 10.0})),
392                        }
393                    ],
394                }
395            ],
396            timestamp: 200.0,
397            version: "1.0.0".to_string(),
398        };
399
400        assert!(manager.send_delta(snapshot2).is_ok());
401        assert_eq!(manager.get_stats().sync_count, 1);
402    }
403
404    #[test]
405    fn test_sync_manager_rate_limiting() {
406        let transport = MemoryTransport::new(BinaryFormat::MessagePack);
407        let rate_config = RateLimitConfig::new().with_max_messages(2);
408        let config = SyncConfig::new()
409            .with_mode(SyncMode::Full)
410            .with_rate_limiting(true)
411            .with_rate_limit_config(rate_config);
412
413        let mut manager = SyncManager::new(transport, config);
414
415        let snapshot = WorldSnapshot {
416            entities: vec![],
417            timestamp: 100.0,
418            version: "1.0.0".to_string(),
419        };
420
421        assert!(manager.send_snapshot(snapshot.clone()).is_ok());
422        assert!(manager.send_snapshot(snapshot.clone()).is_ok());
423        assert!(manager.send_snapshot(snapshot).is_err());
424    }
425}