Skip to main content

canvas_core/
offline.rs

1//! # Offline Mode and Sync
2//!
3//! Provides offline operation queuing and eventual consistency.
4//!
5//! ## Usage
6//!
7//! ```text
8//! 1. When online: operations execute immediately
9//! 2. When offline: operations queue locally
10//! 3. On reconnect: queued operations sync with conflict resolution
11//! ```
12
13use crate::element::{Element, ElementId};
14use crate::event::InputEvent;
15use serde::{Deserialize, Serialize};
16use std::collections::VecDeque;
17use std::time::{SystemTime, UNIX_EPOCH};
18
19/// A queued operation for offline sync.
20#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
21pub enum Operation {
22    /// Add a new element to the scene.
23    AddElement {
24        /// The element to add.
25        element: Element,
26        /// Timestamp when operation was created (ms since epoch).
27        timestamp: u64,
28    },
29    /// Update an existing element.
30    UpdateElement {
31        /// The element ID to update.
32        id: ElementId,
33        /// The changes as JSON patch.
34        changes: serde_json::Value,
35        /// Timestamp when operation was created (ms since epoch).
36        timestamp: u64,
37    },
38    /// Remove an element from the scene.
39    RemoveElement {
40        /// The element ID to remove.
41        id: ElementId,
42        /// Timestamp when operation was created (ms since epoch).
43        timestamp: u64,
44    },
45    /// A user interaction event.
46    Interaction {
47        /// The input event.
48        event: InputEvent,
49        /// Timestamp when operation was created (ms since epoch).
50        timestamp: u64,
51    },
52}
53
54impl Operation {
55    /// Get the timestamp of this operation.
56    #[must_use]
57    pub const fn timestamp(&self) -> u64 {
58        match self {
59            Self::AddElement { timestamp, .. }
60            | Self::UpdateElement { timestamp, .. }
61            | Self::RemoveElement { timestamp, .. }
62            | Self::Interaction { timestamp, .. } => *timestamp,
63        }
64    }
65
66    /// Get the current timestamp in milliseconds since epoch.
67    #[must_use]
68    #[allow(clippy::cast_possible_truncation)] // Timestamps won't exceed u64 for billions of years
69    pub fn now() -> u64 {
70        SystemTime::now()
71            .duration_since(UNIX_EPOCH)
72            .map(|d| d.as_millis() as u64)
73            .unwrap_or(0)
74    }
75}
76
77/// Result of a sync operation.
78#[derive(Debug, Clone, Default, Serialize, Deserialize)]
79pub struct SyncResult {
80    /// Number of operations successfully synced.
81    pub synced_count: usize,
82    /// Number of operations that had conflicts.
83    pub conflict_count: usize,
84    /// Number of operations received from remote.
85    pub received_count: usize,
86    /// Whether all operations were synced successfully.
87    pub success: bool,
88}
89
90/// Conflict resolution strategy.
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
92pub enum ConflictStrategy {
93    /// Last write wins - newer timestamp takes precedence.
94    #[default]
95    LastWriteWins,
96    /// Local wins - local operations take precedence.
97    LocalWins,
98    /// Remote wins - remote operations take precedence.
99    RemoteWins,
100}
101
102/// Queue for offline operations with persistence support.
103#[derive(Debug, Clone, Default, Serialize, Deserialize)]
104pub struct OfflineQueue {
105    /// Pending operations to sync.
106    pending: VecDeque<Operation>,
107    /// Last successful sync timestamp.
108    last_sync: Option<u64>,
109    /// Conflict resolution strategy.
110    strategy: ConflictStrategy,
111    /// Maximum queue size (oldest operations dropped when exceeded).
112    max_size: usize,
113}
114
115impl OfflineQueue {
116    /// Create a new empty offline queue.
117    #[must_use]
118    pub fn new() -> Self {
119        Self {
120            pending: VecDeque::new(),
121            last_sync: None,
122            strategy: ConflictStrategy::default(),
123            max_size: 1000,
124        }
125    }
126
127    /// Create a queue with a custom max size.
128    #[must_use]
129    pub fn with_max_size(max_size: usize) -> Self {
130        Self {
131            max_size,
132            ..Self::new()
133        }
134    }
135
136    /// Set the conflict resolution strategy.
137    pub fn set_strategy(&mut self, strategy: ConflictStrategy) {
138        self.strategy = strategy;
139    }
140
141    /// Get the current conflict resolution strategy.
142    #[must_use]
143    pub const fn strategy(&self) -> ConflictStrategy {
144        self.strategy
145    }
146
147    /// Enqueue an operation.
148    pub fn enqueue(&mut self, op: Operation) {
149        // Drop oldest if at capacity
150        if self.pending.len() >= self.max_size {
151            self.pending.pop_front();
152        }
153        self.pending.push_back(op);
154    }
155
156    /// Get the number of pending operations.
157    #[must_use]
158    pub fn len(&self) -> usize {
159        self.pending.len()
160    }
161
162    /// Check if the queue is empty.
163    #[must_use]
164    pub fn is_empty(&self) -> bool {
165        self.pending.is_empty()
166    }
167
168    /// Get the last sync timestamp.
169    #[must_use]
170    pub const fn last_sync(&self) -> Option<u64> {
171        self.last_sync
172    }
173
174    /// Peek at pending operations without removing them.
175    #[must_use]
176    pub fn pending(&self) -> &VecDeque<Operation> {
177        &self.pending
178    }
179
180    /// Take all pending operations for syncing.
181    pub fn take_pending(&mut self) -> Vec<Operation> {
182        self.pending.drain(..).collect()
183    }
184
185    /// Mark operations as synced successfully.
186    pub fn mark_synced(&mut self, count: usize, timestamp: u64) {
187        self.last_sync = Some(timestamp);
188        // Operations are already removed by take_pending
189        let _ = count; // Used for logging/metrics
190    }
191
192    /// Re-queue operations that failed to sync.
193    pub fn requeue(&mut self, ops: Vec<Operation>) {
194        for op in ops.into_iter().rev() {
195            self.pending.push_front(op);
196        }
197    }
198
199    /// Resolve conflicts between local and remote operations.
200    #[must_use]
201    pub fn resolve_conflict(&self, local: &Operation, remote: &Operation) -> ConflictResolution {
202        match self.strategy {
203            ConflictStrategy::LastWriteWins => {
204                if local.timestamp() >= remote.timestamp() {
205                    ConflictResolution::KeepLocal
206                } else {
207                    ConflictResolution::KeepRemote
208                }
209            }
210            ConflictStrategy::LocalWins => ConflictResolution::KeepLocal,
211            ConflictStrategy::RemoteWins => ConflictResolution::KeepRemote,
212        }
213    }
214
215    /// Clear all pending operations.
216    pub fn clear(&mut self) {
217        self.pending.clear();
218    }
219
220    /// Serialize the queue for persistence.
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if serialization fails.
225    pub fn to_json(&self) -> Result<String, serde_json::Error> {
226        serde_json::to_string(self)
227    }
228
229    /// Deserialize a queue from persisted JSON.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if deserialization fails.
234    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
235        serde_json::from_str(json)
236    }
237}
238
239/// Result of conflict resolution.
240#[derive(Debug, Clone, Copy, PartialEq, Eq)]
241pub enum ConflictResolution {
242    /// Keep the local operation.
243    KeepLocal,
244    /// Keep the remote operation.
245    KeepRemote,
246    /// Merge both operations (not yet implemented).
247    Merge,
248}
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253    use crate::element::ElementKind;
254
255    fn create_test_element() -> Element {
256        Element::new(ElementKind::Text {
257            content: "Test".to_string(),
258            font_size: 16.0,
259            color: "#000000".to_string(),
260        })
261    }
262
263    #[test]
264    fn test_queue_creation() {
265        let queue = OfflineQueue::new();
266        assert!(queue.is_empty());
267        assert_eq!(queue.len(), 0);
268        assert!(queue.last_sync().is_none());
269    }
270
271    #[test]
272    fn test_queue_with_max_size() {
273        let queue = OfflineQueue::with_max_size(10);
274        assert_eq!(queue.max_size, 10);
275    }
276
277    #[test]
278    fn test_enqueue_operation() {
279        let mut queue = OfflineQueue::new();
280        let element = create_test_element();
281        let op = Operation::AddElement {
282            element,
283            timestamp: Operation::now(),
284        };
285
286        queue.enqueue(op);
287        assert_eq!(queue.len(), 1);
288        assert!(!queue.is_empty());
289    }
290
291    #[test]
292    fn test_enqueue_multiple() {
293        let mut queue = OfflineQueue::new();
294
295        for i in 0..5 {
296            let op = Operation::RemoveElement {
297                id: ElementId::new(),
298                timestamp: Operation::now() + i,
299            };
300            queue.enqueue(op);
301        }
302
303        assert_eq!(queue.len(), 5);
304    }
305
306    #[test]
307    fn test_max_size_drops_oldest() {
308        let mut queue = OfflineQueue::with_max_size(3);
309
310        for i in 0_u64..5 {
311            let op = Operation::RemoveElement {
312                id: ElementId::new(),
313                timestamp: i,
314            };
315            queue.enqueue(op);
316        }
317
318        // Should only have 3 (the newest)
319        assert_eq!(queue.len(), 3);
320
321        // Check timestamps are 2, 3, 4 (oldest dropped)
322        let pending: Vec<_> = queue.pending().iter().collect();
323        assert_eq!(pending[0].timestamp(), 2);
324        assert_eq!(pending[1].timestamp(), 3);
325        assert_eq!(pending[2].timestamp(), 4);
326    }
327
328    #[test]
329    fn test_take_pending() {
330        let mut queue = OfflineQueue::new();
331        queue.enqueue(Operation::RemoveElement {
332            id: ElementId::new(),
333            timestamp: 1,
334        });
335        queue.enqueue(Operation::RemoveElement {
336            id: ElementId::new(),
337            timestamp: 2,
338        });
339
340        let ops = queue.take_pending();
341        assert_eq!(ops.len(), 2);
342        assert!(queue.is_empty());
343    }
344
345    #[test]
346    fn test_requeue() {
347        let mut queue = OfflineQueue::new();
348        let id = ElementId::new();
349
350        queue.enqueue(Operation::RemoveElement { id, timestamp: 1 });
351
352        let ops = queue.take_pending();
353        assert!(queue.is_empty());
354
355        queue.requeue(ops);
356        assert_eq!(queue.len(), 1);
357    }
358
359    #[test]
360    fn test_mark_synced() {
361        let mut queue = OfflineQueue::new();
362        queue.enqueue(Operation::RemoveElement {
363            id: ElementId::new(),
364            timestamp: 1,
365        });
366
367        let _ = queue.take_pending();
368        queue.mark_synced(1, 1000);
369
370        assert_eq!(queue.last_sync(), Some(1000));
371    }
372
373    #[test]
374    fn test_conflict_last_write_wins_local() {
375        let queue = OfflineQueue::new();
376
377        let local = Operation::RemoveElement {
378            id: ElementId::new(),
379            timestamp: 200,
380        };
381        let remote = Operation::RemoveElement {
382            id: ElementId::new(),
383            timestamp: 100,
384        };
385
386        let resolution = queue.resolve_conflict(&local, &remote);
387        assert_eq!(resolution, ConflictResolution::KeepLocal);
388    }
389
390    #[test]
391    fn test_conflict_last_write_wins_remote() {
392        let queue = OfflineQueue::new();
393
394        let local = Operation::RemoveElement {
395            id: ElementId::new(),
396            timestamp: 100,
397        };
398        let remote = Operation::RemoveElement {
399            id: ElementId::new(),
400            timestamp: 200,
401        };
402
403        let resolution = queue.resolve_conflict(&local, &remote);
404        assert_eq!(resolution, ConflictResolution::KeepRemote);
405    }
406
407    #[test]
408    fn test_conflict_local_wins() {
409        let mut queue = OfflineQueue::new();
410        queue.set_strategy(ConflictStrategy::LocalWins);
411
412        let local = Operation::RemoveElement {
413            id: ElementId::new(),
414            timestamp: 100,
415        };
416        let remote = Operation::RemoveElement {
417            id: ElementId::new(),
418            timestamp: 200,
419        };
420
421        let resolution = queue.resolve_conflict(&local, &remote);
422        assert_eq!(resolution, ConflictResolution::KeepLocal);
423    }
424
425    #[test]
426    fn test_conflict_remote_wins() {
427        let mut queue = OfflineQueue::new();
428        queue.set_strategy(ConflictStrategy::RemoteWins);
429
430        let local = Operation::RemoveElement {
431            id: ElementId::new(),
432            timestamp: 200,
433        };
434        let remote = Operation::RemoveElement {
435            id: ElementId::new(),
436            timestamp: 100,
437        };
438
439        let resolution = queue.resolve_conflict(&local, &remote);
440        assert_eq!(resolution, ConflictResolution::KeepRemote);
441    }
442
443    #[test]
444    fn test_json_serialization() {
445        let mut queue = OfflineQueue::new();
446        let element = create_test_element();
447
448        queue.enqueue(Operation::AddElement {
449            element,
450            timestamp: 12345,
451        });
452
453        let json = queue.to_json().expect("serialization should work");
454        assert!(json.contains("12345"));
455        assert!(json.contains("AddElement"));
456    }
457
458    #[test]
459    fn test_json_deserialization() {
460        let mut queue = OfflineQueue::new();
461        queue.enqueue(Operation::RemoveElement {
462            id: ElementId::new(),
463            timestamp: 999,
464        });
465
466        let json = queue.to_json().expect("serialization should work");
467        let restored = OfflineQueue::from_json(&json).expect("deserialization should work");
468
469        assert_eq!(restored.len(), 1);
470        assert_eq!(restored.pending()[0].timestamp(), 999);
471    }
472
473    #[test]
474    fn test_clear() {
475        let mut queue = OfflineQueue::new();
476        queue.enqueue(Operation::RemoveElement {
477            id: ElementId::new(),
478            timestamp: 1,
479        });
480        queue.enqueue(Operation::RemoveElement {
481            id: ElementId::new(),
482            timestamp: 2,
483        });
484
485        assert_eq!(queue.len(), 2);
486        queue.clear();
487        assert!(queue.is_empty());
488    }
489
490    #[test]
491    fn test_operation_timestamp() {
492        let now = Operation::now();
493        assert!(now > 0);
494
495        let op = Operation::AddElement {
496            element: create_test_element(),
497            timestamp: now,
498        };
499        assert_eq!(op.timestamp(), now);
500    }
501
502    #[test]
503    fn test_update_element_operation() {
504        let op = Operation::UpdateElement {
505            id: ElementId::new(),
506            changes: serde_json::json!({"color": "#ff0000"}),
507            timestamp: 100,
508        };
509
510        assert_eq!(op.timestamp(), 100);
511    }
512
513    #[test]
514    fn test_sync_result_default() {
515        let result = SyncResult::default();
516        assert_eq!(result.synced_count, 0);
517        assert_eq!(result.conflict_count, 0);
518        assert_eq!(result.received_count, 0);
519        assert!(!result.success);
520    }
521
522    #[test]
523    fn test_strategy_getter() {
524        let queue = OfflineQueue::new();
525        assert_eq!(queue.strategy(), ConflictStrategy::LastWriteWins);
526    }
527}