1use crate::error::{LinkError, Result};
2use crate::protocol::*;
3use crate::serialization::{WorldSnapshot, Delta};
4use crate::transport::Transport;
5use crate::compression::DeltaCompressor;
6use crate::rate_limit::{RateLimiter, RateLimitConfig};
7use crate::schema::{SchemaRegistry, SchemaVersion};
8use std::time::{Duration, Instant};
9
/// Selects what `SyncManager::send` puts on the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncMode {
    /// `send` forwards to `send_snapshot`, transmitting the whole snapshot.
    Full,
    /// `send` forwards to `send_delta`, transmitting only the computed changes.
    Delta,
    /// `send` is a no-op that returns `Ok(())`, and `should_sync` always
    /// reports `false`; the caller invokes the specific send methods itself.
    Manual,
}
16
17#[derive(Debug, Clone)]
18pub struct SyncConfig {
19 pub mode: SyncMode,
20 pub sync_interval: Duration,
21 pub enable_rate_limiting: bool,
22 pub rate_limit_config: RateLimitConfig,
23 pub enable_field_compression: bool,
24 pub auto_reconnect: bool,
25 pub max_reconnect_attempts: u32,
26 pub reconnect_delay: Duration,
27}
28
29impl Default for SyncConfig {
30 fn default() -> Self {
31 Self {
32 mode: SyncMode::Delta,
33 sync_interval: Duration::from_millis(100),
34 enable_rate_limiting: true,
35 rate_limit_config: RateLimitConfig::default(),
36 enable_field_compression: true,
37 auto_reconnect: false,
38 max_reconnect_attempts: 3,
39 reconnect_delay: Duration::from_secs(1),
40 }
41 }
42}
43
44impl SyncConfig {
45 pub fn new() -> Self {
46 Self::default()
47 }
48
49 pub fn with_mode(mut self, mode: SyncMode) -> Self {
50 self.mode = mode;
51 self
52 }
53
54 pub fn with_sync_interval(mut self, interval: Duration) -> Self {
55 self.sync_interval = interval;
56 self
57 }
58
59 pub fn with_rate_limiting(mut self, enabled: bool) -> Self {
60 self.enable_rate_limiting = enabled;
61 self
62 }
63
64 pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
65 self.rate_limit_config = config;
66 self
67 }
68
69 pub fn with_field_compression(mut self, enabled: bool) -> Self {
70 self.enable_field_compression = enabled;
71 self
72 }
73
74 pub fn with_auto_reconnect(mut self, enabled: bool, max_attempts: u32) -> Self {
75 self.auto_reconnect = enabled;
76 self.max_reconnect_attempts = max_attempts;
77 self
78 }
79}
80
81pub struct SyncManager<T: Transport> {
82 transport: T,
83 config: SyncConfig,
84 delta_compressor: DeltaCompressor,
85 rate_limiter: Option<RateLimiter>,
86 schema_registry: SchemaRegistry,
87 last_sync: Option<Instant>,
88 sync_count: u64,
89 error_count: u64,
90 reconnect_attempts: u32,
91 schema_version: SchemaVersion,
92}
93
94impl<T: Transport> SyncManager<T> {
95 pub fn new(transport: T, config: SyncConfig) -> Self {
96 let delta_compressor = DeltaCompressor::with_field_compression(config.enable_field_compression);
97 let rate_limiter = if config.enable_rate_limiting {
98 Some(RateLimiter::new(config.rate_limit_config.clone()))
99 } else {
100 None
101 };
102
103 Self {
104 transport,
105 config,
106 delta_compressor,
107 rate_limiter,
108 schema_registry: SchemaRegistry::new(),
109 last_sync: None,
110 sync_count: 0,
111 error_count: 0,
112 reconnect_attempts: 0,
113 schema_version: 1,
114 }
115 }
116
117 pub fn send_snapshot(&mut self, snapshot: WorldSnapshot) -> Result<()> {
118 if !self.transport.is_connected() {
119 if self.config.auto_reconnect && self.reconnect_attempts < self.config.max_reconnect_attempts {
120 self.reconnect_attempts += 1;
121 return Err(LinkError::ConnectionClosed);
122 } else {
123 return Err(LinkError::ConnectionClosed);
124 }
125 }
126
127 let schema_version = self.schema_version;
128 let message = Message::snapshot(
129 snapshot.entities,
130 snapshot.timestamp,
131 schema_version,
132 );
133
134 let estimated_size = 1024u64;
135 if let Some(limiter) = &mut self.rate_limiter {
136 limiter.check_and_record(estimated_size)?;
137 }
138
139 self.transport.send(&message)?;
140
141 self.last_sync = Some(Instant::now());
142 self.sync_count += 1;
143 self.reconnect_attempts = 0;
144
145 Ok(())
146 }
147
148 pub fn send_delta(&mut self, snapshot: WorldSnapshot) -> Result<()> {
149 if !self.transport.is_connected() {
150 if self.config.auto_reconnect && self.reconnect_attempts < self.config.max_reconnect_attempts {
151 self.reconnect_attempts += 1;
152 return Err(LinkError::ConnectionClosed);
153 } else {
154 return Err(LinkError::ConnectionClosed);
155 }
156 }
157
158 let delta = self.delta_compressor.create_delta(snapshot);
159
160 if delta.changes.is_empty() {
161 return Ok(());
162 }
163
164 let base_timestamp = (delta.base_timestamp * 1000.0) as u64;
165 let schema_version = self.schema_version;
166 let message = Message::delta(delta.changes, base_timestamp, schema_version);
167
168 let estimated_size = 1024u64;
169 if let Some(limiter) = &mut self.rate_limiter {
170 limiter.check_and_record(estimated_size)?;
171 }
172
173 self.transport.send(&message)?;
174
175 self.last_sync = Some(Instant::now());
176 self.sync_count += 1;
177 self.reconnect_attempts = 0;
178
179 Ok(())
180 }
181
182 pub fn send(&mut self, snapshot: WorldSnapshot) -> Result<()> {
183 match self.config.mode {
184 SyncMode::Full => self.send_snapshot(snapshot),
185 SyncMode::Delta => self.send_delta(snapshot),
186 SyncMode::Manual => Ok(()),
187 }
188 }
189
190 pub fn receive(&mut self) -> Result<Option<SyncEvent>> {
191 if !self.transport.is_connected() {
192 return Err(LinkError::ConnectionClosed);
193 }
194
195 match self.transport.receive()? {
196 Some(message) => {
197 let event = self.process_message(message)?;
198 Ok(Some(event))
199 }
200 None => Ok(None),
201 }
202 }
203
204 fn process_message(&mut self, message: Message) -> Result<SyncEvent> {
205 match message.payload {
206 MessagePayload::Snapshot(payload) => {
207 let snapshot = WorldSnapshot {
208 entities: payload.entities,
209 timestamp: payload.metadata.world_time,
210 version: "1.0.0".to_string(),
211 };
212
213 self.delta_compressor.reset();
214
215 Ok(SyncEvent::Snapshot(snapshot))
216 }
217 MessagePayload::Delta(payload) => {
218 let delta = Delta {
219 changes: payload.changes,
220 timestamp: message.header.timestamp as f64 / 1000.0,
221 base_timestamp: payload.base_timestamp as f64 / 1000.0,
222 };
223
224 Ok(SyncEvent::Delta(delta))
225 }
226 MessagePayload::RequestSnapshot => {
227 Ok(SyncEvent::SnapshotRequested)
228 }
229 MessagePayload::Ack { ack_id } => {
230 Ok(SyncEvent::Ack(ack_id))
231 }
232 MessagePayload::Ping => {
233 let pong = Message::pong(self.schema_version);
234 self.transport.send(&pong)?;
235 Ok(SyncEvent::Ping)
236 }
237 MessagePayload::Pong => {
238 Ok(SyncEvent::Pong)
239 }
240 MessagePayload::SchemaSync(payload) => {
241 Ok(SyncEvent::SchemaSync(payload.schemas))
242 }
243 MessagePayload::Error { code, message: error_message } => {
244 self.error_count += 1;
245 Ok(SyncEvent::Error { code, message: error_message })
246 }
247 }
248 }
249
250 pub fn request_snapshot(&mut self) -> Result<()> {
251 let message = Message::request_snapshot(self.schema_version);
252 self.transport.send(&message)?;
253 Ok(())
254 }
255
256 pub fn send_ack(&mut self, message_id: u64) -> Result<()> {
257 let message = Message::ack(message_id, self.schema_version);
258 self.transport.send(&message)?;
259 Ok(())
260 }
261
262 pub fn ping(&mut self) -> Result<()> {
263 let message = Message::ping(self.schema_version);
264 self.transport.send(&message)?;
265 Ok(())
266 }
267
268 pub fn should_sync(&self) -> bool {
269 if self.config.mode == SyncMode::Manual {
270 return false;
271 }
272
273 if let Some(last_sync) = self.last_sync {
274 last_sync.elapsed() >= self.config.sync_interval
275 } else {
276 true
277 }
278 }
279
280 pub fn get_stats(&self) -> SyncStats {
281 let rate_limiter_stats = self.rate_limiter.as_ref().map(|l| l.get_stats());
282
283 SyncStats {
284 sync_count: self.sync_count,
285 error_count: self.error_count,
286 last_sync: self.last_sync,
287 rate_limiter_stats,
288 reconnect_attempts: self.reconnect_attempts,
289 }
290 }
291
292 pub fn get_schema_registry(&self) -> &SchemaRegistry {
293 &self.schema_registry
294 }
295
296 pub fn get_schema_registry_mut(&mut self) -> &mut SchemaRegistry {
297 &mut self.schema_registry
298 }
299
300 pub fn set_schema_version(&mut self, version: SchemaVersion) {
301 self.schema_version = version;
302 }
303
304 pub fn get_schema_version(&self) -> SchemaVersion {
305 self.schema_version
306 }
307
308 pub fn reset_delta_compressor(&mut self) {
309 self.delta_compressor.reset();
310 }
311
312 pub fn is_connected(&self) -> bool {
313 self.transport.is_connected()
314 }
315
316 pub fn close(&mut self) -> Result<()> {
317 self.transport.close()
318 }
319
320 fn estimate_message_size(&self, _message: &Message) -> u64 {
321 1024
322 }
323}
324
325#[derive(Debug, Clone)]
326pub struct SyncStats {
327 pub sync_count: u64,
328 pub error_count: u64,
329 pub last_sync: Option<Instant>,
330 pub rate_limiter_stats: Option<crate::rate_limit::RateLimitStats>,
331 pub reconnect_attempts: u32,
332}
333
334#[derive(Debug)]
335pub enum SyncEvent {
336 Snapshot(WorldSnapshot),
337 Delta(Delta),
338 SnapshotRequested,
339 Ack(u64),
340 Ping,
341 Pong,
342 SchemaSync(Vec<ComponentSchemaInfo>),
343 Error { code: u32, message: String },
344}
345
#[cfg(test)]
mod tests {
    use super::*;
    use crate::serialization::BinaryFormat;
    use crate::transport::MemoryTransport;

    /// Snapshot with no entities, stamped at t = 100.0.
    fn empty_snapshot() -> WorldSnapshot {
        WorldSnapshot {
            entities: vec![],
            timestamp: 100.0,
            version: "1.0.0".to_string(),
        }
    }

    /// A full snapshot send succeeds and is counted once.
    #[test]
    fn test_sync_manager_snapshot() {
        let mut manager = SyncManager::new(
            MemoryTransport::new(BinaryFormat::MessagePack),
            SyncConfig::new().with_mode(SyncMode::Full),
        );

        assert!(manager.send_snapshot(empty_snapshot()).is_ok());
        assert_eq!(manager.get_stats().sync_count, 1);
    }

    /// An empty first delta is not counted; the delta that adds an entity is.
    #[test]
    fn test_sync_manager_delta() {
        use crate::protocol::{ComponentData, SerializedComponent, SerializedEntity};

        let mut manager = SyncManager::new(
            MemoryTransport::new(BinaryFormat::MessagePack),
            SyncConfig::new().with_mode(SyncMode::Delta),
        );

        assert!(manager.send_delta(empty_snapshot()).is_ok());

        let position = SerializedComponent {
            id: "Position".to_string(),
            data: ComponentData::from_json_value(serde_json::json!({"x": 10.0})),
        };
        let entity = SerializedEntity {
            id: 1,
            components: vec![position],
        };
        let populated = WorldSnapshot {
            entities: vec![entity],
            timestamp: 200.0,
            version: "1.0.0".to_string(),
        };

        assert!(manager.send_delta(populated).is_ok());
        assert_eq!(manager.get_stats().sync_count, 1);
    }

    /// With a limit of two messages, the third send is rejected.
    #[test]
    fn test_sync_manager_rate_limiting() {
        let limits = RateLimitConfig::new().with_max_messages(2);
        let config = SyncConfig::new()
            .with_mode(SyncMode::Full)
            .with_rate_limiting(true)
            .with_rate_limit_config(limits);
        let mut manager =
            SyncManager::new(MemoryTransport::new(BinaryFormat::MessagePack), config);

        assert!(manager.send_snapshot(empty_snapshot()).is_ok());
        assert!(manager.send_snapshot(empty_snapshot()).is_ok());
        assert!(manager.send_snapshot(empty_snapshot()).is_err());
    }
}