telemetry_kit/
auto_sync.rs

1//! Auto-sync background task for telemetry-kit
2//!
3//! Automatically syncs buffered events to the server in the background.
4
5use crate::error::Result;
6use crate::storage::EventStorage;
7use crate::sync::SyncClient;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13
/// Settings that control the background auto-sync task.
///
/// Use [`AutoSyncConfig::default`] for the stock values, or build the struct
/// directly to override individual fields.
#[derive(Clone, Debug)]
pub struct AutoSyncConfig {
    /// Number of seconds to wait between consecutive sync attempts.
    pub interval: u64,
    /// Whether a final sync should be performed on shutdown.
    pub sync_on_shutdown: bool,
    /// Maximum number of events intended for a single sync batch.
    pub batch_size: usize,
}
24
25impl Default for AutoSyncConfig {
26    fn default() -> Self {
27        Self {
28            interval: 60,           // 60 seconds
29            sync_on_shutdown: true, // Sync before dropping
30            batch_size: 100,        // Match sync client default
31        }
32    }
33}
34
35/// Background task that automatically syncs events
36pub struct AutoSyncTask {
37    handle: Option<JoinHandle<()>>,
38    shutdown: Arc<AtomicBool>,
39    config: AutoSyncConfig,
40}
41
42impl AutoSyncTask {
43    /// Start a new auto-sync background task
44    pub fn start(
45        client: Arc<SyncClient>,
46        storage: Arc<RwLock<EventStorage>>,
47        config: AutoSyncConfig,
48    ) -> Self {
49        let shutdown = Arc::new(AtomicBool::new(false));
50        let shutdown_clone = shutdown.clone();
51        let interval = Duration::from_secs(config.interval);
52
53        let handle = tokio::spawn(async move {
54            loop {
55                // Check if shutdown requested
56                if shutdown_clone.load(Ordering::SeqCst) {
57                    break;
58                }
59
60                // Perform sync
61                if let Err(e) = Self::sync_once(client.clone(), storage.clone()).await {
62                    // Log error but don't crash - sync will retry on next interval
63                    eprintln!("Auto-sync error: {}", e);
64                }
65
66                // Wait for next interval
67                tokio::time::sleep(interval).await;
68            }
69        });
70
71        Self {
72            handle: Some(handle),
73            shutdown,
74            config,
75        }
76    }
77
78    /// Perform a single sync operation
79    async fn sync_once(client: Arc<SyncClient>, storage: Arc<RwLock<EventStorage>>) -> Result<()> {
80        use crate::event::EventBatch;
81        use uuid::Uuid;
82
83        // Get unsynced events
84        let storage_read = storage.read().await;
85        let events = storage_read.get_unsynced(client.config().batch_size)?;
86        drop(storage_read);
87
88        if events.is_empty() {
89            return Ok(());
90        }
91
92        let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect();
93        let batch = EventBatch::new(events);
94
95        // Attempt sync
96        match client.sync(batch).await {
97            Ok(response) => {
98                if response.accepted() > 0 {
99                    let storage_write = storage.write().await;
100                    storage_write.mark_synced(&event_ids)?;
101                }
102                Ok(())
103            }
104            Err(e) => {
105                let storage_write = storage.write().await;
106                storage_write.increment_retry(&event_ids)?;
107                Err(e)
108            }
109        }
110    }
111
112    /// Request graceful shutdown of the background task
113    pub fn shutdown(&mut self) {
114        self.shutdown.store(true, Ordering::SeqCst);
115    }
116
117    /// Wait for the background task to complete
118    pub async fn join(&mut self) -> Result<()> {
119        if let Some(handle) = self.handle.take() {
120            handle.await.map_err(|e| {
121                crate::error::TelemetryError::Other(format!("Task join error: {}", e))
122            })?;
123        }
124        Ok(())
125    }
126
127    /// Check if auto-sync should sync on shutdown
128    pub fn should_sync_on_shutdown(&self) -> bool {
129        self.config.sync_on_shutdown
130    }
131}
132
133impl Drop for AutoSyncTask {
134    fn drop(&mut self) {
135        // Request shutdown when dropped
136        self.shutdown();
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::SyncConfig;
    use uuid::Uuid;

    /// Starts the task against a fresh temporary store, then shuts it down and
    /// verifies that the background task joins cleanly.
    #[tokio::test]
    async fn test_auto_sync_task_creation() {
        // Put the database under the platform's temp directory rather than a
        // hard-coded `/tmp`, which does not exist on every OS. The UUID keeps
        // parallel test runs from sharing a file.
        let db_path = std::env::temp_dir()
            .join(format!("telemetry-test-autosync-{}.db", Uuid::new_v4()));
        let storage = Arc::new(RwLock::new(EventStorage::new(db_path.clone()).unwrap()));

        // Create a test sync client. The store is freshly created, so
        // presumably there are no unsynced events and no request is sent.
        let config = SyncConfig::builder()
            .org_id("550e8400-e29b-41d4-a716-446655440000")
            .unwrap()
            .app_id("7c9e6679-7425-40de-944b-e07fc1f90ae7")
            .unwrap()
            .token("test-token")
            .secret("test-secret")
            .build()
            .unwrap();
        let client = Arc::new(SyncClient::new(config).unwrap());

        // Create auto-sync task
        let mut task = AutoSyncTask::start(
            client,
            storage,
            AutoSyncConfig {
                interval: 1,
                sync_on_shutdown: true,
                batch_size: 100,
            },
        );

        // Give the background task a moment to run its first iteration.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Shutdown
        task.shutdown();
        task.join().await.unwrap();

        // Best-effort cleanup; a leftover file must not fail the test.
        let _ = std::fs::remove_file(&db_path);
    }

    /// Pins the documented default configuration values.
    #[tokio::test]
    async fn test_auto_sync_config_defaults() {
        let config = AutoSyncConfig::default();
        assert_eq!(config.interval, 60);
        assert!(config.sync_on_shutdown);
        assert_eq!(config.batch_size, 100);
    }
}