oxigdal_edge/sync/
manager.rs1use super::protocol::{MockSyncProtocol, SyncProtocol};
4use super::{SyncItem, SyncState, SyncStrategy};
5use crate::cache::Cache;
6use crate::error::{EdgeError, Result};
7use parking_lot::RwLock;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::time::Duration;
11use tokio::task::JoinHandle;
12
/// Coordinates pushing locally queued [`SyncItem`]s through a [`SyncProtocol`].
///
/// Depending on the configured [`SyncStrategy`], `start` may launch a
/// background tokio task that drains the pending queue; with the manual
/// strategy nothing runs until `sync_now` is called.
pub struct SyncManager {
    /// Strategy inspected by `start` to decide whether and how to sync in the background.
    strategy: SyncStrategy,
    /// Shared cache handle; in this file it is only stored and exposed via `cache()`.
    cache: Arc<Cache>,
    /// Pending items and sync bookkeeping, shared with the background task.
    state: Arc<RwLock<SyncState>>,
    /// Transport used to push items; `new` installs a `MockSyncProtocol`.
    protocol: Arc<dyn SyncProtocol>,
    /// Cooperative run flag polled by the background task on every iteration.
    running: Arc<AtomicBool>,
    /// Join handle of the background task, if one has been spawned.
    handle: Arc<RwLock<Option<JoinHandle<()>>>>,
}
22
23impl SyncManager {
24 pub fn new(strategy: SyncStrategy, cache: Arc<Cache>) -> Result<Self> {
26 let state = Arc::new(RwLock::new(SyncState::new()));
27 let protocol: Arc<dyn SyncProtocol> = Arc::new(MockSyncProtocol::new());
28
29 Ok(Self {
30 strategy,
31 cache,
32 state,
33 protocol,
34 running: Arc::new(AtomicBool::new(false)),
35 handle: Arc::new(RwLock::new(None)),
36 })
37 }
38
39 pub async fn start(&self) -> Result<()> {
41 if self.running.load(Ordering::Relaxed) {
42 return Err(EdgeError::sync("Sync manager already running"));
43 }
44
45 self.running.store(true, Ordering::Relaxed);
46
47 match self.strategy {
48 SyncStrategy::Manual => {
49 Ok(())
51 }
52 SyncStrategy::Periodic => self.start_periodic_sync().await,
53 SyncStrategy::Incremental => self.start_incremental_sync().await,
54 SyncStrategy::Batch => self.start_batch_sync().await,
55 SyncStrategy::Realtime => self.start_realtime_sync().await,
56 }
57 }
58
59 pub async fn stop(&self) -> Result<()> {
61 if !self.running.load(Ordering::Relaxed) {
62 return Ok(());
63 }
64
65 self.running.store(false, Ordering::Relaxed);
66
67 let handle = {
68 let mut handle_lock = self.handle.write();
69 handle_lock.take()
70 };
71
72 if let Some(handle) = handle {
73 let timeout_duration = Duration::from_secs(5);
74 match tokio::time::timeout(timeout_duration, handle).await {
75 Ok(_) => {}
76 Err(_) => {
77 tracing::warn!("Sync manager stop timed out after {:?}", timeout_duration);
78 }
79 }
80 }
81
82 Ok(())
83 }
84
85 async fn start_periodic_sync(&self) -> Result<()> {
87 let protocol = Arc::clone(&self.protocol);
88 let state = Arc::clone(&self.state);
89 let running = Arc::clone(&self.running);
90
91 let handle = tokio::spawn(async move {
92 while running.load(Ordering::Relaxed) {
93 if protocol.is_connected().await {
94 let _ = Self::perform_sync(&protocol, &state).await;
95 }
96
97 tokio::time::sleep(Duration::from_millis(100)).await; }
99 });
100
101 let mut handle_lock = self.handle.write();
102 *handle_lock = Some(handle);
103
104 Ok(())
105 }
106
107 async fn start_incremental_sync(&self) -> Result<()> {
109 let protocol = Arc::clone(&self.protocol);
110 let state = Arc::clone(&self.state);
111 let running = Arc::clone(&self.running);
112
113 let handle = tokio::spawn(async move {
114 while running.load(Ordering::Relaxed) {
115 if protocol.is_connected().await {
116 let has_pending = {
117 let state_read = state.read();
118 !state_read.pending_items.is_empty()
119 };
120 if has_pending {
121 let _ = Self::perform_sync(&protocol, &state).await;
122 }
123 }
124
125 tokio::time::sleep(Duration::from_millis(100)).await; }
127 });
128
129 let mut handle_lock = self.handle.write();
130 *handle_lock = Some(handle);
131
132 Ok(())
133 }
134
135 async fn start_batch_sync(&self) -> Result<()> {
137 let protocol = Arc::clone(&self.protocol);
138 let state = Arc::clone(&self.state);
139 let running = Arc::clone(&self.running);
140
141 let handle = tokio::spawn(async move {
142 while running.load(Ordering::Relaxed) {
143 if protocol.is_connected().await {
144 let should_sync = {
145 let state_read = state.read();
146 state_read.pending_items.len() >= 10
147 };
148 if should_sync {
149 let _ = Self::perform_sync(&protocol, &state).await;
151 }
152 }
153
154 tokio::time::sleep(Duration::from_millis(100)).await; }
156 });
157
158 let mut handle_lock = self.handle.write();
159 *handle_lock = Some(handle);
160
161 Ok(())
162 }
163
164 async fn start_realtime_sync(&self) -> Result<()> {
166 let protocol = Arc::clone(&self.protocol);
167 let state = Arc::clone(&self.state);
168 let running = Arc::clone(&self.running);
169
170 let handle = tokio::spawn(async move {
171 while running.load(Ordering::Relaxed) {
172 if protocol.is_connected().await {
173 let has_pending = {
174 let state_read = state.read();
175 !state_read.pending_items.is_empty()
176 };
177 if has_pending {
178 let _ = Self::perform_sync(&protocol, &state).await;
179 }
180 }
181
182 tokio::time::sleep(Duration::from_millis(100)).await; }
184 });
185
186 let mut handle_lock = self.handle.write();
187 *handle_lock = Some(handle);
188
189 Ok(())
190 }
191
192 async fn perform_sync(
194 protocol: &Arc<dyn SyncProtocol>,
195 state: &Arc<RwLock<SyncState>>,
196 ) -> Result<()> {
197 let items: Vec<SyncItem> = {
198 let state_read = state.read();
199 state_read.pending_items.values().cloned().collect()
200 };
201
202 if items.is_empty() {
203 return Ok(());
204 }
205
206 match protocol.sync(items).await {
207 Ok(result) => {
208 let mut state_write = state.write();
209
210 for item_id in &result.pushed {
212 state_write.remove_pending(item_id);
213 }
214
215 state_write.complete_sync();
216 Ok(())
217 }
218 Err(e) => {
219 let mut state_write = state.write();
220 state_write.fail_sync(e.to_string());
221 Err(e)
222 }
223 }
224 }
225
226 pub async fn sync_now(&self) -> Result<()> {
228 Self::perform_sync(&self.protocol, &self.state).await
229 }
230
231 pub fn add_pending(&self, item: SyncItem) {
233 let mut state = self.state.write();
234 state.add_pending(item);
235 }
236
237 pub fn state(&self) -> SyncState {
239 self.state.read().clone()
240 }
241
242 pub fn statistics(&self) -> super::SyncStatistics {
244 self.state.read().statistics()
245 }
246
247 pub fn cache(&self) -> &Arc<Cache> {
249 &self.cache
250 }
251
252 pub fn is_running(&self) -> bool {
254 self.running.load(Ordering::Relaxed)
255 }
256
257 pub fn set_protocol(&mut self, protocol: Arc<dyn SyncProtocol>) {
259 self.protocol = protocol;
260 }
261}
262
263impl Drop for SyncManager {
264 fn drop(&mut self) {
265 self.running.store(false, Ordering::Relaxed);
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::CacheConfig;

    /// Builds a manual-strategy manager backed by a minimal cache.
    fn manual_manager() -> Result<SyncManager> {
        let cache = Cache::new(CacheConfig::minimal())?;
        SyncManager::new(SyncStrategy::Manual, Arc::new(cache))
    }

    /// The single item used by the pending-queue tests.
    fn sample_item() -> SyncItem {
        SyncItem::new("item-1".to_string(), "key-1".to_string(), vec![1, 2, 3], 1)
    }

    #[tokio::test]
    async fn test_sync_manager_creation() -> Result<()> {
        let manager = manual_manager()?;

        // A freshly built manager must not report itself as running.
        assert!(!manager.is_running());
        Ok(())
    }

    #[tokio::test]
    async fn test_sync_manager_lifecycle() -> Result<()> {
        let manager = manual_manager()?;

        manager.start().await?;
        assert!(manager.is_running());

        manager.stop().await?;
        assert!(!manager.is_running());

        Ok(())
    }

    #[tokio::test]
    async fn test_sync_manager_add_pending() -> Result<()> {
        let manager = manual_manager()?;

        manager.add_pending(sample_item());

        assert_eq!(manager.state().pending_count(), 1);
        Ok(())
    }

    #[tokio::test]
    async fn test_sync_manager_manual_sync() -> Result<()> {
        let manager = manual_manager()?;

        manager.add_pending(sample_item());
        manager.sync_now().await?;

        // A successful manual sync drains the pending queue.
        assert_eq!(manager.state().pending_count(), 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_sync_manager_statistics() -> Result<()> {
        let manager = manual_manager()?;

        let stats = manager.statistics();
        assert_eq!(stats.total_syncs, 0);
        assert_eq!(stats.pending_items, 0);

        Ok(())
    }
}