Skip to main content

oxigdal_edge/sync/
mod.rs

1//! Synchronization protocols for edge-to-cloud data sync
2
3pub mod manager;
4pub mod protocol;
5
6// Error types imported per-module as needed
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10
11pub use manager::SyncManager;
12pub use protocol::SyncProtocol;
13
14/// Synchronization strategy
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum SyncStrategy {
17    /// Manual sync only
18    Manual,
19    /// Periodic sync at fixed intervals
20    Periodic,
21    /// Incremental sync of changes only
22    Incremental,
23    /// Batch sync with compression
24    Batch,
25    /// Real-time sync (when connected)
26    Realtime,
27}
28
29/// Synchronization status
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub enum SyncStatus {
32    /// Not synced yet
33    NotSynced,
34    /// Sync in progress
35    Syncing,
36    /// Successfully synced
37    Synced,
38    /// Sync failed
39    Failed(String),
40    /// Sync pending
41    Pending,
42}
43
44impl SyncStatus {
45    /// Check if sync is complete
46    pub fn is_complete(&self) -> bool {
47        matches!(self, Self::Synced)
48    }
49
50    /// Check if sync is in progress
51    pub fn is_syncing(&self) -> bool {
52        matches!(self, Self::Syncing)
53    }
54
55    /// Check if sync failed
56    pub fn is_failed(&self) -> bool {
57        matches!(self, Self::Failed(_))
58    }
59}
60
61/// Sync metadata
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct SyncMetadata {
64    /// Unique sync ID
65    pub sync_id: String,
66    /// Sync strategy used
67    pub strategy: SyncStrategy,
68    /// Sync status
69    pub status: SyncStatus,
70    /// Start timestamp
71    pub started_at: DateTime<Utc>,
72    /// Completion timestamp
73    pub completed_at: Option<DateTime<Utc>>,
74    /// Number of items synced
75    pub items_synced: usize,
76    /// Total bytes transferred
77    pub bytes_transferred: usize,
78    /// Error message if failed
79    pub error: Option<String>,
80}
81
82impl SyncMetadata {
83    /// Create new sync metadata
84    pub fn new(sync_id: String, strategy: SyncStrategy) -> Self {
85        Self {
86            sync_id,
87            strategy,
88            status: SyncStatus::Pending,
89            started_at: Utc::now(),
90            completed_at: None,
91            items_synced: 0,
92            bytes_transferred: 0,
93            error: None,
94        }
95    }
96
97    /// Mark sync as started
98    pub fn start(&mut self) {
99        self.status = SyncStatus::Syncing;
100        self.started_at = Utc::now();
101    }
102
103    /// Mark sync as completed
104    pub fn complete(&mut self, items: usize, bytes: usize) {
105        self.status = SyncStatus::Synced;
106        self.completed_at = Some(Utc::now());
107        self.items_synced = items;
108        self.bytes_transferred = bytes;
109    }
110
111    /// Mark sync as failed
112    pub fn fail(&mut self, error: String) {
113        self.status = SyncStatus::Failed(error.clone());
114        self.completed_at = Some(Utc::now());
115        self.error = Some(error);
116    }
117
118    /// Get sync duration
119    pub fn duration(&self) -> Option<chrono::Duration> {
120        self.completed_at.map(|end| end - self.started_at)
121    }
122
123    /// Get throughput in bytes per second
124    pub fn throughput(&self) -> Option<f64> {
125        self.duration().map(|d| {
126            let secs = d.num_milliseconds() as f64 / 1000.0;
127            if secs > 0.0 {
128                self.bytes_transferred as f64 / secs
129            } else {
130                0.0
131            }
132        })
133    }
134}
135
136/// Sync item representing a piece of data to sync
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct SyncItem {
139    /// Item ID
140    pub id: String,
141    /// Item key
142    pub key: String,
143    /// Item data
144    pub data: Vec<u8>,
145    /// Item version
146    pub version: u64,
147    /// Last modified timestamp
148    pub modified_at: DateTime<Utc>,
149    /// Checksum for validation
150    pub checksum: String,
151}
152
153impl SyncItem {
154    /// Create new sync item
155    pub fn new(id: String, key: String, data: Vec<u8>, version: u64) -> Self {
156        let checksum = Self::calculate_checksum(&data);
157        Self {
158            id,
159            key,
160            data,
161            version,
162            modified_at: Utc::now(),
163            checksum,
164        }
165    }
166
167    /// Calculate checksum using blake3
168    fn calculate_checksum(data: &[u8]) -> String {
169        let hash = blake3::hash(data);
170        hash.to_hex().to_string()
171    }
172
173    /// Verify checksum
174    pub fn verify_checksum(&self) -> bool {
175        Self::calculate_checksum(&self.data) == self.checksum
176    }
177
178    /// Get data size
179    pub fn size(&self) -> usize {
180        self.data.len()
181    }
182}
183
184/// Sync batch for batch synchronization
185#[derive(Debug, Clone, Serialize, Deserialize)]
186pub struct SyncBatch {
187    /// Batch ID
188    pub batch_id: String,
189    /// Items in batch
190    pub items: Vec<SyncItem>,
191    /// Batch creation time
192    pub created_at: DateTime<Utc>,
193    /// Compression applied
194    pub compressed: bool,
195}
196
197impl SyncBatch {
198    /// Create new sync batch
199    pub fn new(batch_id: String) -> Self {
200        Self {
201            batch_id,
202            items: Vec::new(),
203            created_at: Utc::now(),
204            compressed: false,
205        }
206    }
207
208    /// Add item to batch
209    pub fn add_item(&mut self, item: SyncItem) {
210        self.items.push(item);
211    }
212
213    /// Get batch size in bytes
214    pub fn size(&self) -> usize {
215        self.items.iter().map(|item| item.size()).sum()
216    }
217
218    /// Get number of items
219    pub fn len(&self) -> usize {
220        self.items.len()
221    }
222
223    /// Check if batch is empty
224    pub fn is_empty(&self) -> bool {
225        self.items.is_empty()
226    }
227}
228
229/// Sync state tracker
230#[derive(Debug, Clone, Serialize, Deserialize)]
231pub struct SyncState {
232    /// Last sync timestamp
233    pub last_sync: Option<DateTime<Utc>>,
234    /// Items pending sync
235    pub pending_items: HashMap<String, SyncItem>,
236    /// Current sync metadata
237    pub current_sync: Option<SyncMetadata>,
238    /// Sync history
239    pub history: Vec<SyncMetadata>,
240}
241
242impl Default for SyncState {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248impl SyncState {
249    /// Create new sync state
250    pub fn new() -> Self {
251        Self {
252            last_sync: None,
253            pending_items: HashMap::new(),
254            current_sync: None,
255            history: Vec::new(),
256        }
257    }
258
259    /// Add pending item
260    pub fn add_pending(&mut self, item: SyncItem) {
261        self.pending_items.insert(item.id.clone(), item);
262    }
263
264    /// Remove pending item
265    pub fn remove_pending(&mut self, item_id: &str) -> Option<SyncItem> {
266        self.pending_items.remove(item_id)
267    }
268
269    /// Get pending items count
270    pub fn pending_count(&self) -> usize {
271        self.pending_items.len()
272    }
273
274    /// Start new sync
275    pub fn start_sync(&mut self, metadata: SyncMetadata) {
276        self.current_sync = Some(metadata);
277    }
278
279    /// Complete current sync
280    pub fn complete_sync(&mut self) {
281        if let Some(mut sync) = self.current_sync.take() {
282            sync.complete(0, 0);
283            self.last_sync = Some(Utc::now());
284            self.history.push(sync);
285
286            // Keep only last 100 syncs in history
287            if self.history.len() > 100 {
288                self.history.remove(0);
289            }
290        }
291    }
292
293    /// Fail current sync
294    pub fn fail_sync(&mut self, error: String) {
295        if let Some(mut sync) = self.current_sync.take() {
296            sync.fail(error);
297            self.history.push(sync);
298
299            // Keep only last 100 syncs in history
300            if self.history.len() > 100 {
301                self.history.remove(0);
302            }
303        }
304    }
305
306    /// Get sync statistics
307    pub fn statistics(&self) -> SyncStatistics {
308        let total_syncs = self.history.len();
309        let successful = self
310            .history
311            .iter()
312            .filter(|s| s.status.is_complete())
313            .count();
314        let failed = self.history.iter().filter(|s| s.status.is_failed()).count();
315
316        let avg_throughput = if successful > 0 {
317            let sum: f64 = self.history.iter().filter_map(|s| s.throughput()).sum();
318            sum / successful as f64
319        } else {
320            0.0
321        };
322
323        SyncStatistics {
324            total_syncs,
325            successful_syncs: successful,
326            failed_syncs: failed,
327            pending_items: self.pending_count(),
328            last_sync: self.last_sync,
329            avg_throughput_bps: avg_throughput,
330        }
331    }
332}
333
334/// Sync statistics
335#[derive(Debug, Clone, Serialize, Deserialize)]
336pub struct SyncStatistics {
337    /// Total number of syncs
338    pub total_syncs: usize,
339    /// Successful syncs
340    pub successful_syncs: usize,
341    /// Failed syncs
342    pub failed_syncs: usize,
343    /// Pending items
344    pub pending_items: usize,
345    /// Last sync timestamp
346    pub last_sync: Option<DateTime<Utc>>,
347    /// Average throughput in bytes per second
348    pub avg_throughput_bps: f64,
349}
350
351impl SyncStatistics {
352    /// Get success rate
353    pub fn success_rate(&self) -> f64 {
354        if self.total_syncs == 0 {
355            return 100.0;
356        }
357        (self.successful_syncs as f64 / self.total_syncs as f64) * 100.0
358    }
359}
360
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the three-byte item shared by the batch and state tests.
    fn small_item() -> SyncItem {
        SyncItem::new(String::from("item-1"), String::from("key-1"), vec![1, 2, 3], 1)
    }

    /// Metadata moves Pending -> Syncing -> Synced and records its counters.
    #[test]
    fn test_sync_metadata() {
        let mut metadata = SyncMetadata::new(String::from("sync-1"), SyncStrategy::Incremental);
        assert_eq!(metadata.status, SyncStatus::Pending);

        metadata.start();
        assert_eq!(metadata.status, SyncStatus::Syncing);

        metadata.complete(10, 1024);
        assert_eq!(metadata.status, SyncStatus::Synced);
        assert_eq!(metadata.items_synced, 10);
        assert_eq!(metadata.bytes_transferred, 1024);
    }

    /// A freshly built item reports its payload size and a valid checksum.
    #[test]
    fn test_sync_item() {
        let payload = vec![1, 2, 3, 4, 5];
        let item = SyncItem::new(String::from("item-1"), String::from("key-1"), payload, 1);

        assert_eq!(item.size(), 5);
        assert!(item.verify_checksum());
    }

    /// A batch starts empty and tracks item count and byte size.
    #[test]
    fn test_sync_batch() {
        let mut batch = SyncBatch::new(String::from("batch-1"));
        assert!(batch.is_empty());

        batch.add_item(small_item());

        assert_eq!(batch.len(), 1);
        assert_eq!(batch.size(), 3);
    }

    /// Pending items can be added and removed by ID.
    #[test]
    fn test_sync_state() {
        let mut state = SyncState::new();
        assert_eq!(state.pending_count(), 0);

        state.add_pending(small_item());
        assert_eq!(state.pending_count(), 1);

        let removed = state.remove_pending("item-1");
        assert!(removed.is_some());
        assert_eq!(state.pending_count(), 0);
    }

    /// Five successful syncs yield a 100% success rate.
    #[test]
    fn test_sync_statistics() {
        let mut state = SyncState::new();

        state.history.extend((0..5).map(|i| {
            let mut metadata = SyncMetadata::new(format!("sync-{}", i), SyncStrategy::Incremental);
            metadata.start();
            metadata.complete(10, 1024);
            metadata
        }));

        let stats = state.statistics();
        assert_eq!(stats.total_syncs, 5);
        assert_eq!(stats.successful_syncs, 5);
        assert_eq!(stats.success_rate(), 100.0);
    }
}