leptos_sync_core/sync/engine.rs

//! Enhanced synchronization engine for real-time sync

use crate::{
    crdt::{Mergeable, ReplicaId},
    storage::Storage,
    transport::{SyncTransport, TransportError},
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum SyncEngineError {
    #[error("Storage error: {0}")]
    Storage(#[from] crate::storage::StorageError),
    #[error("Transport error: {0}")]
    Transport(#[from] TransportError),
    #[error("Serialization error: {0}")]
    Serialization(#[from] serde_json::Error),
    #[error("CRDT error: {0}")]
    CrdtError(#[from] Box<dyn std::error::Error + Send + Sync>),
    #[error("Sync operation failed: {0}")]
    SyncFailed(String),
    #[error("Conflict resolution failed: {0}")]
    ConflictResolution(String),
}

/// Enhanced synchronization state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncState {
    /// Not synchronized
    NotSynced,
    /// Currently synchronizing
    Syncing,
    /// Synchronized
    Synced,
    /// Synchronization failed
    Failed(String),
    /// Resolving conflicts
    ResolvingConflicts,
    /// Offline mode
    Offline,
}

/// Enhanced synchronization message types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SyncMessage<T> {
    /// Sync request with data
    Sync { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
    /// Acknowledgment of sync
    Ack { key: String, replica_id: ReplicaId },
    /// Peer presence announcement
    Presence { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
    /// Conflict resolution request
    Conflict { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
    /// Heartbeat to keep connection alive
    Heartbeat { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
}
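
// On the wire these messages are encoded with `serde_json`, using serde's default
// externally tagged enum representation, so a presence announcement looks roughly
// like `{"Presence":{"replica_id":...,"timestamp":"2024-01-01T00:00:00Z"}}`; the
// exact JSON shape of `replica_id` depends on `ReplicaId`'s own Serialize impl.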

/// Enhanced synchronization manager
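///
/// A hedged usage sketch (kept as `ignore` since constructing a concrete `Storage`
/// and transport is left to the caller; only methods defined in this module are used):
///
/// ```ignore
/// async fn run<Tr: SyncTransport + Clone + 'static>(
///     storage: Storage,
///     transport: Tr,
/// ) -> Result<(), SyncEngineError> {
///     let mut engine = SyncEngine::new(storage, transport);
///     engine.start_sync().await?;
///     loop {
///         engine.process_messages().await?;
///         tokio::time::sleep(std::time::Duration::from_millis(100)).await;
///     }
/// }
/// ```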
pub struct SyncEngine<Tr>
where
    Tr: SyncTransport + Clone,
{
    /// Identifier of the local replica
    replica_id: ReplicaId,
    /// Current synchronization state
    state: Arc<RwLock<SyncState>>,
    /// Known peers, keyed by replica id
    peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
    /// Local persistence backend
    storage: Storage,
    /// Transport used to exchange sync messages
    transport: Tr,
    /// Queue of outgoing sync messages
    sync_queue: Arc<RwLock<Vec<SyncMessage<Vec<u8>>>>>,
    /// Optional conflict resolver (defaults to `DefaultConflictResolver`)
    conflict_resolver: Arc<RwLock<Option<DefaultConflictResolver>>>,
}

/// Information about a peer
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerInfo {
    pub replica_id: ReplicaId,
    pub last_seen: chrono::DateTime<chrono::Utc>,
    pub is_online: bool,
    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
    pub sync_status: PeerSyncStatus,
}

/// Peer synchronization status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PeerSyncStatus {
    /// Never synced
    Never,
    /// Last sync was successful
    Success { timestamp: chrono::DateTime<chrono::Utc> },
    /// Last sync failed
    Failed { timestamp: chrono::DateTime<chrono::Utc>, error: String },
    /// Currently syncing
    Syncing { started: chrono::DateTime<chrono::Utc> },
}

/// Default conflict resolver. Intended to be Last-Write-Wins; currently it falls
/// back to the CRDT's own `merge`.
pub struct DefaultConflictResolver;

impl DefaultConflictResolver {
    /// Resolve a conflict between two values
    pub fn resolve<T: Mergeable>(&self, local: &T, remote: &T) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
        // For now, just merge the two values; a real implementation would apply
        // more sophisticated resolution logic.
        let mut result = local.clone();
        result.merge(remote).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
        Ok(result)
    }
}
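
// Note: the resolver above is stored on the engine (`conflict_resolver`) but is not
// yet invoked anywhere in this module; `handle_conflict_message` below is still a stub.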

impl<Tr> SyncEngine<Tr>
where
    Tr: SyncTransport + Clone + 'static,
{
    pub fn new(storage: Storage, transport: Tr) -> Self {
        Self::with_replica_id(storage, transport, ReplicaId::default())
    }

    pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
        Self {
            replica_id,
            state: Arc::new(RwLock::new(SyncState::NotSynced)),
            peers: Arc::new(RwLock::new(HashMap::new())),
            storage,
            transport,
            sync_queue: Arc::new(RwLock::new(Vec::new())),
            conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
        }
    }

    pub async fn state(&self) -> SyncState {
        self.state.read().await.clone()
    }

    pub fn replica_id(&self) -> ReplicaId {
        self.replica_id
    }

    pub async fn is_online(&self) -> bool {
        self.transport.is_connected()
    }

    pub async fn peer_count(&self) -> usize {
        self.peers.read().await.len()
    }

    /// Start the synchronization process
    pub async fn start_sync(&mut self) -> Result<(), SyncEngineError> {
        // Scope the write guard so the state lock is released before awaiting below
        {
            let mut state = self.state.write().await;
            *state = SyncState::Syncing;
        }

        // Try to connect to transport
        if !self.transport.is_connected() {
            // For WebSocket, we'd try to connect here
            tracing::info!("Transport not connected, attempting to connect...");
        }

        // Announce presence to peers
        self.announce_presence().await?;

        // Start background sync loop
        self.start_background_sync().await;

        Ok(())
    }

    /// Stop the synchronization process
    pub async fn stop_sync(&mut self) -> Result<(), SyncEngineError> {
        let mut state = self.state.write().await;
        *state = SyncState::NotSynced;

        // Disconnect from transport if needed
        if self.transport.is_connected() {
            tracing::info!("Stopping sync, disconnecting from transport...");
        }

        Ok(())
    }

    /// Sync a CRDT value with peers
    pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncEngineError>
    where
        V: Mergeable + Serialize + Send + Sync + Clone,
    {
        // Serialize the value once and build the sync message
        let message = SyncMessage::Sync {
            key: key.to_string(),
            data: serde_json::to_vec(value)?,
            replica_id: self.replica_id,
            timestamp: chrono::Utc::now(),
        };

        // Add a copy to the sync queue
        {
            let mut queue = self.sync_queue.write().await;
            queue.push(message.clone());
        }

        // Send via transport
        let message_bytes = serde_json::to_vec(&message)?;
        self.transport.send(&message_bytes).await
            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;

        Ok(())
    }

    /// Process incoming messages
    pub async fn process_messages(&mut self) -> Result<(), SyncEngineError> {
        // Receive messages from transport
        let messages = self.transport.receive().await
            .map_err(|e| SyncEngineError::Transport(TransportError::ReceiveFailed(e.to_string())))?;

        for message_bytes in messages {
            let message: SyncMessage<Vec<u8>> = serde_json::from_slice(&message_bytes)?;

            match message {
                SyncMessage::Sync { key, data, replica_id, timestamp } => {
                    // Handle sync message
                    self.handle_sync_message(key, data, replica_id, timestamp).await?;
                }
                SyncMessage::Ack { key, replica_id } => {
                    // Handle acknowledgment
                    self.handle_ack_message(key, replica_id).await?;
                }
                SyncMessage::Presence { replica_id, timestamp } => {
                    // Handle presence update
                    self.handle_presence_message(replica_id, timestamp).await?;
                }
                SyncMessage::Conflict { key, data, replica_id, timestamp } => {
                    // Handle conflict resolution
                    self.handle_conflict_message(key, data, replica_id, timestamp).await?;
                }
                SyncMessage::Heartbeat { replica_id, timestamp } => {
                    // Handle heartbeat
                    self.handle_heartbeat_message(replica_id, timestamp).await?;
                }
            }
        }

        Ok(())
    }

    /// Announce presence to peers
    async fn announce_presence(&self) -> Result<(), SyncEngineError> {
        let message: SyncMessage<()> = SyncMessage::Presence {
            replica_id: self.replica_id,
            timestamp: chrono::Utc::now(),
        };

        let message_bytes = serde_json::to_vec(&message)?;
        self.transport.send(&message_bytes).await
            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;

        Ok(())
    }

    /// Send heartbeat to peers
    async fn send_heartbeat(&self) -> Result<(), SyncEngineError> {
        let message: SyncMessage<()> = SyncMessage::Heartbeat {
            replica_id: self.replica_id,
            timestamp: chrono::Utc::now(),
        };

        let message_bytes = serde_json::to_vec(&message)?;
        self.transport.send(&message_bytes).await
            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;

        Ok(())
    }
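
    // Note: `send_heartbeat` is currently unused within this module; the background
    // loop below builds its own heartbeat message because it only captures the
    // transport and replica id.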

    /// Start background synchronization loop
    async fn start_background_sync(&self) {
        let transport = self.transport.clone();
        let replica_id = self.replica_id;

        // Note: this task is not tied to `stop_sync`; it keeps sending heartbeats
        // until the runtime shuts down.
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));

            loop {
                interval.tick().await;

                // Send heartbeat
                let message: SyncMessage<()> = SyncMessage::Heartbeat {
                    replica_id,
                    timestamp: chrono::Utc::now(),
                };

                if let Ok(message_bytes) = serde_json::to_vec(&message) {
                    let _ = transport.send(&message_bytes).await;
                }
            }
        });
    }

    /// Handle sync message
    async fn handle_sync_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
        tracing::debug!("Received sync message for key {} from replica {}", key, replica_id);

        // For now, just acknowledge, echoing the originating replica's id
        let ack: SyncMessage<()> = SyncMessage::Ack {
            key,
            replica_id,
        };

        let ack_bytes = serde_json::to_vec(&ack)?;
        self.transport.send(&ack_bytes).await
            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;

        Ok(())
    }

    /// Handle acknowledgment message
    async fn handle_ack_message(&mut self, key: String, replica_id: ReplicaId) -> Result<(), SyncEngineError> {
        // For now, just log
        tracing::debug!("Received ack for key {} from replica {}", key, replica_id);
        Ok(())
    }

    /// Handle presence message
    async fn handle_presence_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
        let mut peers = self.peers.write().await;

        // Update an existing entry in place so sync history is preserved;
        // only insert a fresh record for peers we have not seen before.
        let peer_info = peers.entry(replica_id).or_insert_with(|| PeerInfo {
            replica_id,
            last_seen: timestamp,
            is_online: true,
            last_sync: None,
            sync_status: PeerSyncStatus::Never,
        });
        peer_info.last_seen = timestamp;
        peer_info.is_online = true;

        tracing::debug!("Updated peer info for replica {}", replica_id);
        Ok(())
    }

    /// Handle conflict message
    async fn handle_conflict_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
        // For now, just log
        tracing::debug!("Received conflict message for key {} from replica {}", key, replica_id);
        Ok(())
    }

    /// Handle heartbeat message
    async fn handle_heartbeat_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
        let mut peers = self.peers.write().await;

        if let Some(peer_info) = peers.get_mut(&replica_id) {
            peer_info.last_seen = timestamp;
            tracing::debug!("Updated heartbeat for replica {}", replica_id);
        }

        Ok(())
    }

    /// Get all peers
    pub async fn peers(&self) -> impl Iterator<Item = (ReplicaId, PeerInfo)> {
        let peers = self.peers.read().await;
        peers.clone().into_iter()
    }

    /// Check if there's a conflict between two values
    fn has_conflict<V: Mergeable>(&self, _local: &V, _remote: &V) -> bool {
        // For now, always return false - in a real implementation you'd check timestamps
        false
    }
}
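
// A minimal, hedged round-trip check of the wire encoding used above. It relies only
// on traits this module already requires of `ReplicaId` (Default, Copy, Eq, Debug,
// Serialize/Deserialize); adjust if the concrete type differs.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sync_message_round_trips_through_json() {
        let replica_id = ReplicaId::default();
        let original = SyncMessage::Sync {
            key: "doc:1".to_string(),
            data: vec![1u8, 2, 3],
            replica_id,
            timestamp: chrono::Utc::now(),
        };

        // Encode exactly as `sync()` does before handing bytes to the transport...
        let bytes = serde_json::to_vec(&original).expect("serialize");
        // ...and decode exactly as `process_messages()` does on receipt.
        let decoded: SyncMessage<Vec<u8>> = serde_json::from_slice(&bytes).expect("deserialize");

        match decoded {
            SyncMessage::Sync { key, data, replica_id: decoded_id, .. } => {
                assert_eq!(key, "doc:1");
                assert_eq!(data, vec![1, 2, 3]);
                assert_eq!(decoded_id, replica_id);
            }
            other => panic!("expected Sync variant, got {:?}", other),
        }
    }
}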