1use crate::crdt::Crdt;
7use crate::delta::{Delta, DeltaEncoder};
8use crate::vector_clock::{ClockOrdering, VectorClock};
9use crate::{DeviceId, SyncError, SyncResult, Timestamp};
10use dashmap::DashMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum DeviceStatus {
20 Online,
22 Offline,
24 Syncing,
26 Error,
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DeviceMetadata {
33 pub device_id: DeviceId,
35 pub status: DeviceStatus,
37 pub last_seen: Timestamp,
39 pub clock: VectorClock,
41 pub metadata: HashMap<String, String>,
43}
44
45impl DeviceMetadata {
46 pub fn new(device_id: DeviceId) -> Self {
52 Self {
53 device_id: device_id.clone(),
54 status: DeviceStatus::Offline,
55 last_seen: Self::current_timestamp(),
56 clock: VectorClock::new(device_id),
57 metadata: HashMap::new(),
58 }
59 }
60
61 fn current_timestamp() -> Timestamp {
63 SystemTime::now()
64 .duration_since(UNIX_EPOCH)
65 .map(|d| d.as_secs())
66 .unwrap_or(0)
67 }
68
69 pub fn update_last_seen(&mut self) {
71 self.last_seen = Self::current_timestamp();
72 }
73
74 pub fn set_status(&mut self, status: DeviceStatus) {
76 self.status = status;
77 self.update_last_seen();
78 }
79
80 pub fn is_online(&self) -> bool {
82 matches!(self.status, DeviceStatus::Online | DeviceStatus::Syncing)
83 }
84
85 pub fn is_stale(&self, timeout_secs: u64) -> bool {
91 let current = Self::current_timestamp();
92 current.saturating_sub(self.last_seen) > timeout_secs
93 }
94}
95
96#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SyncSession {
99 pub session_id: String,
101 pub source_device: DeviceId,
103 pub target_device: DeviceId,
105 pub started_at: Timestamp,
107 pub completed_at: Option<Timestamp>,
109 pub items_synced: usize,
111 pub bytes_transferred: usize,
113}
114
115impl SyncSession {
116 pub fn new(source_device: DeviceId, target_device: DeviceId) -> Self {
118 Self {
119 session_id: uuid::Uuid::new_v4().to_string(),
120 source_device,
121 target_device,
122 started_at: SystemTime::now()
123 .duration_since(UNIX_EPOCH)
124 .map(|d| d.as_secs())
125 .unwrap_or(0),
126 completed_at: None,
127 items_synced: 0,
128 bytes_transferred: 0,
129 }
130 }
131
132 pub fn complete(&mut self) {
134 self.completed_at = Some(
135 SystemTime::now()
136 .duration_since(UNIX_EPOCH)
137 .map(|d| d.as_secs())
138 .unwrap_or(0),
139 );
140 }
141
142 pub fn duration(&self) -> Option<u64> {
144 self.completed_at
145 .map(|end| end.saturating_sub(self.started_at))
146 }
147
148 pub fn is_completed(&self) -> bool {
150 self.completed_at.is_some()
151 }
152}
153
154pub struct SyncCoordinator {
156 device_id: DeviceId,
158 devices: Arc<DashMap<DeviceId, DeviceMetadata>>,
160 sessions: Arc<RwLock<Vec<SyncSession>>>,
162 delta_encoder: DeltaEncoder,
164}
165
166impl SyncCoordinator {
167 pub fn new(device_id: DeviceId) -> Self {
173 let devices = Arc::new(DashMap::new());
174
175 let mut metadata = DeviceMetadata::new(device_id.clone());
177 metadata.set_status(DeviceStatus::Online);
178 devices.insert(device_id.clone(), metadata);
179
180 Self {
181 device_id,
182 devices,
183 sessions: Arc::new(RwLock::new(Vec::new())),
184 delta_encoder: DeltaEncoder::default_encoder(),
185 }
186 }
187
188 pub fn device_id(&self) -> &DeviceId {
190 &self.device_id
191 }
192
193 pub fn register_device(&self, device_id: DeviceId) -> SyncResult<()> {
199 let metadata = DeviceMetadata::new(device_id.clone());
200 self.devices.insert(device_id, metadata);
201 Ok(())
202 }
203
204 pub fn unregister_device(&self, device_id: &DeviceId) -> SyncResult<()> {
210 self.devices.remove(device_id);
211 Ok(())
212 }
213
214 pub fn update_device_status(
221 &self,
222 device_id: &DeviceId,
223 status: DeviceStatus,
224 ) -> SyncResult<()> {
225 if let Some(mut metadata) = self.devices.get_mut(device_id) {
226 metadata.set_status(status);
227 Ok(())
228 } else {
229 Err(SyncError::InvalidDeviceId(device_id.clone()))
230 }
231 }
232
233 pub fn get_device(&self, device_id: &DeviceId) -> Option<DeviceMetadata> {
239 self.devices
240 .get(device_id)
241 .map(|entry| entry.value().clone())
242 }
243
244 pub fn list_devices(&self) -> Vec<DeviceMetadata> {
246 self.devices
247 .iter()
248 .map(|entry| entry.value().clone())
249 .collect()
250 }
251
252 pub fn list_online_devices(&self) -> Vec<DeviceMetadata> {
254 self.devices
255 .iter()
256 .filter(|entry| entry.value().is_online())
257 .map(|entry| entry.value().clone())
258 .collect()
259 }
260
261 pub fn cleanup_stale_devices(&self, timeout_secs: u64) -> usize {
267 let mut count = 0;
268
269 for mut entry in self.devices.iter_mut() {
270 if entry.value().is_stale(timeout_secs) && entry.value().is_online() {
271 entry.value_mut().set_status(DeviceStatus::Offline);
272 count += 1;
273 }
274 }
275
276 count
277 }
278
279 pub fn start_sync_session(&self, target_device: DeviceId) -> SyncResult<SyncSession> {
285 if !self.devices.contains_key(&target_device) {
287 return Err(SyncError::InvalidDeviceId(target_device));
288 }
289
290 let session = SyncSession::new(self.device_id.clone(), target_device.clone());
291
292 self.update_device_status(&self.device_id, DeviceStatus::Syncing)?;
294 self.update_device_status(&target_device, DeviceStatus::Syncing)?;
295
296 self.sessions.write().push(session.clone());
298
299 Ok(session)
300 }
301
302 pub fn complete_sync_session(&self, session_id: &str) -> SyncResult<()> {
308 let mut sessions = self.sessions.write();
309
310 if let Some(session) = sessions.iter_mut().find(|s| s.session_id == session_id) {
311 session.complete();
312
313 self.update_device_status(&session.source_device, DeviceStatus::Online)?;
315 self.update_device_status(&session.target_device, DeviceStatus::Online)?;
316
317 Ok(())
318 } else {
319 Err(SyncError::CoordinationError(format!(
320 "Session not found: {}",
321 session_id
322 )))
323 }
324 }
325
326 pub fn active_sessions(&self) -> Vec<SyncSession> {
328 self.sessions
329 .read()
330 .iter()
331 .filter(|s| !s.is_completed())
332 .cloned()
333 .collect()
334 }
335
336 pub fn completed_sessions(&self) -> Vec<SyncSession> {
338 self.sessions
339 .read()
340 .iter()
341 .filter(|s| s.is_completed())
342 .cloned()
343 .collect()
344 }
345
346 pub fn sync_crdt<T: Crdt>(&self, local_crdt: &mut T, remote_crdt: &T) -> SyncResult<()> {
353 local_crdt.merge(remote_crdt)
354 }
355
356 pub fn create_delta(&self, base: &[u8], target: &[u8]) -> SyncResult<Delta> {
363 self.delta_encoder.encode(base, target)
364 }
365
366 pub fn apply_delta(&self, base: &[u8], delta: &Delta) -> SyncResult<Vec<u8>> {
373 delta.apply(base)
374 }
375
376 pub fn update_device_clock(&self, device_id: &DeviceId, clock: VectorClock) -> SyncResult<()> {
383 if let Some(mut metadata) = self.devices.get_mut(device_id) {
384 metadata.clock.merge(&clock);
385 metadata.update_last_seen();
386 Ok(())
387 } else {
388 Err(SyncError::InvalidDeviceId(device_id.clone()))
389 }
390 }
391
392 pub fn compare_device_clocks(
399 &self,
400 device1: &DeviceId,
401 device2: &DeviceId,
402 ) -> SyncResult<ClockOrdering> {
403 let clock1 = self
404 .devices
405 .get(device1)
406 .map(|m| m.clock.clone())
407 .ok_or_else(|| SyncError::InvalidDeviceId(device1.clone()))?;
408
409 let clock2 = self
410 .devices
411 .get(device2)
412 .map(|m| m.clock.clone())
413 .ok_or_else(|| SyncError::InvalidDeviceId(device2.clone()))?;
414
415 Ok(clock1.compare(&clock2))
416 }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned device id from a string literal.
    fn id(name: &str) -> DeviceId {
        name.to_string()
    }

    /// A new metadata record keeps its id and starts offline.
    #[test]
    fn test_device_metadata_creation() {
        let record = DeviceMetadata::new(id("device-1"));

        assert_eq!(record.device_id, "device-1");
        assert_eq!(record.status, DeviceStatus::Offline);
    }

    /// Setting the status to `Online` is reflected by both the field and `is_online`.
    #[test]
    fn test_device_metadata_status() {
        let mut record = DeviceMetadata::new(id("device-1"));

        record.set_status(DeviceStatus::Online);

        assert_eq!(record.status, DeviceStatus::Online);
        assert!(record.is_online());
    }

    /// A new session records both endpoints and is not completed.
    #[test]
    fn test_sync_session_creation() {
        let fresh = SyncSession::new(id("device-1"), id("device-2"));

        assert_eq!(fresh.source_device, "device-1");
        assert_eq!(fresh.target_device, "device-2");
        assert!(!fresh.is_completed());
    }

    /// Completing a session makes it report as completed and gives it a duration.
    #[test]
    fn test_sync_session_complete() {
        let mut finished = SyncSession::new(id("device-1"), id("device-2"));

        finished.complete();

        assert!(finished.is_completed());
        assert!(finished.duration().is_some());
    }

    /// A new coordinator knows its own id and has registered exactly the local device.
    #[test]
    fn test_coordinator_creation() {
        let coord = SyncCoordinator::new(id("device-1"));

        assert_eq!(coord.device_id(), "device-1");
        assert_eq!(coord.list_devices().len(), 1);
    }

    /// Registering a second device grows the device list to two entries.
    #[test]
    fn test_coordinator_register_device() -> SyncResult<()> {
        let coord = SyncCoordinator::new(id("device-1"));

        coord.register_device(id("device-2"))?;

        assert_eq!(coord.list_devices().len(), 2);
        Ok(())
    }

    /// Unregistering a previously registered device leaves only the local device.
    #[test]
    fn test_coordinator_unregister_device() -> SyncResult<()> {
        let coord = SyncCoordinator::new(id("device-1"));
        let peer = id("device-2");

        coord.register_device(peer.clone())?;
        coord.unregister_device(&peer)?;

        assert_eq!(coord.list_devices().len(), 1);
        Ok(())
    }

    /// A status update is visible through `get_device`.
    #[test]
    fn test_coordinator_update_status() -> SyncResult<()> {
        let local = id("device-1");
        let coord = SyncCoordinator::new(local.clone());

        coord.update_device_status(&local, DeviceStatus::Syncing)?;

        let stored = coord
            .get_device(&local)
            .ok_or_else(|| SyncError::InvalidDeviceId(id("device-1")))?;
        assert_eq!(stored.status, DeviceStatus::Syncing);

        Ok(())
    }

    /// A started session is active until completed, after which it is listed as completed.
    #[test]
    fn test_coordinator_sync_session() -> SyncResult<()> {
        let coord = SyncCoordinator::new(id("device-1"));
        coord.register_device(id("device-2"))?;

        let started = coord.start_sync_session(id("device-2"))?;
        assert_eq!(coord.active_sessions().len(), 1);

        coord.complete_sync_session(&started.session_id)?;
        assert_eq!(coord.active_sessions().len(), 0);
        assert_eq!(coord.completed_sessions().len(), 1);

        Ok(())
    }
}