1use super::{PeerInfo, PeerSyncStatus, SyncEngine, SyncEngineError, SyncState};
4use crate::{
5 crdt::{LwwMap, LwwRegister, Mergeable, ReplicaId},
6 storage::{LocalStorage, StorageError},
7 transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13use thiserror::Error;
14use tokio::sync::{mpsc, RwLock};
15use tokio::time::{interval, sleep};
16
17#[derive(Error, Debug)]
18pub enum EndToEndSyncError {
19 #[error("Storage error: {0}")]
20 Storage(#[from] StorageError),
21 #[error("Transport error: {0}")]
22 Transport(#[from] TransportError),
23 #[error("Sync engine error: {0}")]
24 SyncEngine(#[from] SyncEngineError),
25 #[error("Serialization error: {0}")]
26 Serialization(#[from] serde_json::Error),
27 #[error("Sync operation failed: {0}")]
28 SyncFailed(String),
29 #[error("Peer not found: {0}")]
30 PeerNotFound(String),
31 #[error("Collection not found: {0}")]
32 CollectionNotFound(String),
33}
34
35#[derive(Debug, Clone, Serialize, Deserialize)]
37pub enum SyncMessage {
38 SyncRequest {
40 collection_id: String,
41 replica_id: ReplicaId,
42 data: Vec<u8>,
43 timestamp: u64,
44 },
45 SyncResponse {
47 collection_id: String,
48 replica_id: ReplicaId,
49 data: Vec<u8>,
50 timestamp: u64,
51 },
52 Presence {
54 replica_id: ReplicaId,
55 status: String,
56 timestamp: u64,
57 },
58 Heartbeat {
60 replica_id: ReplicaId,
61 timestamp: u64,
62 },
63 Ack {
65 message_id: String,
66 replica_id: ReplicaId,
67 timestamp: u64,
68 },
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct CollectionMetadata {
74 pub id: String,
75 pub name: String,
76 pub crdt_type: String,
77 pub version: u32,
78 pub last_sync: u64,
79 pub replica_count: u32,
80}
81
82pub struct EndToEndSyncManager<S, T>
84where
85 S: LocalStorage + Send + Sync + 'static,
86 T: SyncTransport + Send + Sync + 'static,
87{
88 replica_id: ReplicaId,
89 storage: Arc<S>,
90 transport: Arc<T>,
91 collections: Arc<RwLock<HashMap<String, CollectionMetadata>>>,
92 peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
93 sync_state: Arc<RwLock<SyncState>>,
94 message_sender: mpsc::UnboundedSender<SyncMessage>,
95 message_receiver: Arc<RwLock<mpsc::UnboundedReceiver<SyncMessage>>>,
96 sync_interval: Duration,
97 heartbeat_interval: Duration,
98 is_running: Arc<RwLock<bool>>,
99}
100
101impl<S, T> EndToEndSyncManager<S, T>
102where
103 S: LocalStorage + Send + Sync + 'static,
104 T: SyncTransport + Send + Sync + 'static,
105 EndToEndSyncError: From<<T as SyncTransport>::Error>,
106{
107 pub fn new(
109 replica_id: ReplicaId,
110 storage: Arc<S>,
111 transport: Arc<T>,
112 sync_interval: Duration,
113 heartbeat_interval: Duration,
114 ) -> Self {
115 let (tx, rx) = mpsc::unbounded_channel();
116
117 Self {
118 replica_id,
119 storage,
120 transport,
121 collections: Arc::new(RwLock::new(HashMap::new())),
122 peers: Arc::new(RwLock::new(HashMap::new())),
123 sync_state: Arc::new(RwLock::new(SyncState::Disconnected)),
124 message_sender: tx,
125 message_receiver: Arc::new(RwLock::new(rx)),
126 sync_interval,
127 heartbeat_interval,
128 is_running: Arc::new(RwLock::new(false)),
129 }
130 }
131
132 pub async fn start(&self) -> Result<(), EndToEndSyncError> {
134 let mut is_running = self.is_running.write().await;
135 if *is_running {
136 return Ok(());
137 }
138 *is_running = true;
139 drop(is_running);
140
141 if !self.transport.is_connected() {
143 }
146
147 {
149 let mut state = self.sync_state.write().await;
150 *state = SyncState::Connected;
151 }
152
153 self.start_background_tasks().await;
155
156 Ok(())
157 }
158
159 pub async fn stop(&self) -> Result<(), EndToEndSyncError> {
161 let mut is_running = self.is_running.write().await;
162 *is_running = false;
163 drop(is_running);
164
165 {
167 let mut state = self.sync_state.write().await;
168 *state = SyncState::Disconnected;
169 }
170
171 Ok(())
172 }
173
174 async fn start_background_tasks(&self) {
176 let sync_manager = self.clone_for_background();
177 let heartbeat_manager = self.clone_for_background();
178 let message_handler = self.clone_for_background();
179
180 tokio::spawn(async move {
182 sync_manager.sync_task().await;
183 });
184
185 tokio::spawn(async move {
187 heartbeat_manager.heartbeat_task().await;
188 });
189
190 tokio::spawn(async move {
192 message_handler.message_handler_task().await;
193 });
194 }
195
196 async fn sync_task(&self) {
198 let mut interval = interval(self.sync_interval);
199
200 loop {
201 interval.tick().await;
202
203 let is_running = *self.is_running.read().await;
204 if !is_running {
205 break;
206 }
207
208 if let Err(e) = self.perform_sync().await {
209 tracing::error!("Sync task error: {:?}", e);
210 }
211 }
212 }
213
214 async fn heartbeat_task(&self) {
216 let mut interval = interval(self.heartbeat_interval);
217
218 loop {
219 interval.tick().await;
220
221 let is_running = *self.is_running.read().await;
222 if !is_running {
223 break;
224 }
225
226 if let Err(e) = self.send_heartbeat().await {
227 tracing::error!("Heartbeat task error: {:?}", e);
228 }
229 }
230 }
231
232 async fn message_handler_task(&self) {
234 let mut receiver = self.message_receiver.write().await;
235
236 while let Some(message) = receiver.recv().await {
237 if let Err(e) = self.handle_message(message).await {
238 tracing::error!("Message handler error: {:?}", e);
239 }
240 }
241 }
242
243 async fn perform_sync(&self) -> Result<(), EndToEndSyncError> {
245 let collections = self.collections.read().await;
246 let peers = self.peers.read().await;
247
248 for (collection_id, metadata) in collections.iter() {
249 for (peer_id, peer_info) in peers.iter() {
250 if peer_info.sync_status == PeerSyncStatus::Connected {
251 if let Err(e) = self.sync_with_peer(collection_id, peer_id).await {
252 tracing::warn!(
253 "Failed to sync collection {} with peer {}: {:?}",
254 collection_id,
255 peer_id,
256 e
257 );
258 }
259 }
260 }
261 }
262
263 Ok(())
264 }
265
266 async fn sync_with_peer(
268 &self,
269 collection_id: &str,
270 peer_id: &ReplicaId,
271 ) -> Result<(), EndToEndSyncError> {
272 let local_data = self.storage.get::<Vec<u8>>(collection_id).await?;
274
275 if let Some(data) = local_data {
276 let message = SyncMessage::SyncRequest {
278 collection_id: collection_id.to_string(),
279 replica_id: self.replica_id.clone(),
280 data,
281 timestamp: SystemTime::now()
282 .duration_since(UNIX_EPOCH)
283 .unwrap_or_default()
284 .as_millis() as u64,
285 };
286
287 self.send_message(message).await?;
289 }
290
291 Ok(())
292 }
293
294 async fn send_heartbeat(&self) -> Result<(), EndToEndSyncError> {
296 let message = SyncMessage::Heartbeat {
297 replica_id: self.replica_id.clone(),
298 timestamp: SystemTime::now()
299 .duration_since(UNIX_EPOCH)
300 .unwrap_or_default()
301 .as_millis() as u64,
302 };
303
304 self.send_message(message).await
305 }
306
307 async fn send_message(&self, message: SyncMessage) -> Result<(), EndToEndSyncError> {
309 let serialized = serde_json::to_vec(&message)?;
310 self.transport.send(&serialized).await.map_err(EndToEndSyncError::from)?;
311 Ok(())
312 }
313
314 async fn handle_message(&self, message: SyncMessage) -> Result<(), EndToEndSyncError> {
316 match message {
317 SyncMessage::SyncRequest {
318 collection_id,
319 replica_id,
320 data,
321 timestamp,
322 } => {
323 self.handle_sync_request(collection_id, replica_id, data, timestamp)
324 .await
325 }
326 SyncMessage::SyncResponse {
327 collection_id,
328 replica_id,
329 data,
330 timestamp,
331 } => {
332 self.handle_sync_response(collection_id, replica_id, data, timestamp)
333 .await
334 }
335 SyncMessage::Presence {
336 replica_id,
337 status,
338 timestamp,
339 } => self.handle_presence(replica_id, status, timestamp).await,
340 SyncMessage::Heartbeat {
341 replica_id,
342 timestamp,
343 } => self.handle_heartbeat(replica_id, timestamp).await,
344 SyncMessage::Ack {
345 message_id,
346 replica_id,
347 timestamp,
348 } => self.handle_ack(message_id, replica_id, timestamp).await,
349 }
350 }
351
352 async fn handle_sync_request(
354 &self,
355 collection_id: String,
356 replica_id: ReplicaId,
357 data: Vec<u8>,
358 timestamp: u64,
359 ) -> Result<(), EndToEndSyncError> {
360 let local_data = self.storage.get::<Vec<u8>>(&collection_id).await?;
362
363 let merged_data = if let Some(local) = local_data {
365 if timestamp
367 > SystemTime::now()
368 .duration_since(UNIX_EPOCH)
369 .unwrap_or_default()
370 .as_millis() as u64
371 - 1000
372 {
373 data } else {
375 local }
377 } else {
378 data };
380
381 self.storage.set(&collection_id, &merged_data).await?;
383
384 let response = SyncMessage::SyncResponse {
386 collection_id,
387 replica_id: self.replica_id.clone(),
388 data: merged_data,
389 timestamp: SystemTime::now()
390 .duration_since(UNIX_EPOCH)
391 .unwrap_or_default()
392 .as_millis() as u64,
393 };
394
395 self.send_message(response).await
396 }
397
398 async fn handle_sync_response(
400 &self,
401 collection_id: String,
402 replica_id: ReplicaId,
403 data: Vec<u8>,
404 _timestamp: u64,
405 ) -> Result<(), EndToEndSyncError> {
406 self.storage.set(&collection_id, &data).await?;
408
409 {
411 let mut peers = self.peers.write().await;
412 if let Some(peer) = peers.get_mut(&replica_id) {
413 peer.last_sync = Some(chrono::Utc::now());
414 }
415 }
416
417 Ok(())
418 }
419
420 async fn handle_presence(
422 &self,
423 replica_id: ReplicaId,
424 status: String,
425 timestamp: u64,
426 ) -> Result<(), EndToEndSyncError> {
427 let mut peers = self.peers.write().await;
428
429 let peer_info = PeerInfo {
430 replica_id: replica_id.clone(),
431 last_seen: chrono::Utc::now(),
432 is_online: status == "connected",
433 last_sync: None,
434 sync_status: if status == "connected" {
435 PeerSyncStatus::Connected
436 } else {
437 PeerSyncStatus::Disconnected
438 },
439 id: replica_id.clone(),
441 status: if status == "connected" {
442 PeerSyncStatus::Connected
443 } else {
444 PeerSyncStatus::Disconnected
445 },
446 version: 1,
447 };
448
449 peers.insert(replica_id, peer_info);
450 Ok(())
451 }
452
453 async fn handle_heartbeat(
455 &self,
456 replica_id: ReplicaId,
457 timestamp: u64,
458 ) -> Result<(), EndToEndSyncError> {
459 let mut peers = self.peers.write().await;
460
461 if let Some(peer) = peers.get_mut(&replica_id) {
462 peer.last_seen = chrono::Utc::now();
463 }
464
465 Ok(())
466 }
467
468 async fn handle_ack(
470 &self,
471 _message_id: String,
472 _replica_id: ReplicaId,
473 _timestamp: u64,
474 ) -> Result<(), EndToEndSyncError> {
475 Ok(())
477 }
478
479 pub async fn add_collection(
481 &self,
482 metadata: CollectionMetadata,
483 ) -> Result<(), EndToEndSyncError> {
484 let mut collections = self.collections.write().await;
485 collections.insert(metadata.id.clone(), metadata);
486 Ok(())
487 }
488
489 pub async fn remove_collection(&self, collection_id: &str) -> Result<(), EndToEndSyncError> {
491 let mut collections = self.collections.write().await;
492 collections.remove(collection_id);
493 Ok(())
494 }
495
496 pub async fn get_collection(
498 &self,
499 collection_id: &str,
500 ) -> Result<Option<CollectionMetadata>, EndToEndSyncError> {
501 let collections = self.collections.read().await;
502 Ok(collections.get(collection_id).cloned())
503 }
504
505 pub async fn list_collections(&self) -> Result<Vec<CollectionMetadata>, EndToEndSyncError> {
507 let collections = self.collections.read().await;
508 Ok(collections.values().cloned().collect())
509 }
510
511 pub async fn get_peer(
513 &self,
514 peer_id: &ReplicaId,
515 ) -> Result<Option<PeerInfo>, EndToEndSyncError> {
516 let peers = self.peers.read().await;
517 Ok(peers.get(peer_id).cloned())
518 }
519
520 pub async fn list_peers(&self) -> Result<Vec<PeerInfo>, EndToEndSyncError> {
522 let peers = self.peers.read().await;
523 Ok(peers.values().cloned().collect())
524 }
525
526 pub async fn get_sync_state(&self) -> SyncState {
528 let state = self.sync_state.read().await;
529 state.clone()
530 }
531
532 pub async fn is_running(&self) -> bool {
534 let is_running = self.is_running.read().await;
535 *is_running
536 }
537
538 fn clone_for_background(&self) -> EndToEndSyncManager<S, T> {
540 EndToEndSyncManager {
541 replica_id: self.replica_id.clone(),
542 storage: self.storage.clone(),
543 transport: self.transport.clone(),
544 collections: self.collections.clone(),
545 peers: self.peers.clone(),
546 sync_state: self.sync_state.clone(),
547 message_sender: self.message_sender.clone(),
548 message_receiver: self.message_receiver.clone(),
549 sync_interval: self.sync_interval,
550 heartbeat_interval: self.heartbeat_interval,
551 is_running: self.is_running.clone(),
552 }
553 }
554}
555
556impl<S, T> Clone for EndToEndSyncManager<S, T>
557where
558 S: LocalStorage + Send + Sync + 'static,
559 T: SyncTransport + Send + Sync + 'static,
560 EndToEndSyncError: From<<T as SyncTransport>::Error>,
561{
562 fn clone(&self) -> Self {
563 self.clone_for_background()
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::memory::MemoryStorage;
    use crate::transport::memory::InMemoryTransport;

    #[tokio::test]
    async fn test_end_to_end_sync_manager_creation() {
        let storage = Arc::new(MemoryStorage::new());
        let transport = Arc::new(InMemoryTransport::new());
        let replica_id = ReplicaId::default();

        let manager = EndToEndSyncManager::new(
            replica_id,
            storage,
            transport,
            Duration::from_secs(5),
            Duration::from_secs(30),
        );

        assert!(!manager.is_running().await);
        assert_eq!(manager.get_sync_state().await, SyncState::Disconnected);
    }

    #[tokio::test]
    async fn test_start_stop_lifecycle() {
        let storage = Arc::new(MemoryStorage::new());
        let transport = Arc::new(InMemoryTransport::new());
        let replica_id = ReplicaId::default();

        let manager = EndToEndSyncManager::new(
            replica_id,
            storage,
            transport,
            Duration::from_secs(5),
            Duration::from_secs(30),
        );

        manager.start().await.unwrap();
        assert!(manager.is_running().await);
        assert_eq!(manager.get_sync_state().await, SyncState::Connected);

        // A second start while running is a no-op.
        assert!(manager.start().await.is_ok());
        assert!(manager.is_running().await);

        manager.stop().await.unwrap();
        assert!(!manager.is_running().await);
        assert_eq!(manager.get_sync_state().await, SyncState::Disconnected);
    }

    #[tokio::test]
    async fn test_collection_management() {
        let storage = Arc::new(MemoryStorage::new());
        let transport = Arc::new(InMemoryTransport::new());
        let replica_id = ReplicaId::default();

        let manager = EndToEndSyncManager::new(
            replica_id,
            storage,
            transport,
            Duration::from_secs(5),
            Duration::from_secs(30),
        );

        let metadata = CollectionMetadata {
            id: "test_collection".to_string(),
            name: "Test Collection".to_string(),
            crdt_type: "LwwMap".to_string(),
            version: 1,
            last_sync: 0,
            replica_count: 1,
        };

        assert!(manager.add_collection(metadata.clone()).await.is_ok());

        let retrieved = manager.get_collection("test_collection").await.unwrap();
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, "test_collection");

        let collections = manager.list_collections().await.unwrap();
        assert_eq!(collections.len(), 1);
        assert_eq!(collections[0].id, "test_collection");

        assert!(manager.remove_collection("test_collection").await.is_ok());

        let collections = manager.list_collections().await.unwrap();
        assert_eq!(collections.len(), 0);
    }

    #[tokio::test]
    async fn test_sync_message_serialization() {
        // Keep the original id so the round-trip can be compared against it
        // (the previous assertion compared the decoded value with itself).
        let original_replica = ReplicaId::default();
        let message = SyncMessage::SyncRequest {
            collection_id: "test_collection".to_string(),
            replica_id: original_replica.clone(),
            data: b"test data".to_vec(),
            timestamp: 1234567890,
        };

        let serialized = serde_json::to_string(&message).unwrap();
        let deserialized: SyncMessage = serde_json::from_str(&serialized).unwrap();

        match deserialized {
            SyncMessage::SyncRequest {
                collection_id,
                replica_id,
                data,
                timestamp,
            } => {
                assert_eq!(collection_id, "test_collection");
                assert_eq!(replica_id, original_replica);
                assert_eq!(data, b"test data");
                assert_eq!(timestamp, 1234567890);
            }
            _ => panic!("Unexpected message type"),
        }
    }
}