leptos_sync_core/sync/
realtime.rs

1//! Real-time synchronization engine for live collaboration
2
3use crate::crdt::{Mergeable, ReplicaId};
4use crate::storage::{Storage, LocalStorage};
5use crate::transport::SyncTransport;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11use thiserror::Error;
12
13#[derive(Error, Debug)]
14pub enum RealtimeSyncError {
15    #[error("Transport error: {0}")]
16    Transport(String),
17    #[error("Storage error: {0}")]
18    Storage(String),
19    #[error("Serialization error: {0}")]
20    Serialization(String),
21    #[error("Subscription not found: {0}")]
22    SubscriptionNotFound(String),
23    #[error("Invalid operation: {0}")]
24    InvalidOperation(String),
25}
26
27/// Real-time synchronization event
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum RealtimeEvent {
30    /// Document changed
31    DocumentChanged {
32        key: String,
33        replica_id: ReplicaId,
34        timestamp: DateTime<Utc>,
35        change_type: ChangeType,
36    },
37    /// User joined
38    UserJoined {
39        replica_id: ReplicaId,
40        timestamp: DateTime<Utc>,
41        user_info: Option<UserInfo>,
42    },
43    /// User left
44    UserLeft {
45        replica_id: ReplicaId,
46        timestamp: DateTime<Utc>,
47    },
48    /// Sync started
49    SyncStarted {
50        replica_id: ReplicaId,
51        timestamp: DateTime<Utc>,
52    },
53    /// Sync completed
54    SyncCompleted {
55        replica_id: ReplicaId,
56        timestamp: DateTime<Utc>,
57        changes_synced: usize,
58    },
59    /// Conflict detected
60    ConflictDetected {
61        key: String,
62        replica_id: ReplicaId,
63        timestamp: DateTime<Utc>,
64        conflict_type: String,
65    },
66}
67
68/// Type of change
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub enum ChangeType {
71    Created,
72    Updated,
73    Deleted,
74    Merged,
75}
76
77/// User information
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct UserInfo {
80    pub name: Option<String>,
81    pub avatar: Option<String>,
82    pub color: Option<String>,
83}
84
85/// Real-time synchronization manager
86pub struct RealtimeSyncManager<Tr>
87where
88    Tr: SyncTransport + Clone + Send + Sync + 'static,
89{
90    replica_id: ReplicaId,
91    transport: Tr,
92    storage: Arc<Storage>,
93    event_sender: broadcast::Sender<RealtimeEvent>,
94    subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
95    active_users: Arc<RwLock<HashMap<ReplicaId, UserInfo>>>,
96    sync_state: Arc<RwLock<SyncState>>,
97    heartbeat_interval: std::time::Duration,
98    presence_timeout: std::time::Duration,
99}
100
101/// Subscription to real-time events
102pub struct Subscription {
103    pub id: String,
104    pub event_types: Vec<String>,
105    pub callback: Box<dyn Fn(RealtimeEvent) + Send + Sync>,
106}
107
108/// Synchronization state
109#[derive(Debug, Clone)]
110pub struct SyncState {
111    pub is_syncing: bool,
112    pub last_sync: Option<DateTime<Utc>>,
113    pub connected_users: usize,
114    pub pending_changes: usize,
115    pub sync_errors: Vec<String>,
116}
117
118impl<Tr> RealtimeSyncManager<Tr>
119where
120    Tr: SyncTransport + Clone + Send + Sync + 'static,
121{
122    pub fn new(
123        replica_id: ReplicaId,
124        transport: Tr,
125        storage: Arc<Storage>,
126    ) -> Self {
127        let (event_sender, _) = broadcast::channel(1000);
128        
129        Self {
130            replica_id,
131            transport,
132            storage,
133            event_sender,
134            subscriptions: Arc::new(RwLock::new(HashMap::new())),
135            active_users: Arc::new(RwLock::new(HashMap::new())),
136            sync_state: Arc::new(RwLock::new(SyncState {
137                is_syncing: false,
138                last_sync: None,
139                connected_users: 0,
140                pending_changes: 0,
141                sync_errors: Vec::new(),
142            })),
143            heartbeat_interval: std::time::Duration::from_secs(30),
144            presence_timeout: std::time::Duration::from_secs(120),
145        }
146    }
147
148    /// Start real-time synchronization
149    pub async fn start(&mut self) -> Result<(), RealtimeSyncError> {
150        let mut state = self.sync_state.write().await;
151        state.is_syncing = true;
152        drop(state);
153
154        // Announce presence
155        self.announce_presence().await?;
156
157        // Start heartbeat
158        self.start_heartbeat().await;
159
160        // Start presence monitoring
161        self.start_presence_monitoring().await;
162
163        // Emit sync started event
164        self.emit_event(RealtimeEvent::SyncStarted {
165            replica_id: self.replica_id,
166            timestamp: Utc::now(),
167        }).await;
168
169        Ok(())
170    }
171
172    /// Stop real-time synchronization
173    pub async fn stop(&mut self) -> Result<(), RealtimeSyncError> {
174        let mut state = self.sync_state.write().await;
175        state.is_syncing = false;
176        drop(state);
177
178        // Announce departure
179        self.announce_departure().await?;
180
181        // Emit sync completed event
182        self.emit_event(RealtimeEvent::SyncCompleted {
183            replica_id: self.replica_id,
184            timestamp: Utc::now(),
185            changes_synced: 0,
186        }).await;
187
188        Ok(())
189    }
190
191    /// Subscribe to real-time events
192    pub async fn subscribe(
193        &self,
194        event_types: Vec<String>,
195        callback: Box<dyn Fn(RealtimeEvent) + Send + Sync>,
196    ) -> Result<String, RealtimeSyncError> {
197        let subscription_id = uuid::Uuid::new_v4().to_string();
198        
199        let subscription = Subscription {
200            id: subscription_id.clone(),
201            event_types,
202            callback,
203        };
204
205        let mut subscriptions = self.subscriptions.write().await;
206        subscriptions.insert(subscription_id.clone(), subscription);
207
208        Ok(subscription_id)
209    }
210
211    /// Unsubscribe from real-time events
212    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), RealtimeSyncError> {
213        let mut subscriptions = self.subscriptions.write().await;
214        
215        if subscriptions.remove(subscription_id).is_some() {
216            Ok(())
217        } else {
218            Err(RealtimeSyncError::SubscriptionNotFound(subscription_id.to_string()))
219        }
220    }
221
222    /// Broadcast a change to all connected peers
223    pub async fn broadcast_change<T: Mergeable + Serialize + Clone>(
224        &self,
225        key: &str,
226        value: &T,
227        change_type: ChangeType,
228    ) -> Result<(), RealtimeSyncError> {
229        // Store locally first
230        self.storage.set(key, value).await
231            .map_err(|e| RealtimeSyncError::Storage(e.to_string()))?;
232
233        // Serialize and send via transport
234        let change_message = ChangeMessage {
235            key: key.to_string(),
236            data: serde_json::to_vec(value)
237                .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?,
238            replica_id: self.replica_id,
239            timestamp: Utc::now(),
240            change_type: change_type.clone(),
241        };
242
243        let message_bytes = serde_json::to_vec(&change_message)
244            .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
245
246        self.transport.send(&message_bytes).await
247            .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
248
249        // Emit local event
250        self.emit_event(RealtimeEvent::DocumentChanged {
251            key: key.to_string(),
252            replica_id: self.replica_id,
253            timestamp: Utc::now(),
254            change_type,
255        }).await;
256
257        Ok(())
258    }
259
260    /// Process incoming changes from peers
261    pub async fn process_incoming_changes(&mut self) -> Result<usize, RealtimeSyncError> {
262        let messages = self.transport.receive().await
263            .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
264
265        let mut changes_processed = 0;
266
267        for message_bytes in messages {
268            if let Ok(change_message) = serde_json::from_slice::<ChangeMessage>(&message_bytes) {
269                // Process the change
270                self.process_change(change_message).await?;
271                changes_processed += 1;
272            }
273        }
274
275        // Update sync state
276        let mut state = self.sync_state.write().await;
277        state.last_sync = Some(Utc::now());
278        state.pending_changes = state.pending_changes.saturating_sub(changes_processed);
279
280        Ok(changes_processed)
281    }
282
283    /// Get current synchronization state
284    pub async fn get_sync_state(&self) -> SyncState {
285        self.sync_state.read().await.clone()
286    }
287
288    /// Get active users
289    pub async fn get_active_users(&self) -> HashMap<ReplicaId, UserInfo> {
290        self.active_users.read().await.clone()
291    }
292
293    /// Announce presence to peers
294    async fn announce_presence(&self) -> Result<(), RealtimeSyncError> {
295        let presence_message = PresenceMessage {
296            replica_id: self.replica_id,
297            timestamp: Utc::now(),
298            user_info: None, // Could be populated with actual user info
299        };
300
301        let message_bytes = serde_json::to_vec(&presence_message)
302            .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
303
304        self.transport.send(&message_bytes).await
305            .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
306
307        Ok(())
308    }
309
310    /// Announce departure to peers
311    async fn announce_departure(&self) -> Result<(), RealtimeSyncError> {
312        let departure_message = DepartureMessage {
313            replica_id: self.replica_id,
314            timestamp: Utc::now(),
315        };
316
317        let message_bytes = serde_json::to_vec(&departure_message)
318            .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
319
320        self.transport.send(&message_bytes).await
321            .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
322
323        Ok(())
324    }
325
326    /// Start heartbeat mechanism
327    async fn start_heartbeat(&self) {
328        let transport = self.transport.clone();
329        let replica_id = self.replica_id;
330        let interval = self.heartbeat_interval;
331
332        tokio::spawn(async move {
333            let mut interval_timer = tokio::time::interval(interval);
334            
335            loop {
336                interval_timer.tick().await;
337                
338                // Send heartbeat
339                let heartbeat_message = HeartbeatMessage {
340                    replica_id,
341                    timestamp: Utc::now(),
342                };
343
344                if let Ok(message_bytes) = serde_json::to_vec(&heartbeat_message) {
345                    let _ = transport.send(&message_bytes).await;
346                }
347            }
348        });
349    }
350
351    /// Start presence monitoring
352    async fn start_presence_monitoring(&self) {
353        let active_users = self.active_users.clone();
354        let timeout = self.presence_timeout;
355
356        tokio::spawn(async move {
357            let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(60));
358            
359            loop {
360                interval_timer.tick().await;
361                
362                let now = Utc::now();
363                let mut users = active_users.write().await;
364                
365                // Remove users who haven't sent heartbeat recently
366                users.retain(|_, user_info| {
367                    // This is a simplified check - in reality you'd track last heartbeat
368                    true
369                });
370            }
371        });
372    }
373
374    /// Process an incoming change
375    async fn process_change(&mut self, change_message: ChangeMessage) -> Result<(), RealtimeSyncError> {
376        // Emit event for the change
377        self.emit_event(RealtimeEvent::DocumentChanged {
378            key: change_message.key.clone(),
379            replica_id: change_message.replica_id,
380            timestamp: change_message.timestamp,
381            change_type: change_message.change_type,
382        }).await;
383
384        Ok(())
385    }
386
387    /// Emit an event to all subscribers
388    async fn emit_event(&self, event: RealtimeEvent) {
389        let subscriptions = self.subscriptions.read().await;
390        
391        for subscription in subscriptions.values() {
392            // Check if subscription is interested in this event type
393            let event_type = match &event {
394                RealtimeEvent::DocumentChanged { .. } => "document_changed",
395                RealtimeEvent::UserJoined { .. } => "user_joined",
396                RealtimeEvent::UserLeft { .. } => "user_left",
397                RealtimeEvent::SyncStarted { .. } => "sync_started",
398                RealtimeEvent::SyncCompleted { .. } => "sync_completed",
399                RealtimeEvent::ConflictDetected { .. } => "conflict_detected",
400            };
401
402            if subscription.event_types.contains(&event_type.to_string()) 
403                || subscription.event_types.contains(&"*".to_string()) {
404                (subscription.callback)(event.clone());
405            }
406        }
407    }
408}
409
410/// Change message for broadcasting updates
411#[derive(Debug, Clone, Serialize, Deserialize)]
412struct ChangeMessage {
413    key: String,
414    data: Vec<u8>,
415    replica_id: ReplicaId,
416    timestamp: DateTime<Utc>,
417    change_type: ChangeType,
418}
419
420/// Presence message for announcing user presence
421#[derive(Debug, Clone, Serialize, Deserialize)]
422struct PresenceMessage {
423    replica_id: ReplicaId,
424    timestamp: DateTime<Utc>,
425    user_info: Option<UserInfo>,
426}
427
428/// Departure message for announcing user departure
429#[derive(Debug, Clone, Serialize, Deserialize)]
430struct DepartureMessage {
431    replica_id: ReplicaId,
432    timestamp: DateTime<Utc>,
433}
434
435/// Heartbeat message for keeping connections alive
436#[derive(Debug, Clone, Serialize, Deserialize)]
437struct HeartbeatMessage {
438    replica_id: ReplicaId,
439    timestamp: DateTime<Utc>,
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use crate::storage::memory::MemoryStorage;
446    use crate::transport::memory::InMemoryTransport;
447
448    #[tokio::test]
449    async fn test_realtime_sync_manager_creation() {
450        let storage = Arc::new(Storage::memory());
451        let transport = InMemoryTransport::new();
452        let replica_id = ReplicaId::default();
453        
454        let manager = RealtimeSyncManager::new(replica_id, transport, storage);
455        assert_eq!(manager.replica_id, replica_id);
456    }
457
458    #[tokio::test]
459    async fn test_subscription_management() {
460        let storage = Arc::new(Storage::memory());
461        let transport = InMemoryTransport::new();
462        let replica_id = ReplicaId::default();
463        
464        let manager = RealtimeSyncManager::new(replica_id, transport, storage);
465        
466        // Subscribe to events
467        let callback = Box::new(|_event: RealtimeEvent| {});
468        let subscription_id = manager.subscribe(
469            vec!["document_changed".to_string()],
470            callback
471        ).await.unwrap();
472        
473        // Unsubscribe
474        let result = manager.unsubscribe(&subscription_id).await;
475        assert!(result.is_ok());
476    }
477
478    #[tokio::test]
479    async fn test_sync_state_management() {
480        let storage = Arc::new(Storage::memory());
481        let transport = InMemoryTransport::new();
482        let replica_id = ReplicaId::default();
483        
484        let manager = RealtimeSyncManager::new(replica_id, transport, storage);
485        
486        let state = manager.get_sync_state().await;
487        assert!(!state.is_syncing);
488        assert_eq!(state.connected_users, 0);
489        assert_eq!(state.pending_changes, 0);
490    }
491}