leptos_sync_core/sync/
end_to_end.rs

1//! End-to-end synchronization engine
2
3use super::{PeerInfo, PeerSyncStatus, SyncEngine, SyncEngineError, SyncState};
4use crate::{
5    crdt::{LwwMap, LwwRegister, Mergeable, ReplicaId},
6    storage::{LocalStorage, StorageError},
7    transport::{SyncTransport, TransportError},
8};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13use thiserror::Error;
14use tokio::sync::{mpsc, RwLock};
15use tokio::time::{interval, sleep};
16
17#[derive(Error, Debug)]
18pub enum EndToEndSyncError {
19    #[error("Storage error: {0}")]
20    Storage(#[from] StorageError),
21    #[error("Transport error: {0}")]
22    Transport(#[from] TransportError),
23    #[error("Sync engine error: {0}")]
24    SyncEngine(#[from] SyncEngineError),
25    #[error("Serialization error: {0}")]
26    Serialization(#[from] serde_json::Error),
27    #[error("Sync operation failed: {0}")]
28    SyncFailed(String),
29    #[error("Peer not found: {0}")]
30    PeerNotFound(String),
31    #[error("Collection not found: {0}")]
32    CollectionNotFound(String),
33}
34
35/// End-to-end synchronization message
36#[derive(Debug, Clone, Serialize, Deserialize)]
37pub enum SyncMessage {
38    /// Sync request with data
39    SyncRequest {
40        collection_id: String,
41        replica_id: ReplicaId,
42        data: Vec<u8>,
43        timestamp: u64,
44    },
45    /// Sync response with merged data
46    SyncResponse {
47        collection_id: String,
48        replica_id: ReplicaId,
49        data: Vec<u8>,
50        timestamp: u64,
51    },
52    /// Peer presence announcement
53    Presence {
54        replica_id: ReplicaId,
55        status: String,
56        timestamp: u64,
57    },
58    /// Heartbeat message
59    Heartbeat {
60        replica_id: ReplicaId,
61        timestamp: u64,
62    },
63    /// Acknowledgment
64    Ack {
65        message_id: String,
66        replica_id: ReplicaId,
67        timestamp: u64,
68    },
69}
70
71/// Collection metadata for synchronization
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub struct CollectionMetadata {
74    pub id: String,
75    pub name: String,
76    pub crdt_type: String,
77    pub version: u32,
78    pub last_sync: u64,
79    pub replica_count: u32,
80}
81
82/// End-to-end synchronization manager
83pub struct EndToEndSyncManager<S, T>
84where
85    S: LocalStorage + Send + Sync + 'static,
86    T: SyncTransport + Send + Sync + 'static,
87{
88    replica_id: ReplicaId,
89    storage: Arc<S>,
90    transport: Arc<T>,
91    collections: Arc<RwLock<HashMap<String, CollectionMetadata>>>,
92    peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
93    sync_state: Arc<RwLock<SyncState>>,
94    message_sender: mpsc::UnboundedSender<SyncMessage>,
95    message_receiver: Arc<RwLock<mpsc::UnboundedReceiver<SyncMessage>>>,
96    sync_interval: Duration,
97    heartbeat_interval: Duration,
98    is_running: Arc<RwLock<bool>>,
99}
100
101impl<S, T> EndToEndSyncManager<S, T>
102where
103    S: LocalStorage + Send + Sync + 'static,
104    T: SyncTransport + Send + Sync + 'static,
105    EndToEndSyncError: From<<T as SyncTransport>::Error>,
106{
107    /// Create a new end-to-end sync manager
108    pub fn new(
109        replica_id: ReplicaId,
110        storage: Arc<S>,
111        transport: Arc<T>,
112        sync_interval: Duration,
113        heartbeat_interval: Duration,
114    ) -> Self {
115        let (tx, rx) = mpsc::unbounded_channel();
116
117        Self {
118            replica_id,
119            storage,
120            transport,
121            collections: Arc::new(RwLock::new(HashMap::new())),
122            peers: Arc::new(RwLock::new(HashMap::new())),
123            sync_state: Arc::new(RwLock::new(SyncState::Disconnected)),
124            message_sender: tx,
125            message_receiver: Arc::new(RwLock::new(rx)),
126            sync_interval,
127            heartbeat_interval,
128            is_running: Arc::new(RwLock::new(false)),
129        }
130    }
131
132    /// Start the synchronization manager
133    pub async fn start(&self) -> Result<(), EndToEndSyncError> {
134        let mut is_running = self.is_running.write().await;
135        if *is_running {
136            return Ok(());
137        }
138        *is_running = true;
139        drop(is_running);
140
141        // Connect transport
142        if !self.transport.is_connected() {
143            // Note: In a real implementation, we'd call transport.connect()
144            // For now, we'll assume the transport is already connected
145        }
146
147        // Update sync state
148        {
149            let mut state = self.sync_state.write().await;
150            *state = SyncState::Connected;
151        }
152
153        // Start background tasks
154        self.start_background_tasks().await;
155
156        Ok(())
157    }
158
159    /// Stop the synchronization manager
160    pub async fn stop(&self) -> Result<(), EndToEndSyncError> {
161        let mut is_running = self.is_running.write().await;
162        *is_running = false;
163        drop(is_running);
164
165        // Update sync state
166        {
167            let mut state = self.sync_state.write().await;
168            *state = SyncState::Disconnected;
169        }
170
171        Ok(())
172    }
173
174    /// Start background synchronization tasks
175    async fn start_background_tasks(&self) {
176        let sync_manager = self.clone_for_background();
177        let heartbeat_manager = self.clone_for_background();
178        let message_handler = self.clone_for_background();
179
180        // Start sync task
181        tokio::spawn(async move {
182            sync_manager.sync_task().await;
183        });
184
185        // Start heartbeat task
186        tokio::spawn(async move {
187            heartbeat_manager.heartbeat_task().await;
188        });
189
190        // Start message handler task
191        tokio::spawn(async move {
192            message_handler.message_handler_task().await;
193        });
194    }
195
196    /// Background sync task
197    async fn sync_task(&self) {
198        let mut interval = interval(self.sync_interval);
199
200        loop {
201            interval.tick().await;
202
203            let is_running = *self.is_running.read().await;
204            if !is_running {
205                break;
206            }
207
208            if let Err(e) = self.perform_sync().await {
209                tracing::error!("Sync task error: {:?}", e);
210            }
211        }
212    }
213
214    /// Background heartbeat task
215    async fn heartbeat_task(&self) {
216        let mut interval = interval(self.heartbeat_interval);
217
218        loop {
219            interval.tick().await;
220
221            let is_running = *self.is_running.read().await;
222            if !is_running {
223                break;
224            }
225
226            if let Err(e) = self.send_heartbeat().await {
227                tracing::error!("Heartbeat task error: {:?}", e);
228            }
229        }
230    }
231
232    /// Background message handler task
233    async fn message_handler_task(&self) {
234        let mut receiver = self.message_receiver.write().await;
235
236        while let Some(message) = receiver.recv().await {
237            if let Err(e) = self.handle_message(message).await {
238                tracing::error!("Message handler error: {:?}", e);
239            }
240        }
241    }
242
243    /// Perform synchronization with all peers
244    async fn perform_sync(&self) -> Result<(), EndToEndSyncError> {
245        let collections = self.collections.read().await;
246        let peers = self.peers.read().await;
247
248        for (collection_id, metadata) in collections.iter() {
249            for (peer_id, peer_info) in peers.iter() {
250                if peer_info.sync_status == PeerSyncStatus::Connected {
251                    if let Err(e) = self.sync_with_peer(collection_id, peer_id).await {
252                        tracing::warn!(
253                            "Failed to sync collection {} with peer {}: {:?}",
254                            collection_id,
255                            peer_id,
256                            e
257                        );
258                    }
259                }
260            }
261        }
262
263        Ok(())
264    }
265
266    /// Sync a specific collection with a specific peer
267    async fn sync_with_peer(
268        &self,
269        collection_id: &str,
270        peer_id: &ReplicaId,
271    ) -> Result<(), EndToEndSyncError> {
272        // Get local data
273        let local_data = self.storage.get::<Vec<u8>>(collection_id).await?;
274
275        if let Some(data) = local_data {
276            // Create sync request
277            let message = SyncMessage::SyncRequest {
278                collection_id: collection_id.to_string(),
279                replica_id: self.replica_id.clone(),
280                data,
281                timestamp: SystemTime::now()
282                    .duration_since(UNIX_EPOCH)
283                    .unwrap_or_default()
284                    .as_millis() as u64,
285            };
286
287            // Send sync request
288            self.send_message(message).await?;
289        }
290
291        Ok(())
292    }
293
294    /// Send a heartbeat message
295    async fn send_heartbeat(&self) -> Result<(), EndToEndSyncError> {
296        let message = SyncMessage::Heartbeat {
297            replica_id: self.replica_id.clone(),
298            timestamp: SystemTime::now()
299                .duration_since(UNIX_EPOCH)
300                .unwrap_or_default()
301                .as_millis() as u64,
302        };
303
304        self.send_message(message).await
305    }
306
307    /// Send a message via transport
308    async fn send_message(&self, message: SyncMessage) -> Result<(), EndToEndSyncError> {
309        let serialized = serde_json::to_vec(&message)?;
310        self.transport.send(&serialized).await.map_err(EndToEndSyncError::from)?;
311        Ok(())
312    }
313
314    /// Handle incoming messages
315    async fn handle_message(&self, message: SyncMessage) -> Result<(), EndToEndSyncError> {
316        match message {
317            SyncMessage::SyncRequest {
318                collection_id,
319                replica_id,
320                data,
321                timestamp,
322            } => {
323                self.handle_sync_request(collection_id, replica_id, data, timestamp)
324                    .await
325            }
326            SyncMessage::SyncResponse {
327                collection_id,
328                replica_id,
329                data,
330                timestamp,
331            } => {
332                self.handle_sync_response(collection_id, replica_id, data, timestamp)
333                    .await
334            }
335            SyncMessage::Presence {
336                replica_id,
337                status,
338                timestamp,
339            } => self.handle_presence(replica_id, status, timestamp).await,
340            SyncMessage::Heartbeat {
341                replica_id,
342                timestamp,
343            } => self.handle_heartbeat(replica_id, timestamp).await,
344            SyncMessage::Ack {
345                message_id,
346                replica_id,
347                timestamp,
348            } => self.handle_ack(message_id, replica_id, timestamp).await,
349        }
350    }
351
352    /// Handle sync request from peer
353    async fn handle_sync_request(
354        &self,
355        collection_id: String,
356        replica_id: ReplicaId,
357        data: Vec<u8>,
358        timestamp: u64,
359    ) -> Result<(), EndToEndSyncError> {
360        // Get local data
361        let local_data = self.storage.get::<Vec<u8>>(&collection_id).await?;
362
363        // Merge data (simplified - in real implementation, use proper CRDT merge)
364        let merged_data = if let Some(local) = local_data {
365            // Simple merge strategy - in real implementation, use proper CRDT merge
366            if timestamp
367                > SystemTime::now()
368                    .duration_since(UNIX_EPOCH)
369                    .unwrap_or_default()
370                    .as_millis() as u64
371                    - 1000
372            {
373                data // Use remote data if it's newer
374            } else {
375                local // Use local data if it's newer
376            }
377        } else {
378            data // Use remote data if no local data
379        };
380
381        // Store merged data
382        self.storage.set(&collection_id, &merged_data).await?;
383
384        // Send sync response
385        let response = SyncMessage::SyncResponse {
386            collection_id,
387            replica_id: self.replica_id.clone(),
388            data: merged_data,
389            timestamp: SystemTime::now()
390                .duration_since(UNIX_EPOCH)
391                .unwrap_or_default()
392                .as_millis() as u64,
393        };
394
395        self.send_message(response).await
396    }
397
398    /// Handle sync response from peer
399    async fn handle_sync_response(
400        &self,
401        collection_id: String,
402        replica_id: ReplicaId,
403        data: Vec<u8>,
404        _timestamp: u64,
405    ) -> Result<(), EndToEndSyncError> {
406        // Store the merged data
407        self.storage.set(&collection_id, &data).await?;
408
409        // Update peer info
410        {
411            let mut peers = self.peers.write().await;
412            if let Some(peer) = peers.get_mut(&replica_id) {
413                  peer.last_sync = Some(chrono::Utc::now());
414            }
415        }
416
417        Ok(())
418    }
419
420    /// Handle presence message
421    async fn handle_presence(
422        &self,
423        replica_id: ReplicaId,
424        status: String,
425        timestamp: u64,
426    ) -> Result<(), EndToEndSyncError> {
427        let mut peers = self.peers.write().await;
428
429        let peer_info = PeerInfo {
430            replica_id: replica_id.clone(),
431            last_seen: chrono::Utc::now(),
432            is_online: status == "connected",
433            last_sync: None,
434            sync_status: if status == "connected" {
435                PeerSyncStatus::Connected
436            } else {
437                PeerSyncStatus::Disconnected
438            },
439            // Additional fields for compatibility
440            id: replica_id.clone(),
441            status: if status == "connected" {
442                PeerSyncStatus::Connected
443            } else {
444                PeerSyncStatus::Disconnected
445            },
446            version: 1,
447        };
448
449        peers.insert(replica_id, peer_info);
450        Ok(())
451    }
452
453    /// Handle heartbeat message
454    async fn handle_heartbeat(
455        &self,
456        replica_id: ReplicaId,
457        timestamp: u64,
458    ) -> Result<(), EndToEndSyncError> {
459        let mut peers = self.peers.write().await;
460
461        if let Some(peer) = peers.get_mut(&replica_id) {
462            peer.last_seen = chrono::Utc::now();
463        }
464
465        Ok(())
466    }
467
468    /// Handle acknowledgment message
469    async fn handle_ack(
470        &self,
471        _message_id: String,
472        _replica_id: ReplicaId,
473        _timestamp: u64,
474    ) -> Result<(), EndToEndSyncError> {
475        // Handle acknowledgment - in a real implementation, this would update message tracking
476        Ok(())
477    }
478
479    /// Add a collection to synchronize
480    pub async fn add_collection(
481        &self,
482        metadata: CollectionMetadata,
483    ) -> Result<(), EndToEndSyncError> {
484        let mut collections = self.collections.write().await;
485        collections.insert(metadata.id.clone(), metadata);
486        Ok(())
487    }
488
489    /// Remove a collection from synchronization
490    pub async fn remove_collection(&self, collection_id: &str) -> Result<(), EndToEndSyncError> {
491        let mut collections = self.collections.write().await;
492        collections.remove(collection_id);
493        Ok(())
494    }
495
496    /// Get collection metadata
497    pub async fn get_collection(
498        &self,
499        collection_id: &str,
500    ) -> Result<Option<CollectionMetadata>, EndToEndSyncError> {
501        let collections = self.collections.read().await;
502        Ok(collections.get(collection_id).cloned())
503    }
504
505    /// List all collections
506    pub async fn list_collections(&self) -> Result<Vec<CollectionMetadata>, EndToEndSyncError> {
507        let collections = self.collections.read().await;
508        Ok(collections.values().cloned().collect())
509    }
510
511    /// Get peer information
512    pub async fn get_peer(
513        &self,
514        peer_id: &ReplicaId,
515    ) -> Result<Option<PeerInfo>, EndToEndSyncError> {
516        let peers = self.peers.read().await;
517        Ok(peers.get(peer_id).cloned())
518    }
519
520    /// List all peers
521    pub async fn list_peers(&self) -> Result<Vec<PeerInfo>, EndToEndSyncError> {
522        let peers = self.peers.read().await;
523        Ok(peers.values().cloned().collect())
524    }
525
526    /// Get current sync state
527    pub async fn get_sync_state(&self) -> SyncState {
528        let state = self.sync_state.read().await;
529        state.clone()
530    }
531
532    /// Check if the manager is running
533    pub async fn is_running(&self) -> bool {
534        let is_running = self.is_running.read().await;
535        *is_running
536    }
537
538    /// Clone the manager for background tasks
539    fn clone_for_background(&self) -> EndToEndSyncManager<S, T> {
540        EndToEndSyncManager {
541            replica_id: self.replica_id.clone(),
542            storage: self.storage.clone(),
543            transport: self.transport.clone(),
544            collections: self.collections.clone(),
545            peers: self.peers.clone(),
546            sync_state: self.sync_state.clone(),
547            message_sender: self.message_sender.clone(),
548            message_receiver: self.message_receiver.clone(),
549            sync_interval: self.sync_interval,
550            heartbeat_interval: self.heartbeat_interval,
551            is_running: self.is_running.clone(),
552        }
553    }
554}
555
556impl<S, T> Clone for EndToEndSyncManager<S, T>
557where
558    S: LocalStorage + Send + Sync + 'static,
559    T: SyncTransport + Send + Sync + 'static,
560    EndToEndSyncError: From<<T as SyncTransport>::Error>,
561{
562    fn clone(&self) -> Self {
563        self.clone_for_background()
564    }
565}
566
567#[cfg(test)]
568mod tests {
569    use super::*;
570    use crate::storage::memory::MemoryStorage;
571    use crate::transport::memory::InMemoryTransport;
572
573    #[tokio::test]
574    async fn test_end_to_end_sync_manager_creation() {
575        let storage = Arc::new(MemoryStorage::new());
576        let transport = Arc::new(InMemoryTransport::new());
577        let replica_id = ReplicaId::default();
578
579        let manager = EndToEndSyncManager::new(
580            replica_id,
581            storage,
582            transport,
583            Duration::from_secs(5),
584            Duration::from_secs(30),
585        );
586
587        assert!(!manager.is_running().await);
588        assert_eq!(manager.get_sync_state().await, SyncState::Disconnected);
589    }
590
591    #[tokio::test]
592    async fn test_collection_management() {
593        let storage = Arc::new(MemoryStorage::new());
594        let transport = Arc::new(InMemoryTransport::new());
595        let replica_id = ReplicaId::default();
596
597        let manager = EndToEndSyncManager::new(
598            replica_id,
599            storage,
600            transport,
601            Duration::from_secs(5),
602            Duration::from_secs(30),
603        );
604
605        let metadata = CollectionMetadata {
606            id: "test_collection".to_string(),
607            name: "Test Collection".to_string(),
608            crdt_type: "LwwMap".to_string(),
609            version: 1,
610            last_sync: 0,
611            replica_count: 1,
612        };
613
614        // Add collection
615        assert!(manager.add_collection(metadata.clone()).await.is_ok());
616
617        // Get collection
618        let retrieved = manager.get_collection("test_collection").await.unwrap();
619        assert!(retrieved.is_some());
620        assert_eq!(retrieved.unwrap().id, "test_collection");
621
622        // List collections
623        let collections = manager.list_collections().await.unwrap();
624        assert_eq!(collections.len(), 1);
625        assert_eq!(collections[0].id, "test_collection");
626
627        // Remove collection
628        assert!(manager.remove_collection("test_collection").await.is_ok());
629
630        let collections = manager.list_collections().await.unwrap();
631        assert_eq!(collections.len(), 0);
632    }
633
634    #[tokio::test]
635    async fn test_sync_message_serialization() {
636        let message = SyncMessage::SyncRequest {
637            collection_id: "test_collection".to_string(),
638            replica_id: ReplicaId::default(),
639            data: b"test data".to_vec(),
640            timestamp: 1234567890,
641        };
642
643        let serialized = serde_json::to_string(&message).unwrap();
644        let deserialized: SyncMessage = serde_json::from_str(&serialized).unwrap();
645
646        match deserialized {
647            SyncMessage::SyncRequest {
648                collection_id,
649                replica_id,
650                data,
651                timestamp,
652            } => {
653                assert_eq!(collection_id, "test_collection");
654                assert_eq!(replica_id.0, replica_id.0); // Just check it's valid
655                assert_eq!(data, b"test data");
656                assert_eq!(timestamp, 1234567890);
657            }
658            _ => panic!("Unexpected message type"),
659        }
660    }
661}