telemetry_kit/
auto_sync.rs1use crate::error::Result;
6use crate::storage::EventStorage;
7use crate::sync::SyncClient;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::sync::Arc;
10use std::time::Duration;
11use tokio::sync::RwLock;
12use tokio::task::JoinHandle;
13
14#[derive(Debug, Clone)]
16pub struct AutoSyncConfig {
17 pub interval: u64,
19 pub sync_on_shutdown: bool,
21 pub batch_size: usize,
23}
24
25impl Default for AutoSyncConfig {
26 fn default() -> Self {
27 Self {
28 interval: 60, sync_on_shutdown: true, batch_size: 100, }
32 }
33}
34
35pub struct AutoSyncTask {
37 handle: Option<JoinHandle<()>>,
38 shutdown: Arc<AtomicBool>,
39 config: AutoSyncConfig,
40}
41
42impl AutoSyncTask {
43 pub fn start(
45 client: Arc<SyncClient>,
46 storage: Arc<RwLock<EventStorage>>,
47 config: AutoSyncConfig,
48 ) -> Self {
49 let shutdown = Arc::new(AtomicBool::new(false));
50 let shutdown_clone = shutdown.clone();
51 let interval = Duration::from_secs(config.interval);
52
53 let handle = tokio::spawn(async move {
54 loop {
55 if shutdown_clone.load(Ordering::SeqCst) {
57 break;
58 }
59
60 if let Err(e) = Self::sync_once(client.clone(), storage.clone()).await {
62 eprintln!("Auto-sync error: {}", e);
64 }
65
66 tokio::time::sleep(interval).await;
68 }
69 });
70
71 Self {
72 handle: Some(handle),
73 shutdown,
74 config,
75 }
76 }
77
78 async fn sync_once(client: Arc<SyncClient>, storage: Arc<RwLock<EventStorage>>) -> Result<()> {
80 use crate::event::EventBatch;
81 use uuid::Uuid;
82
83 let storage_read = storage.read().await;
85 let events = storage_read.get_unsynced(client.config().batch_size)?;
86 drop(storage_read);
87
88 if events.is_empty() {
89 return Ok(());
90 }
91
92 let event_ids: Vec<Uuid> = events.iter().map(|e| e.event_id).collect();
93 let batch = EventBatch::new(events);
94
95 match client.sync(batch).await {
97 Ok(response) => {
98 if response.accepted() > 0 {
99 let storage_write = storage.write().await;
100 storage_write.mark_synced(&event_ids)?;
101 }
102 Ok(())
103 }
104 Err(e) => {
105 let storage_write = storage.write().await;
106 storage_write.increment_retry(&event_ids)?;
107 Err(e)
108 }
109 }
110 }
111
112 pub fn shutdown(&mut self) {
114 self.shutdown.store(true, Ordering::SeqCst);
115 }
116
117 pub async fn join(&mut self) -> Result<()> {
119 if let Some(handle) = self.handle.take() {
120 handle.await.map_err(|e| {
121 crate::error::TelemetryError::Other(format!("Task join error: {}", e))
122 })?;
123 }
124 Ok(())
125 }
126
127 pub fn should_sync_on_shutdown(&self) -> bool {
129 self.config.sync_on_shutdown
130 }
131}
132
133impl Drop for AutoSyncTask {
134 fn drop(&mut self) {
135 self.shutdown();
137 }
138}
139
140#[cfg(test)]
141mod tests {
142 use super::*;
143 use crate::sync::SyncConfig;
144 use std::path::PathBuf;
145 use uuid::Uuid;
146
147 #[tokio::test]
148 async fn test_auto_sync_task_creation() {
149 let unique_id = Uuid::new_v4();
151 let db_path = PathBuf::from(format!("/tmp/telemetry-test-autosync-{}.db", unique_id));
152 let storage = Arc::new(RwLock::new(EventStorage::new(db_path.clone()).unwrap()));
153
154 let config = SyncConfig::builder()
156 .org_id("550e8400-e29b-41d4-a716-446655440000")
157 .unwrap()
158 .app_id("7c9e6679-7425-40de-944b-e07fc1f90ae7")
159 .unwrap()
160 .token("test-token")
161 .secret("test-secret")
162 .build()
163 .unwrap();
164 let client = Arc::new(SyncClient::new(config).unwrap());
165
166 let mut task = AutoSyncTask::start(
168 client,
169 storage,
170 AutoSyncConfig {
171 interval: 1,
172 sync_on_shutdown: true,
173 batch_size: 100,
174 },
175 );
176
177 tokio::time::sleep(Duration::from_millis(100)).await;
179
180 task.shutdown();
182 task.join().await.unwrap();
183
184 let _ = std::fs::remove_file(&db_path);
186 }
187
188 #[tokio::test]
189 async fn test_auto_sync_config_defaults() {
190 let config = AutoSyncConfig::default();
191 assert_eq!(config.interval, 60);
192 assert!(config.sync_on_shutdown);
193 assert_eq!(config.batch_size, 100);
194 }
195}