1use crate::crdt::{Mergeable, ReplicaId};
4use crate::storage::{Storage, LocalStorage};
5use crate::transport::SyncTransport;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::{broadcast, RwLock};
11use thiserror::Error;
12
13#[derive(Error, Debug)]
14pub enum RealtimeSyncError {
15 #[error("Transport error: {0}")]
16 Transport(String),
17 #[error("Storage error: {0}")]
18 Storage(String),
19 #[error("Serialization error: {0}")]
20 Serialization(String),
21 #[error("Subscription not found: {0}")]
22 SubscriptionNotFound(String),
23 #[error("Invalid operation: {0}")]
24 InvalidOperation(String),
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum RealtimeEvent {
30 DocumentChanged {
32 key: String,
33 replica_id: ReplicaId,
34 timestamp: DateTime<Utc>,
35 change_type: ChangeType,
36 },
37 UserJoined {
39 replica_id: ReplicaId,
40 timestamp: DateTime<Utc>,
41 user_info: Option<UserInfo>,
42 },
43 UserLeft {
45 replica_id: ReplicaId,
46 timestamp: DateTime<Utc>,
47 },
48 SyncStarted {
50 replica_id: ReplicaId,
51 timestamp: DateTime<Utc>,
52 },
53 SyncCompleted {
55 replica_id: ReplicaId,
56 timestamp: DateTime<Utc>,
57 changes_synced: usize,
58 },
59 ConflictDetected {
61 key: String,
62 replica_id: ReplicaId,
63 timestamp: DateTime<Utc>,
64 conflict_type: String,
65 },
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub enum ChangeType {
71 Created,
72 Updated,
73 Deleted,
74 Merged,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct UserInfo {
80 pub name: Option<String>,
81 pub avatar: Option<String>,
82 pub color: Option<String>,
83}
84
85pub struct RealtimeSyncManager<Tr>
87where
88 Tr: SyncTransport + Clone + Send + Sync + 'static,
89{
90 replica_id: ReplicaId,
91 transport: Tr,
92 storage: Arc<Storage>,
93 event_sender: broadcast::Sender<RealtimeEvent>,
94 subscriptions: Arc<RwLock<HashMap<String, Subscription>>>,
95 active_users: Arc<RwLock<HashMap<ReplicaId, UserInfo>>>,
96 sync_state: Arc<RwLock<SyncState>>,
97 heartbeat_interval: std::time::Duration,
98 presence_timeout: std::time::Duration,
99}
100
101pub struct Subscription {
103 pub id: String,
104 pub event_types: Vec<String>,
105 pub callback: Box<dyn Fn(RealtimeEvent) + Send + Sync>,
106}
107
108#[derive(Debug, Clone)]
110pub struct SyncState {
111 pub is_syncing: bool,
112 pub last_sync: Option<DateTime<Utc>>,
113 pub connected_users: usize,
114 pub pending_changes: usize,
115 pub sync_errors: Vec<String>,
116}
117
118impl<Tr> RealtimeSyncManager<Tr>
119where
120 Tr: SyncTransport + Clone + Send + Sync + 'static,
121{
122 pub fn new(
123 replica_id: ReplicaId,
124 transport: Tr,
125 storage: Arc<Storage>,
126 ) -> Self {
127 let (event_sender, _) = broadcast::channel(1000);
128
129 Self {
130 replica_id,
131 transport,
132 storage,
133 event_sender,
134 subscriptions: Arc::new(RwLock::new(HashMap::new())),
135 active_users: Arc::new(RwLock::new(HashMap::new())),
136 sync_state: Arc::new(RwLock::new(SyncState {
137 is_syncing: false,
138 last_sync: None,
139 connected_users: 0,
140 pending_changes: 0,
141 sync_errors: Vec::new(),
142 })),
143 heartbeat_interval: std::time::Duration::from_secs(30),
144 presence_timeout: std::time::Duration::from_secs(120),
145 }
146 }
147
148 pub async fn start(&mut self) -> Result<(), RealtimeSyncError> {
150 let mut state = self.sync_state.write().await;
151 state.is_syncing = true;
152 drop(state);
153
154 self.announce_presence().await?;
156
157 self.start_heartbeat().await;
159
160 self.start_presence_monitoring().await;
162
163 self.emit_event(RealtimeEvent::SyncStarted {
165 replica_id: self.replica_id,
166 timestamp: Utc::now(),
167 }).await;
168
169 Ok(())
170 }
171
172 pub async fn stop(&mut self) -> Result<(), RealtimeSyncError> {
174 let mut state = self.sync_state.write().await;
175 state.is_syncing = false;
176 drop(state);
177
178 self.announce_departure().await?;
180
181 self.emit_event(RealtimeEvent::SyncCompleted {
183 replica_id: self.replica_id,
184 timestamp: Utc::now(),
185 changes_synced: 0,
186 }).await;
187
188 Ok(())
189 }
190
191 pub async fn subscribe(
193 &self,
194 event_types: Vec<String>,
195 callback: Box<dyn Fn(RealtimeEvent) + Send + Sync>,
196 ) -> Result<String, RealtimeSyncError> {
197 let subscription_id = uuid::Uuid::new_v4().to_string();
198
199 let subscription = Subscription {
200 id: subscription_id.clone(),
201 event_types,
202 callback,
203 };
204
205 let mut subscriptions = self.subscriptions.write().await;
206 subscriptions.insert(subscription_id.clone(), subscription);
207
208 Ok(subscription_id)
209 }
210
211 pub async fn unsubscribe(&self, subscription_id: &str) -> Result<(), RealtimeSyncError> {
213 let mut subscriptions = self.subscriptions.write().await;
214
215 if subscriptions.remove(subscription_id).is_some() {
216 Ok(())
217 } else {
218 Err(RealtimeSyncError::SubscriptionNotFound(subscription_id.to_string()))
219 }
220 }
221
222 pub async fn broadcast_change<T: Mergeable + Serialize + Clone>(
224 &self,
225 key: &str,
226 value: &T,
227 change_type: ChangeType,
228 ) -> Result<(), RealtimeSyncError> {
229 self.storage.set(key, value).await
231 .map_err(|e| RealtimeSyncError::Storage(e.to_string()))?;
232
233 let change_message = ChangeMessage {
235 key: key.to_string(),
236 data: serde_json::to_vec(value)
237 .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?,
238 replica_id: self.replica_id,
239 timestamp: Utc::now(),
240 change_type: change_type.clone(),
241 };
242
243 let message_bytes = serde_json::to_vec(&change_message)
244 .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
245
246 self.transport.send(&message_bytes).await
247 .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
248
249 self.emit_event(RealtimeEvent::DocumentChanged {
251 key: key.to_string(),
252 replica_id: self.replica_id,
253 timestamp: Utc::now(),
254 change_type,
255 }).await;
256
257 Ok(())
258 }
259
260 pub async fn process_incoming_changes(&mut self) -> Result<usize, RealtimeSyncError> {
262 let messages = self.transport.receive().await
263 .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
264
265 let mut changes_processed = 0;
266
267 for message_bytes in messages {
268 if let Ok(change_message) = serde_json::from_slice::<ChangeMessage>(&message_bytes) {
269 self.process_change(change_message).await?;
271 changes_processed += 1;
272 }
273 }
274
275 let mut state = self.sync_state.write().await;
277 state.last_sync = Some(Utc::now());
278 state.pending_changes = state.pending_changes.saturating_sub(changes_processed);
279
280 Ok(changes_processed)
281 }
282
283 pub async fn get_sync_state(&self) -> SyncState {
285 self.sync_state.read().await.clone()
286 }
287
288 pub async fn get_active_users(&self) -> HashMap<ReplicaId, UserInfo> {
290 self.active_users.read().await.clone()
291 }
292
293 async fn announce_presence(&self) -> Result<(), RealtimeSyncError> {
295 let presence_message = PresenceMessage {
296 replica_id: self.replica_id,
297 timestamp: Utc::now(),
298 user_info: None, };
300
301 let message_bytes = serde_json::to_vec(&presence_message)
302 .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
303
304 self.transport.send(&message_bytes).await
305 .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
306
307 Ok(())
308 }
309
310 async fn announce_departure(&self) -> Result<(), RealtimeSyncError> {
312 let departure_message = DepartureMessage {
313 replica_id: self.replica_id,
314 timestamp: Utc::now(),
315 };
316
317 let message_bytes = serde_json::to_vec(&departure_message)
318 .map_err(|e| RealtimeSyncError::Serialization(e.to_string()))?;
319
320 self.transport.send(&message_bytes).await
321 .map_err(|e| RealtimeSyncError::Transport(e.to_string()))?;
322
323 Ok(())
324 }
325
326 async fn start_heartbeat(&self) {
328 let transport = self.transport.clone();
329 let replica_id = self.replica_id;
330 let interval = self.heartbeat_interval;
331
332 tokio::spawn(async move {
333 let mut interval_timer = tokio::time::interval(interval);
334
335 loop {
336 interval_timer.tick().await;
337
338 let heartbeat_message = HeartbeatMessage {
340 replica_id,
341 timestamp: Utc::now(),
342 };
343
344 if let Ok(message_bytes) = serde_json::to_vec(&heartbeat_message) {
345 let _ = transport.send(&message_bytes).await;
346 }
347 }
348 });
349 }
350
351 async fn start_presence_monitoring(&self) {
353 let active_users = self.active_users.clone();
354 let timeout = self.presence_timeout;
355
356 tokio::spawn(async move {
357 let mut interval_timer = tokio::time::interval(std::time::Duration::from_secs(60));
358
359 loop {
360 interval_timer.tick().await;
361
362 let now = Utc::now();
363 let mut users = active_users.write().await;
364
365 users.retain(|_, user_info| {
367 true
369 });
370 }
371 });
372 }
373
374 async fn process_change(&mut self, change_message: ChangeMessage) -> Result<(), RealtimeSyncError> {
376 self.emit_event(RealtimeEvent::DocumentChanged {
378 key: change_message.key.clone(),
379 replica_id: change_message.replica_id,
380 timestamp: change_message.timestamp,
381 change_type: change_message.change_type,
382 }).await;
383
384 Ok(())
385 }
386
387 async fn emit_event(&self, event: RealtimeEvent) {
389 let subscriptions = self.subscriptions.read().await;
390
391 for subscription in subscriptions.values() {
392 let event_type = match &event {
394 RealtimeEvent::DocumentChanged { .. } => "document_changed",
395 RealtimeEvent::UserJoined { .. } => "user_joined",
396 RealtimeEvent::UserLeft { .. } => "user_left",
397 RealtimeEvent::SyncStarted { .. } => "sync_started",
398 RealtimeEvent::SyncCompleted { .. } => "sync_completed",
399 RealtimeEvent::ConflictDetected { .. } => "conflict_detected",
400 };
401
402 if subscription.event_types.contains(&event_type.to_string())
403 || subscription.event_types.contains(&"*".to_string()) {
404 (subscription.callback)(event.clone());
405 }
406 }
407 }
408}
409
410#[derive(Debug, Clone, Serialize, Deserialize)]
412struct ChangeMessage {
413 key: String,
414 data: Vec<u8>,
415 replica_id: ReplicaId,
416 timestamp: DateTime<Utc>,
417 change_type: ChangeType,
418}
419
420#[derive(Debug, Clone, Serialize, Deserialize)]
422struct PresenceMessage {
423 replica_id: ReplicaId,
424 timestamp: DateTime<Utc>,
425 user_info: Option<UserInfo>,
426}
427
428#[derive(Debug, Clone, Serialize, Deserialize)]
430struct DepartureMessage {
431 replica_id: ReplicaId,
432 timestamp: DateTime<Utc>,
433}
434
435#[derive(Debug, Clone, Serialize, Deserialize)]
437struct HeartbeatMessage {
438 replica_id: ReplicaId,
439 timestamp: DateTime<Utc>,
440}
441
442#[cfg(test)]
443mod tests {
444 use super::*;
445 use crate::storage::memory::MemoryStorage;
446 use crate::transport::memory::InMemoryTransport;
447
448 #[tokio::test]
449 async fn test_realtime_sync_manager_creation() {
450 let storage = Arc::new(Storage::memory());
451 let transport = InMemoryTransport::new();
452 let replica_id = ReplicaId::default();
453
454 let manager = RealtimeSyncManager::new(replica_id, transport, storage);
455 assert_eq!(manager.replica_id, replica_id);
456 }
457
458 #[tokio::test]
459 async fn test_subscription_management() {
460 let storage = Arc::new(Storage::memory());
461 let transport = InMemoryTransport::new();
462 let replica_id = ReplicaId::default();
463
464 let manager = RealtimeSyncManager::new(replica_id, transport, storage);
465
466 let callback = Box::new(|_event: RealtimeEvent| {});
468 let subscription_id = manager.subscribe(
469 vec!["document_changed".to_string()],
470 callback
471 ).await.unwrap();
472
473 let result = manager.unsubscribe(&subscription_id).await;
475 assert!(result.is_ok());
476 }
477
478 #[tokio::test]
479 async fn test_sync_state_management() {
480 let storage = Arc::new(Storage::memory());
481 let transport = InMemoryTransport::new();
482 let replica_id = ReplicaId::default();
483
484 let manager = RealtimeSyncManager::new(replica_id, transport, storage);
485
486 let state = manager.get_sync_state().await;
487 assert!(!state.is_syncing);
488 assert_eq!(state.connected_users, 0);
489 assert_eq!(state.pending_changes, 0);
490 }
491}