leptos_sync_core/sync/
engine.rs

1//! Enhanced synchronization engine for real-time sync
2
3use crate::{
4    crdt::{Mergeable, ReplicaId},
5    storage::Storage,
6    transport::{SyncTransport, TransportError},
7};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use thiserror::Error;
13
14#[derive(Error, Debug)]
15pub enum SyncEngineError {
16    #[error("Storage error: {0}")]
17    Storage(#[from] crate::storage::StorageError),
18    #[error("Transport error: {0}")]
19    Transport(#[from] TransportError),
20    #[error("Serialization error: {0}")]
21    Serialization(#[from] serde_json::Error),
22    #[error("CRDT error: {0}")]
23    CrdtError(#[from] Box<dyn std::error::Error + Send + Sync>),
24    #[error("Sync operation failed: {0}")]
25    SyncFailed(String),
26    #[error("Conflict resolution failed: {0}")]
27    ConflictResolution(String),
28}
29
30/// Enhanced synchronization state
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
32pub enum SyncState {
33    /// Not synchronized
34    NotSynced,
35    /// Currently synchronizing
36    Syncing,
37    /// Synchronized
38    Synced,
39    /// Synchronization failed
40    Failed(String),
41    /// Resolving conflicts
42    ResolvingConflicts,
43    /// Offline mode
44    Offline,
45    /// Connected to network
46    Connected,
47    /// Disconnected from network
48    Disconnected,
49}
50
51/// Enhanced synchronization message types
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub enum SyncMessage<T> {
54    /// Sync request with data
55    Sync { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
56    /// Acknowledgment of sync
57    Ack { key: String, replica_id: ReplicaId },
58    /// Peer presence announcement
59    Presence { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
60    /// Conflict resolution request
61    Conflict { key: String, data: T, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
62    /// Heartbeat to keep connection alive
63    Heartbeat { replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc> },
64}
65
66/// Enhanced synchronization manager
67pub struct SyncEngine<Tr> 
68where 
69    Tr: SyncTransport + Clone,
70{
71    replica_id: ReplicaId,
72    state: Arc<RwLock<SyncState>>,
73    peers: Arc<RwLock<HashMap<ReplicaId, PeerInfo>>>,
74    storage: Storage,
75    transport: Tr,
76    sync_queue: Arc<RwLock<Vec<SyncMessage<Vec<u8>>>>>,
77    conflict_resolver: Arc<RwLock<Option<DefaultConflictResolver>>>,
78}
79
80/// Information about a peer
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct PeerInfo {
83    pub replica_id: ReplicaId,
84    pub last_seen: chrono::DateTime<chrono::Utc>,
85    pub is_online: bool,
86    pub last_sync: Option<chrono::DateTime<chrono::Utc>>,
87    pub sync_status: PeerSyncStatus,
88    // Additional fields for compatibility
89    pub id: ReplicaId,
90    pub status: PeerSyncStatus,
91    pub version: u32,
92}
93
94/// Peer synchronization status
95#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
96pub enum PeerSyncStatus {
97    /// Never synced
98    Never,
99    /// Last sync was successful
100    Success { timestamp: chrono::DateTime<chrono::Utc> },
101    /// Last sync failed
102    Failed { timestamp: chrono::DateTime<chrono::Utc>, error: String },
103    /// Currently syncing
104    Syncing { started: chrono::DateTime<chrono::Utc> },
105    /// Connected to peer
106    Connected,
107    /// Disconnected from peer
108    Disconnected,
109}
110
111/// Default conflict resolver using Last-Write-Wins
112pub struct DefaultConflictResolver;
113
114impl DefaultConflictResolver {
115    /// Resolve a conflict between two values
116    pub fn resolve<T: Mergeable>(&self, local: &T, remote: &T) -> Result<T, Box<dyn std::error::Error + Send + Sync>> {
117        // For now, just merge them - in a real implementation you'd want more sophisticated logic
118        let mut result = local.clone();
119        result.merge(remote).map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
120        Ok(result)
121    }
122}
123
124impl<Tr> SyncEngine<Tr>
125where
126    Tr: SyncTransport + Clone + 'static,
127{
128    pub fn new(storage: Storage, transport: Tr) -> Self {
129        Self {
130            replica_id: ReplicaId::default(),
131            state: Arc::new(RwLock::new(SyncState::NotSynced)),
132            peers: Arc::new(RwLock::new(HashMap::new())),
133            storage,
134            transport,
135            sync_queue: Arc::new(RwLock::new(Vec::new())),
136            conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
137        }
138    }
139
140    pub fn with_replica_id(storage: Storage, transport: Tr, replica_id: ReplicaId) -> Self {
141        Self {
142            replica_id,
143            state: Arc::new(RwLock::new(SyncState::NotSynced)),
144            peers: Arc::new(RwLock::new(HashMap::new())),
145            storage,
146            transport,
147            sync_queue: Arc::new(RwLock::new(Vec::new())),
148            conflict_resolver: Arc::new(RwLock::new(Some(DefaultConflictResolver))),
149        }
150    }
151
152    pub async fn state(&self) -> SyncState {
153        self.state.read().await.clone()
154    }
155
156    pub fn replica_id(&self) -> ReplicaId {
157        self.replica_id
158    }
159
160    pub async fn is_online(&self) -> bool {
161        self.transport.is_connected()
162    }
163
164    pub async fn peer_count(&self) -> usize {
165        self.peers.read().await.len()
166    }
167
168    /// Start the synchronization process
169    pub async fn start_sync(&mut self) -> Result<(), SyncEngineError> {
170        let mut state = self.state.write().await;
171        *state = SyncState::Syncing;
172
173        // Try to connect to transport
174        if !self.transport.is_connected() {
175            // For WebSocket, we'd try to connect here
176            tracing::info!("Transport not connected, attempting to connect...");
177        }
178
179        // Announce presence to peers
180        self.announce_presence().await?;
181
182        // Start background sync loop
183        self.start_background_sync().await;
184
185        Ok(())
186    }
187
188    /// Stop the synchronization process
189    pub async fn stop_sync(&mut self) -> Result<(), SyncEngineError> {
190        let mut state = self.state.write().await;
191        *state = SyncState::NotSynced;
192
193        // Disconnect from transport if needed
194        if self.transport.is_connected() {
195            tracing::info!("Stopping sync, disconnecting from transport...");
196        }
197
198        Ok(())
199    }
200
201    /// Sync a CRDT value with peers
202    pub async fn sync<V>(&mut self, key: &str, value: &V) -> Result<(), SyncEngineError>
203    where
204        V: Mergeable + Serialize + Send + Sync + Clone,
205    {
206        // Serialize the value
207        let data = serde_json::to_vec(value)?;
208        
209        // Create sync message
210        let message = SyncMessage::Sync {
211            key: key.to_string(),
212            data,
213            replica_id: self.replica_id,
214            timestamp: chrono::Utc::now(),
215        };
216
217        // Add to sync queue
218        {
219            let mut queue = self.sync_queue.write().await;
220            queue.push(SyncMessage::Sync {
221                key: key.to_string(),
222                data: serde_json::to_vec(value)?,
223                replica_id: self.replica_id,
224                timestamp: chrono::Utc::now(),
225            });
226        }
227
228        // Send via transport
229        let message_bytes = serde_json::to_vec(&message)?;
230        self.transport.send(&message_bytes).await
231            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
232
233        Ok(())
234    }
235
236    /// Process incoming messages
237    pub async fn process_messages(&mut self) -> Result<(), SyncEngineError> {
238        // Receive messages from transport
239        let messages = self.transport.receive().await
240            .map_err(|e| SyncEngineError::Transport(TransportError::ReceiveFailed(e.to_string())))?;
241        
242        for message_bytes in messages {
243            let message: SyncMessage<Vec<u8>> = serde_json::from_slice(&message_bytes)?;
244            
245            match message {
246                SyncMessage::Sync { key, data, replica_id, timestamp } => {
247                    // Handle sync message
248                    self.handle_sync_message(key, data, replica_id, timestamp).await?;
249                }
250                SyncMessage::Ack { key, replica_id } => {
251                    // Handle acknowledgment
252                    self.handle_ack_message(key, replica_id).await?;
253                }
254                SyncMessage::Presence { replica_id, timestamp } => {
255                    // Handle presence update
256                    self.handle_presence_message(replica_id, timestamp).await?;
257                }
258                SyncMessage::Conflict { key, data, replica_id, timestamp } => {
259                    // Handle conflict resolution
260                    self.handle_conflict_message(key, data, replica_id, timestamp).await?;
261                }
262                SyncMessage::Heartbeat { replica_id, timestamp } => {
263                    // Handle heartbeat
264                    self.handle_heartbeat_message(replica_id, timestamp).await?;
265                }
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Announce presence to peers
273    async fn announce_presence(&self) -> Result<(), SyncEngineError> {
274        let message: SyncMessage<()> = SyncMessage::Presence {
275            replica_id: self.replica_id,
276            timestamp: chrono::Utc::now(),
277        };
278
279        let message_bytes = serde_json::to_vec(&message)?;
280        self.transport.send(&message_bytes).await
281            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
282
283        Ok(())
284    }
285
286    /// Send heartbeat to peers
287    async fn send_heartbeat(&self) -> Result<(), SyncEngineError> {
288        let message: SyncMessage<()> = SyncMessage::Heartbeat {
289            replica_id: self.replica_id,
290            timestamp: chrono::Utc::now(),
291        };
292
293        let message_bytes = serde_json::to_vec(&message)?;
294        self.transport.send(&message_bytes).await
295            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
296
297        Ok(())
298    }
299
300    /// Start background synchronization loop
301    async fn start_background_sync(&self) {
302        let transport = self.transport.clone();
303        let replica_id = self.replica_id;
304        
305        tokio::spawn(async move {
306            let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
307            
308            loop {
309                interval.tick().await;
310                
311                // Send heartbeat
312                let message: SyncMessage<()> = SyncMessage::Heartbeat {
313                    replica_id,
314                    timestamp: chrono::Utc::now(),
315                };
316                
317                if let Ok(message_bytes) = serde_json::to_vec(&message) {
318                    let _ = transport.send(&message_bytes).await;
319                }
320            }
321        });
322    }
323
324    /// Handle sync message
325    async fn handle_sync_message(&mut self, key: String, _data: Vec<u8>, replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
326        tracing::debug!("Received sync message for key {} from replica {}", key, replica_id);
327        
328        // For now, just acknowledge
329        let ack: SyncMessage<()> = SyncMessage::Ack {
330            key,
331            replica_id,
332        };
333        
334        let ack_bytes = serde_json::to_vec(&ack)?;
335        self.transport.send(&ack_bytes).await
336            .map_err(|e| SyncEngineError::Transport(TransportError::SendFailed(e.to_string())))?;
337
338        Ok(())
339    }
340
341    /// Handle acknowledgment message
342    async fn handle_ack_message(&mut self, _key: String, _replica_id: ReplicaId) -> Result<(), SyncEngineError> {
343        // For now, just log
344        tracing::debug!("Received ack message");
345        Ok(())
346    }
347
348    /// Handle presence message
349    async fn handle_presence_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
350        let mut peers = self.peers.write().await;
351        
352        let peer_info = PeerInfo {
353            replica_id: replica_id.clone(),
354            last_seen: timestamp,
355            is_online: true,
356            last_sync: None,
357            sync_status: PeerSyncStatus::Never,
358            // Additional fields for compatibility
359            id: replica_id,
360            status: PeerSyncStatus::Never,
361            version: 1,
362        };
363        
364        peers.insert(replica_id, peer_info);
365        
366        tracing::debug!("Updated peer info for replica {}", replica_id);
367        Ok(())
368    }
369
370    /// Handle conflict message
371    async fn handle_conflict_message(&mut self, _key: String, _data: Vec<u8>, _replica_id: ReplicaId, _timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
372        // For now, just log
373        tracing::debug!("Received conflict message");
374        Ok(())
375    }
376
377    /// Handle heartbeat message
378    async fn handle_heartbeat_message(&mut self, replica_id: ReplicaId, timestamp: chrono::DateTime<chrono::Utc>) -> Result<(), SyncEngineError> {
379        let mut peers = self.peers.write().await;
380        
381        if let Some(peer_info) = peers.get_mut(&replica_id) {
382            peer_info.last_seen = timestamp;
383            tracing::debug!("Updated heartbeat for replica {}", replica_id);
384        }
385
386        Ok(())
387    }
388
389    /// Get all peers
390    pub async fn peers(&self) -> impl Iterator<Item = (ReplicaId, PeerInfo)> + 'static {
391        let peers = self.peers.read().await;
392        peers.clone().into_iter()
393    }
394
395    /// Check if there's a conflict between two values
396    fn has_conflict<V: Mergeable>(&self, _local: &V, _remote: &V) -> bool {
397        // For now, always return false - in a real implementation you'd check timestamps
398        false
399    }
400}