Skip to main content

oxigdal_sync/
coordinator.rs

//! Multi-device coordination and state management
//!
//! This module provides coordination capabilities for synchronizing
//! state across multiple devices in a distributed system.
5
6use crate::crdt::Crdt;
7use crate::delta::{Delta, DeltaEncoder};
8use crate::vector_clock::{ClockOrdering, VectorClock};
9use crate::{DeviceId, SyncError, SyncResult, Timestamp};
10use dashmap::DashMap;
11use parking_lot::RwLock;
12use serde::{Deserialize, Serialize};
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17/// Device status
18#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19pub enum DeviceStatus {
20    /// Device is online and active
21    Online,
22    /// Device is offline
23    Offline,
24    /// Device is syncing
25    Syncing,
26    /// Device encountered an error
27    Error,
28}
29
30/// Device metadata
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct DeviceMetadata {
33    /// Device identifier
34    pub device_id: DeviceId,
35    /// Current status
36    pub status: DeviceStatus,
37    /// Last seen timestamp
38    pub last_seen: Timestamp,
39    /// Vector clock for this device
40    pub clock: VectorClock,
41    /// Custom metadata
42    pub metadata: HashMap<String, String>,
43}
44
45impl DeviceMetadata {
46    /// Creates new device metadata
47    ///
48    /// # Arguments
49    ///
50    /// * `device_id` - The device identifier
51    pub fn new(device_id: DeviceId) -> Self {
52        Self {
53            device_id: device_id.clone(),
54            status: DeviceStatus::Offline,
55            last_seen: Self::current_timestamp(),
56            clock: VectorClock::new(device_id),
57            metadata: HashMap::new(),
58        }
59    }
60
61    /// Gets the current Unix timestamp
62    fn current_timestamp() -> Timestamp {
63        SystemTime::now()
64            .duration_since(UNIX_EPOCH)
65            .map(|d| d.as_secs())
66            .unwrap_or(0)
67    }
68
69    /// Updates the last seen timestamp
70    pub fn update_last_seen(&mut self) {
71        self.last_seen = Self::current_timestamp();
72    }
73
74    /// Sets device status
75    pub fn set_status(&mut self, status: DeviceStatus) {
76        self.status = status;
77        self.update_last_seen();
78    }
79
80    /// Checks if device is online
81    pub fn is_online(&self) -> bool {
82        matches!(self.status, DeviceStatus::Online | DeviceStatus::Syncing)
83    }
84
85    /// Checks if device is stale (hasn't been seen recently)
86    ///
87    /// # Arguments
88    ///
89    /// * `timeout_secs` - Timeout in seconds
90    pub fn is_stale(&self, timeout_secs: u64) -> bool {
91        let current = Self::current_timestamp();
92        current.saturating_sub(self.last_seen) > timeout_secs
93    }
94}
95
96/// Synchronization session
97#[derive(Debug, Clone, Serialize, Deserialize)]
98pub struct SyncSession {
99    /// Session ID
100    pub session_id: String,
101    /// Source device
102    pub source_device: DeviceId,
103    /// Target device
104    pub target_device: DeviceId,
105    /// Start timestamp
106    pub started_at: Timestamp,
107    /// End timestamp (if completed)
108    pub completed_at: Option<Timestamp>,
109    /// Number of items synced
110    pub items_synced: usize,
111    /// Bytes transferred
112    pub bytes_transferred: usize,
113}
114
115impl SyncSession {
116    /// Creates a new sync session
117    pub fn new(source_device: DeviceId, target_device: DeviceId) -> Self {
118        Self {
119            session_id: uuid::Uuid::new_v4().to_string(),
120            source_device,
121            target_device,
122            started_at: SystemTime::now()
123                .duration_since(UNIX_EPOCH)
124                .map(|d| d.as_secs())
125                .unwrap_or(0),
126            completed_at: None,
127            items_synced: 0,
128            bytes_transferred: 0,
129        }
130    }
131
132    /// Marks the session as completed
133    pub fn complete(&mut self) {
134        self.completed_at = Some(
135            SystemTime::now()
136                .duration_since(UNIX_EPOCH)
137                .map(|d| d.as_secs())
138                .unwrap_or(0),
139        );
140    }
141
142    /// Gets the duration of the session in seconds
143    pub fn duration(&self) -> Option<u64> {
144        self.completed_at
145            .map(|end| end.saturating_sub(self.started_at))
146    }
147
148    /// Checks if the session is completed
149    pub fn is_completed(&self) -> bool {
150        self.completed_at.is_some()
151    }
152}
153
154/// Sync coordinator for managing multi-device state
155pub struct SyncCoordinator {
156    /// This device's ID
157    device_id: DeviceId,
158    /// Device registry
159    devices: Arc<DashMap<DeviceId, DeviceMetadata>>,
160    /// Active sync sessions
161    sessions: Arc<RwLock<Vec<SyncSession>>>,
162    /// Delta encoder
163    delta_encoder: DeltaEncoder,
164}
165
166impl SyncCoordinator {
167    /// Creates a new sync coordinator
168    ///
169    /// # Arguments
170    ///
171    /// * `device_id` - This device's identifier
172    pub fn new(device_id: DeviceId) -> Self {
173        let devices = Arc::new(DashMap::new());
174
175        // Register this device
176        let mut metadata = DeviceMetadata::new(device_id.clone());
177        metadata.set_status(DeviceStatus::Online);
178        devices.insert(device_id.clone(), metadata);
179
180        Self {
181            device_id,
182            devices,
183            sessions: Arc::new(RwLock::new(Vec::new())),
184            delta_encoder: DeltaEncoder::default_encoder(),
185        }
186    }
187
188    /// Gets this device's ID
189    pub fn device_id(&self) -> &DeviceId {
190        &self.device_id
191    }
192
193    /// Registers a new device
194    ///
195    /// # Arguments
196    ///
197    /// * `device_id` - The device to register
198    pub fn register_device(&self, device_id: DeviceId) -> SyncResult<()> {
199        let metadata = DeviceMetadata::new(device_id.clone());
200        self.devices.insert(device_id, metadata);
201        Ok(())
202    }
203
204    /// Unregisters a device
205    ///
206    /// # Arguments
207    ///
208    /// * `device_id` - The device to unregister
209    pub fn unregister_device(&self, device_id: &DeviceId) -> SyncResult<()> {
210        self.devices.remove(device_id);
211        Ok(())
212    }
213
214    /// Updates device status
215    ///
216    /// # Arguments
217    ///
218    /// * `device_id` - The device to update
219    /// * `status` - The new status
220    pub fn update_device_status(
221        &self,
222        device_id: &DeviceId,
223        status: DeviceStatus,
224    ) -> SyncResult<()> {
225        if let Some(mut metadata) = self.devices.get_mut(device_id) {
226            metadata.set_status(status);
227            Ok(())
228        } else {
229            Err(SyncError::InvalidDeviceId(device_id.clone()))
230        }
231    }
232
233    /// Gets device metadata
234    ///
235    /// # Arguments
236    ///
237    /// * `device_id` - The device to query
238    pub fn get_device(&self, device_id: &DeviceId) -> Option<DeviceMetadata> {
239        self.devices
240            .get(device_id)
241            .map(|entry| entry.value().clone())
242    }
243
244    /// Lists all registered devices
245    pub fn list_devices(&self) -> Vec<DeviceMetadata> {
246        self.devices
247            .iter()
248            .map(|entry| entry.value().clone())
249            .collect()
250    }
251
252    /// Lists all online devices
253    pub fn list_online_devices(&self) -> Vec<DeviceMetadata> {
254        self.devices
255            .iter()
256            .filter(|entry| entry.value().is_online())
257            .map(|entry| entry.value().clone())
258            .collect()
259    }
260
261    /// Checks for stale devices and marks them offline
262    ///
263    /// # Arguments
264    ///
265    /// * `timeout_secs` - Timeout in seconds
266    pub fn cleanup_stale_devices(&self, timeout_secs: u64) -> usize {
267        let mut count = 0;
268
269        for mut entry in self.devices.iter_mut() {
270            if entry.value().is_stale(timeout_secs) && entry.value().is_online() {
271                entry.value_mut().set_status(DeviceStatus::Offline);
272                count += 1;
273            }
274        }
275
276        count
277    }
278
279    /// Starts a sync session between two devices
280    ///
281    /// # Arguments
282    ///
283    /// * `target_device` - The device to sync with
284    pub fn start_sync_session(&self, target_device: DeviceId) -> SyncResult<SyncSession> {
285        // Verify target device exists
286        if !self.devices.contains_key(&target_device) {
287            return Err(SyncError::InvalidDeviceId(target_device));
288        }
289
290        let session = SyncSession::new(self.device_id.clone(), target_device.clone());
291
292        // Update device statuses
293        self.update_device_status(&self.device_id, DeviceStatus::Syncing)?;
294        self.update_device_status(&target_device, DeviceStatus::Syncing)?;
295
296        // Record session
297        self.sessions.write().push(session.clone());
298
299        Ok(session)
300    }
301
302    /// Completes a sync session
303    ///
304    /// # Arguments
305    ///
306    /// * `session_id` - The session to complete
307    pub fn complete_sync_session(&self, session_id: &str) -> SyncResult<()> {
308        let mut sessions = self.sessions.write();
309
310        if let Some(session) = sessions.iter_mut().find(|s| s.session_id == session_id) {
311            session.complete();
312
313            // Update device statuses back to online
314            self.update_device_status(&session.source_device, DeviceStatus::Online)?;
315            self.update_device_status(&session.target_device, DeviceStatus::Online)?;
316
317            Ok(())
318        } else {
319            Err(SyncError::CoordinationError(format!(
320                "Session not found: {}",
321                session_id
322            )))
323        }
324    }
325
326    /// Gets active sync sessions
327    pub fn active_sessions(&self) -> Vec<SyncSession> {
328        self.sessions
329            .read()
330            .iter()
331            .filter(|s| !s.is_completed())
332            .cloned()
333            .collect()
334    }
335
336    /// Gets completed sync sessions
337    pub fn completed_sessions(&self) -> Vec<SyncSession> {
338        self.sessions
339            .read()
340            .iter()
341            .filter(|s| s.is_completed())
342            .cloned()
343            .collect()
344    }
345
346    /// Synchronizes CRDT state with another device
347    ///
348    /// # Arguments
349    ///
350    /// * `local_crdt` - Local CRDT state
351    /// * `remote_crdt` - Remote CRDT state
352    pub fn sync_crdt<T: Crdt>(&self, local_crdt: &mut T, remote_crdt: &T) -> SyncResult<()> {
353        local_crdt.merge(remote_crdt)
354    }
355
356    /// Creates a delta between two data versions
357    ///
358    /// # Arguments
359    ///
360    /// * `base` - Base data
361    /// * `target` - Target data
362    pub fn create_delta(&self, base: &[u8], target: &[u8]) -> SyncResult<Delta> {
363        self.delta_encoder.encode(base, target)
364    }
365
366    /// Applies a delta to base data
367    ///
368    /// # Arguments
369    ///
370    /// * `base` - Base data
371    /// * `delta` - Delta to apply
372    pub fn apply_delta(&self, base: &[u8], delta: &Delta) -> SyncResult<Vec<u8>> {
373        delta.apply(base)
374    }
375
376    /// Updates device clock
377    ///
378    /// # Arguments
379    ///
380    /// * `device_id` - Device to update
381    /// * `clock` - New clock value
382    pub fn update_device_clock(&self, device_id: &DeviceId, clock: VectorClock) -> SyncResult<()> {
383        if let Some(mut metadata) = self.devices.get_mut(device_id) {
384            metadata.clock.merge(&clock);
385            metadata.update_last_seen();
386            Ok(())
387        } else {
388            Err(SyncError::InvalidDeviceId(device_id.clone()))
389        }
390    }
391
392    /// Compares clocks between two devices
393    ///
394    /// # Arguments
395    ///
396    /// * `device1` - First device
397    /// * `device2` - Second device
398    pub fn compare_device_clocks(
399        &self,
400        device1: &DeviceId,
401        device2: &DeviceId,
402    ) -> SyncResult<ClockOrdering> {
403        let clock1 = self
404            .devices
405            .get(device1)
406            .map(|m| m.clock.clone())
407            .ok_or_else(|| SyncError::InvalidDeviceId(device1.clone()))?;
408
409        let clock2 = self
410            .devices
411            .get(device2)
412            .map(|m| m.clock.clone())
413            .ok_or_else(|| SyncError::InvalidDeviceId(device2.clone()))?;
414
415        Ok(clock1.compare(&clock2))
416    }
417}
418
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the identifier `device-<n>` used throughout these tests.
    fn device(n: u32) -> String {
        format!("device-{}", n)
    }

    /// Fresh metadata keeps the given id and starts offline.
    #[test]
    fn test_device_metadata_creation() {
        let fresh = DeviceMetadata::new(device(1));

        assert_eq!(fresh.device_id, "device-1");
        assert_eq!(fresh.status, DeviceStatus::Offline);
    }

    /// Setting a status stores it and is reflected by `is_online`.
    #[test]
    fn test_device_metadata_status() {
        let mut record = DeviceMetadata::new(device(1));

        record.set_status(DeviceStatus::Online);

        assert_eq!(record.status, DeviceStatus::Online);
        assert!(record.is_online());
    }

    /// A new session remembers both endpoints and is not completed.
    #[test]
    fn test_sync_session_creation() {
        let fresh = SyncSession::new(device(1), device(2));

        assert_eq!(fresh.source_device, "device-1");
        assert_eq!(fresh.target_device, "device-2");
        assert!(!fresh.is_completed());
    }

    /// Completing a session makes it report completion and a duration.
    #[test]
    fn test_sync_session_complete() {
        let mut running = SyncSession::new(device(1), device(2));

        running.complete();

        assert!(running.is_completed());
        assert!(running.duration().is_some());
    }

    /// A new coordinator knows its own id and has registered itself.
    #[test]
    fn test_coordinator_creation() {
        let hub = SyncCoordinator::new(device(1));

        assert_eq!(hub.device_id(), "device-1");
        assert_eq!(hub.list_devices().len(), 1);
    }

    /// Registering a second device grows the registry to two entries.
    #[test]
    fn test_coordinator_register_device() -> SyncResult<()> {
        let hub = SyncCoordinator::new(device(1));

        hub.register_device(device(2))?;

        assert_eq!(hub.list_devices().len(), 2);
        Ok(())
    }

    /// Unregistering a device removes it from the registry again.
    #[test]
    fn test_coordinator_unregister_device() -> SyncResult<()> {
        let hub = SyncCoordinator::new(device(1));
        hub.register_device(device(2))?;

        hub.unregister_device(&device(2))?;

        assert_eq!(hub.list_devices().len(), 1);
        Ok(())
    }

    /// A status update is visible through `get_device`.
    #[test]
    fn test_coordinator_update_status() -> SyncResult<()> {
        let hub = SyncCoordinator::new(device(1));
        hub.update_device_status(&device(1), DeviceStatus::Syncing)?;

        let stored = hub
            .get_device(&device(1))
            .ok_or_else(|| SyncError::InvalidDeviceId(device(1)))?;

        assert_eq!(stored.status, DeviceStatus::Syncing);
        Ok(())
    }

    /// A session moves from the active list to the completed list.
    #[test]
    fn test_coordinator_sync_session() -> SyncResult<()> {
        let hub = SyncCoordinator::new(device(1));
        hub.register_device(device(2))?;

        let started = hub.start_sync_session(device(2))?;
        assert_eq!(hub.active_sessions().len(), 1);

        hub.complete_sync_session(&started.session_id)?;
        assert_eq!(hub.active_sessions().len(), 0);
        assert_eq!(hub.completed_sessions().len(), 1);

        Ok(())
    }
}
505}