Skip to main content

oxigdal_edge/sync/
manager.rs

1//! Sync manager for coordinating synchronization
2
3use super::protocol::{MockSyncProtocol, SyncProtocol};
4use super::{SyncItem, SyncState, SyncStrategy};
5use crate::cache::Cache;
6use crate::error::{EdgeError, Result};
7use parking_lot::RwLock;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11use tokio::task::JoinHandle;
12
13/// Sync manager
14pub struct SyncManager {
15    strategy: SyncStrategy,
16    cache: Arc<Cache>,
17    state: Arc<RwLock<SyncState>>,
18    protocol: Arc<dyn SyncProtocol>,
19    running: Arc<AtomicBool>,
20    handle: Arc<RwLock<Option<JoinHandle<()>>>>,
21}
22
23impl SyncManager {
24    /// Create new sync manager
25    pub fn new(strategy: SyncStrategy, cache: Arc<Cache>) -> Result<Self> {
26        let state = Arc::new(RwLock::new(SyncState::new()));
27        let protocol: Arc<dyn SyncProtocol> = Arc::new(MockSyncProtocol::new());
28
29        Ok(Self {
30            strategy,
31            cache,
32            state,
33            protocol,
34            running: Arc::new(AtomicBool::new(false)),
35            handle: Arc::new(RwLock::new(None)),
36        })
37    }
38
39    /// Start sync manager
40    pub async fn start(&self) -> Result<()> {
41        if self.running.load(Ordering::Relaxed) {
42            return Err(EdgeError::sync("Sync manager already running"));
43        }
44
45        self.running.store(true, Ordering::Relaxed);
46
47        match self.strategy {
48            SyncStrategy::Manual => {
49                // No automatic sync
50                Ok(())
51            }
52            SyncStrategy::Periodic => self.start_periodic_sync().await,
53            SyncStrategy::Incremental => self.start_incremental_sync().await,
54            SyncStrategy::Batch => self.start_batch_sync().await,
55            SyncStrategy::Realtime => self.start_realtime_sync().await,
56        }
57    }
58
59    /// Stop sync manager
60    pub async fn stop(&self) -> Result<()> {
61        if !self.running.load(Ordering::Relaxed) {
62            return Ok(());
63        }
64
65        self.running.store(false, Ordering::Relaxed);
66
67        let handle = {
68            let mut handle_lock = self.handle.write();
69            handle_lock.take()
70        };
71
72        if let Some(handle) = handle {
73            let timeout_duration = Duration::from_secs(5);
74            match tokio::time::timeout(timeout_duration, handle).await {
75                Ok(_) => {}
76                Err(_) => {
77                    tracing::warn!("Sync manager stop timed out after {:?}", timeout_duration);
78                }
79            }
80        }
81
82        Ok(())
83    }
84
85    /// Start periodic sync
86    async fn start_periodic_sync(&self) -> Result<()> {
87        let protocol = Arc::clone(&self.protocol);
88        let state = Arc::clone(&self.state);
89        let running = Arc::clone(&self.running);
90
91        let handle = tokio::spawn(async move {
92            while running.load(Ordering::Relaxed) {
93                if protocol.is_connected().await {
94                    let _ = Self::perform_sync(&protocol, &state).await;
95                }
96
97                tokio::time::sleep(Duration::from_millis(100)).await; // 100ms for tests
98            }
99        });
100
101        let mut handle_lock = self.handle.write();
102        *handle_lock = Some(handle);
103
104        Ok(())
105    }
106
107    /// Start incremental sync
108    async fn start_incremental_sync(&self) -> Result<()> {
109        let protocol = Arc::clone(&self.protocol);
110        let state = Arc::clone(&self.state);
111        let running = Arc::clone(&self.running);
112
113        let handle = tokio::spawn(async move {
114            while running.load(Ordering::Relaxed) {
115                if protocol.is_connected().await {
116                    let has_pending = {
117                        let state_read = state.read();
118                        !state_read.pending_items.is_empty()
119                    };
120                    if has_pending {
121                        let _ = Self::perform_sync(&protocol, &state).await;
122                    }
123                }
124
125                tokio::time::sleep(Duration::from_millis(100)).await; // 100ms for tests
126            }
127        });
128
129        let mut handle_lock = self.handle.write();
130        *handle_lock = Some(handle);
131
132        Ok(())
133    }
134
135    /// Start batch sync
136    async fn start_batch_sync(&self) -> Result<()> {
137        let protocol = Arc::clone(&self.protocol);
138        let state = Arc::clone(&self.state);
139        let running = Arc::clone(&self.running);
140
141        let handle = tokio::spawn(async move {
142            while running.load(Ordering::Relaxed) {
143                if protocol.is_connected().await {
144                    let should_sync = {
145                        let state_read = state.read();
146                        state_read.pending_items.len() >= 10
147                    };
148                    if should_sync {
149                        // Batch size threshold
150                        let _ = Self::perform_sync(&protocol, &state).await;
151                    }
152                }
153
154                tokio::time::sleep(Duration::from_millis(100)).await; // 100ms for tests
155            }
156        });
157
158        let mut handle_lock = self.handle.write();
159        *handle_lock = Some(handle);
160
161        Ok(())
162    }
163
164    /// Start real-time sync
165    async fn start_realtime_sync(&self) -> Result<()> {
166        let protocol = Arc::clone(&self.protocol);
167        let state = Arc::clone(&self.state);
168        let running = Arc::clone(&self.running);
169
170        let handle = tokio::spawn(async move {
171            while running.load(Ordering::Relaxed) {
172                if protocol.is_connected().await {
173                    let has_pending = {
174                        let state_read = state.read();
175                        !state_read.pending_items.is_empty()
176                    };
177                    if has_pending {
178                        let _ = Self::perform_sync(&protocol, &state).await;
179                    }
180                }
181
182                tokio::time::sleep(Duration::from_millis(100)).await; // 100ms for tests
183            }
184        });
185
186        let mut handle_lock = self.handle.write();
187        *handle_lock = Some(handle);
188
189        Ok(())
190    }
191
192    /// Perform synchronization
193    async fn perform_sync(
194        protocol: &Arc<dyn SyncProtocol>,
195        state: &Arc<RwLock<SyncState>>,
196    ) -> Result<()> {
197        let items: Vec<SyncItem> = {
198            let state_read = state.read();
199            state_read.pending_items.values().cloned().collect()
200        };
201
202        if items.is_empty() {
203            return Ok(());
204        }
205
206        match protocol.sync(items).await {
207            Ok(result) => {
208                let mut state_write = state.write();
209
210                // Remove synced items from pending
211                for item_id in &result.pushed {
212                    state_write.remove_pending(item_id);
213                }
214
215                state_write.complete_sync();
216                Ok(())
217            }
218            Err(e) => {
219                let mut state_write = state.write();
220                state_write.fail_sync(e.to_string());
221                Err(e)
222            }
223        }
224    }
225
226    /// Manually trigger sync
227    pub async fn sync_now(&self) -> Result<()> {
228        Self::perform_sync(&self.protocol, &self.state).await
229    }
230
231    /// Add item to pending sync queue
232    pub fn add_pending(&self, item: SyncItem) {
233        let mut state = self.state.write();
234        state.add_pending(item);
235    }
236
237    /// Get sync state
238    pub fn state(&self) -> SyncState {
239        self.state.read().clone()
240    }
241
242    /// Get sync statistics
243    pub fn statistics(&self) -> super::SyncStatistics {
244        self.state.read().statistics()
245    }
246
247    /// Get reference to the cache
248    pub fn cache(&self) -> &Arc<Cache> {
249        &self.cache
250    }
251
252    /// Check if sync is running
253    pub fn is_running(&self) -> bool {
254        self.running.load(Ordering::Relaxed)
255    }
256
257    /// Set sync protocol (for testing)
258    pub fn set_protocol(&mut self, protocol: Arc<dyn SyncProtocol>) {
259        self.protocol = protocol;
260    }
261}
262
263impl Drop for SyncManager {
264    fn drop(&mut self) {
265        self.running.store(false, Ordering::Relaxed);
266    }
267}
268
269#[cfg(test)]
270mod tests {
271    use super::*;
272    use crate::cache::CacheConfig;
273
274    #[tokio::test]
275    async fn test_sync_manager_creation() -> Result<()> {
276        let cache_config = CacheConfig::minimal();
277        let cache = Arc::new(Cache::new(cache_config)?);
278        let manager = SyncManager::new(SyncStrategy::Manual, cache)?;
279
280        assert!(!manager.is_running());
281        Ok(())
282    }
283
284    #[tokio::test]
285    async fn test_sync_manager_lifecycle() -> Result<()> {
286        let cache_config = CacheConfig::minimal();
287        let cache = Arc::new(Cache::new(cache_config)?);
288        let manager = SyncManager::new(SyncStrategy::Manual, cache)?;
289
290        manager.start().await?;
291        assert!(manager.is_running());
292
293        manager.stop().await?;
294        assert!(!manager.is_running());
295
296        Ok(())
297    }
298
299    #[tokio::test]
300    async fn test_sync_manager_add_pending() -> Result<()> {
301        let cache_config = CacheConfig::minimal();
302        let cache = Arc::new(Cache::new(cache_config)?);
303        let manager = SyncManager::new(SyncStrategy::Manual, cache)?;
304
305        let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
306
307        manager.add_pending(item);
308
309        let state = manager.state();
310        assert_eq!(state.pending_count(), 1);
311
312        Ok(())
313    }
314
315    #[tokio::test]
316    async fn test_sync_manager_manual_sync() -> Result<()> {
317        let cache_config = CacheConfig::minimal();
318        let cache = Arc::new(Cache::new(cache_config)?);
319        let manager = SyncManager::new(SyncStrategy::Manual, cache)?;
320
321        let item = SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1);
322
323        manager.add_pending(item);
324        manager.sync_now().await?;
325
326        let state = manager.state();
327        assert_eq!(state.pending_count(), 0);
328
329        Ok(())
330    }
331
332    #[tokio::test]
333    async fn test_sync_manager_statistics() -> Result<()> {
334        let cache_config = CacheConfig::minimal();
335        let cache = Arc::new(Cache::new(cache_config)?);
336        let manager = SyncManager::new(SyncStrategy::Manual, cache)?;
337
338        let stats = manager.statistics();
339        assert_eq!(stats.total_syncs, 0);
340        assert_eq!(stats.pending_items, 0);
341
342        Ok(())
343    }
344}