Skip to main content

garmin_cli/sync/
task_queue.rs

//! Task queue for sync operations with crash recovery.
//!
//! Uses SQLite for task persistence, enabling concurrent read access to Parquet data files.
4
5use std::sync::Arc;
6
7use chrono::Duration;
8use tokio::sync::Mutex;
9
10#[cfg(test)]
11use crate::db::models::SyncTaskType;
12use crate::db::models::{SyncPipeline, SyncTask};
13use crate::storage::SyncDb;
14use crate::Result;
15
16/// Task queue backed by SQLite for persistence
17pub struct TaskQueue {
18    sync_db: SyncDb,
19    profile_id: i32,
20    pipeline: Option<SyncPipeline>,
21    rr_index: usize,
22}
23
24impl TaskQueue {
25    /// Create a new task queue
26    pub fn new(sync_db: SyncDb, profile_id: i32, pipeline: Option<SyncPipeline>) -> Self {
27        Self {
28            sync_db,
29            profile_id,
30            pipeline,
31            rr_index: 0,
32        }
33    }
34
35    /// Add a task to the queue
36    pub fn push(&self, task: SyncTask) -> Result<i64> {
37        self.sync_db.push_task(&task)
38    }
39
40    /// Get the next pending task
41    pub fn pop(&self) -> Result<Option<SyncTask>> {
42        self.sync_db.pop_task(self.profile_id, self.pipeline)
43    }
44
45    /// Get the next pending task for a specific pipeline
46    pub fn pop_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<Option<SyncTask>> {
47        self.sync_db.pop_task(self.profile_id, pipeline)
48    }
49
50    /// Pop the next task using round-robin across primary task types
51    pub fn pop_round_robin(&mut self) -> Result<Option<SyncTask>> {
52        const TASK_TYPES: [&str; 4] = ["activities", "download_gpx", "performance", "daily_health"];
53
54        for _ in 0..TASK_TYPES.len() {
55            let idx = self.rr_index % TASK_TYPES.len();
56            self.rr_index = self.rr_index.wrapping_add(1);
57            if let Some(task) =
58                self.sync_db
59                    .pop_task_by_type(self.profile_id, TASK_TYPES[idx], self.pipeline)?
60            {
61                return Ok(Some(task));
62            }
63        }
64
65        // Fallback for other task types
66        self.sync_db.pop_task(self.profile_id, self.pipeline)
67    }
68
69    /// Pop the next task using round-robin across primary task types for a pipeline
70    pub fn pop_round_robin_with_pipeline(
71        &mut self,
72        pipeline: Option<SyncPipeline>,
73    ) -> Result<Option<SyncTask>> {
74        const TASK_TYPES: [&str; 4] = ["activities", "download_gpx", "performance", "daily_health"];
75
76        for _ in 0..TASK_TYPES.len() {
77            let idx = self.rr_index % TASK_TYPES.len();
78            self.rr_index = self.rr_index.wrapping_add(1);
79            if let Some(task) =
80                self.sync_db
81                    .pop_task_by_type(self.profile_id, TASK_TYPES[idx], pipeline)?
82            {
83                return Ok(Some(task));
84            }
85        }
86
87        self.sync_db.pop_task(self.profile_id, pipeline)
88    }
89
90    /// Mark a task as in progress
91    pub fn mark_in_progress(&self, task_id: i64) -> Result<()> {
92        self.sync_db.mark_task_in_progress(task_id)
93    }
94
95    /// Mark a task as completed
96    pub fn mark_completed(&self, task_id: i64) -> Result<()> {
97        self.sync_db.mark_task_completed(task_id)
98    }
99
100    /// Mark a task as failed with retry
101    pub fn mark_failed(&self, task_id: i64, error: &str, retry_after: Duration) -> Result<()> {
102        self.sync_db
103            .mark_task_failed(task_id, error, retry_after.num_seconds())
104    }
105
106    /// Recover tasks that were in progress (crashed)
107    pub fn recover_in_progress(&self) -> Result<u32> {
108        self.sync_db.recover_in_progress_tasks()
109    }
110
111    /// Get count of pending tasks
112    pub fn pending_count(&self) -> Result<u32> {
113        self.sync_db
114            .count_pending_tasks(self.profile_id, self.pipeline)
115    }
116
117    /// Get count of pending tasks for a pipeline
118    pub fn pending_count_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<u32> {
119        self.sync_db.count_pending_tasks(self.profile_id, pipeline)
120    }
121
122    /// Update the profile scope for queue operations
123    pub fn set_profile_id(&mut self, profile_id: i32) {
124        self.profile_id = profile_id;
125    }
126
127    /// Update the pipeline scope for queue operations
128    pub fn set_pipeline(&mut self, pipeline: Option<SyncPipeline>) {
129        self.pipeline = pipeline;
130    }
131
132    /// Get task counts by status
133    pub fn count_by_status(&self) -> Result<(u32, u32, u32, u32)> {
134        self.sync_db.count_tasks_by_status(self.profile_id)
135    }
136
137    /// Get task counts by type (activities, gpx, health, performance)
138    pub fn count_by_type(&self) -> Result<(u32, u32, u32, u32)> {
139        self.sync_db
140            .count_tasks_by_type(self.profile_id, self.pipeline)
141    }
142
143    /// Clear completed tasks older than given days
144    pub fn cleanup(&self, days: i32) -> Result<u32> {
145        self.sync_db.cleanup_completed_tasks(days)
146    }
147
148    /// Get the sync database (for sync state operations)
149    pub fn sync_db(&self) -> &SyncDb {
150        &self.sync_db
151    }
152
153    /// Reset all failed tasks to pending
154    pub fn reset_failed(&self) -> Result<u32> {
155        self.sync_db.reset_failed_tasks()
156    }
157
158    /// Clear all pending and failed tasks
159    pub fn clear_pending(&self) -> Result<u32> {
160        self.sync_db.clear_pending_tasks()
161    }
162}
163
164/// Thread-safe wrapper for TaskQueue for use in parallel sync
165pub struct SharedTaskQueue {
166    inner: Arc<Mutex<TaskQueue>>,
167}
168
169impl SharedTaskQueue {
170    /// Create a new shared task queue
171    pub fn new(queue: TaskQueue) -> Self {
172        Self {
173            inner: Arc::new(Mutex::new(queue)),
174        }
175    }
176
177    /// Get the next pending task (thread-safe)
178    pub async fn pop(&self) -> Result<Option<SyncTask>> {
179        let guard = self.inner.lock().await;
180        guard.pop()
181    }
182
183    /// Get the next pending task using round-robin scheduling (thread-safe)
184    pub async fn pop_round_robin(&self) -> Result<Option<SyncTask>> {
185        let mut guard = self.inner.lock().await;
186        guard.pop_round_robin()
187    }
188
189    /// Get the next pending task for a specific pipeline (thread-safe)
190    pub async fn pop_with_pipeline(
191        &self,
192        pipeline: Option<SyncPipeline>,
193    ) -> Result<Option<SyncTask>> {
194        let guard = self.inner.lock().await;
195        guard.pop_with_pipeline(pipeline)
196    }
197
198    /// Get the next pending task using round-robin for a pipeline (thread-safe)
199    pub async fn pop_round_robin_with_pipeline(
200        &self,
201        pipeline: Option<SyncPipeline>,
202    ) -> Result<Option<SyncTask>> {
203        let mut guard = self.inner.lock().await;
204        guard.pop_round_robin_with_pipeline(pipeline)
205    }
206
207    /// Add a task to the queue (thread-safe)
208    pub async fn push(&self, task: SyncTask) -> Result<i64> {
209        let guard = self.inner.lock().await;
210        guard.push(task)
211    }
212
213    /// Mark a task as in progress (thread-safe)
214    pub async fn mark_in_progress(&self, task_id: i64) -> Result<()> {
215        let guard = self.inner.lock().await;
216        guard.mark_in_progress(task_id)
217    }
218
219    /// Mark a task as completed (thread-safe)
220    pub async fn mark_completed(&self, task_id: i64) -> Result<()> {
221        let guard = self.inner.lock().await;
222        guard.mark_completed(task_id)
223    }
224
225    /// Mark a task as failed with retry (thread-safe)
226    pub async fn mark_failed(
227        &self,
228        task_id: i64,
229        error: &str,
230        retry_after: Duration,
231    ) -> Result<()> {
232        let guard = self.inner.lock().await;
233        guard.mark_failed(task_id, error, retry_after)
234    }
235
236    /// Get count of pending tasks (thread-safe)
237    pub async fn pending_count(&self) -> Result<u32> {
238        let guard = self.inner.lock().await;
239        guard.pending_count()
240    }
241
242    /// Get count of pending tasks for a pipeline (thread-safe)
243    pub async fn pending_count_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<u32> {
244        let guard = self.inner.lock().await;
245        guard.pending_count_with_pipeline(pipeline)
246    }
247
248    /// Update the profile scope for queue operations (thread-safe)
249    pub async fn set_profile_id(&self, profile_id: i32) {
250        let mut guard = self.inner.lock().await;
251        guard.set_profile_id(profile_id);
252    }
253
254    /// Update the pipeline scope for queue operations (thread-safe)
255    pub async fn set_pipeline(&self, pipeline: Option<SyncPipeline>) {
256        let mut guard = self.inner.lock().await;
257        guard.set_pipeline(pipeline);
258    }
259
260    /// Get task counts by status (thread-safe)
261    pub async fn count_by_status(&self) -> Result<(u32, u32, u32, u32)> {
262        let guard = self.inner.lock().await;
263        guard.count_by_status()
264    }
265
266    /// Recover tasks that were in progress (crashed) (thread-safe)
267    pub async fn recover_in_progress(&self) -> Result<u32> {
268        let guard = self.inner.lock().await;
269        guard.recover_in_progress()
270    }
271
272    /// Clear completed tasks older than given days (thread-safe)
273    pub async fn cleanup(&self, days: i32) -> Result<u32> {
274        let guard = self.inner.lock().await;
275        guard.cleanup(days)
276    }
277}
278
279impl Clone for SharedTaskQueue {
280    fn clone(&self) -> Self {
281        Self {
282            inner: Arc::clone(&self.inner),
283        }
284    }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;

    /// Fresh queue over an in-memory database, scoped to profile 1 with no pipeline filter.
    fn setup() -> TaskQueue {
        TaskQueue::new(SyncDb::open_in_memory().unwrap(), 1, None)
    }

    /// The given day of January 2025.
    fn january_2025(day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(2025, 1, day).unwrap()
    }

    /// Frontier activities task for profile 1 (start 0, limit 50, no date bounds).
    fn activities_task() -> SyncTask {
        SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::Activities {
                start: 0,
                limit: 50,
                min_date: None,
                max_date: None,
            },
        )
    }

    /// Daily-health task for profile 1 in `pipeline`, dated the given day of January 2025.
    fn daily_health_task(pipeline: SyncPipeline, day: u32) -> SyncTask {
        SyncTask::new(
            1,
            pipeline,
            SyncTaskType::DailyHealth {
                date: january_2025(day),
            },
        )
    }

    #[test]
    fn test_push_and_pop() {
        let queue = setup();

        let id = queue.push(activities_task()).unwrap();
        assert!(id > 0);

        let popped = queue.pop().unwrap();
        assert!(popped.is_some());
        assert_eq!(popped.unwrap().profile_id, 1);
    }

    #[test]
    fn test_mark_completed() {
        let queue = setup();
        let id = queue.push(activities_task()).unwrap();

        queue.mark_in_progress(id).unwrap();
        queue.mark_completed(id).unwrap();

        // A completed task must no longer be handed out.
        assert!(queue.pop().unwrap().is_none());
    }

    #[test]
    fn test_pending_count() {
        let queue = setup();
        assert_eq!(queue.pending_count().unwrap(), 0);

        queue.push(activities_task()).unwrap();
        queue
            .push(daily_health_task(SyncPipeline::Frontier, 1))
            .unwrap();

        assert_eq!(queue.pending_count().unwrap(), 2);
    }

    #[test]
    fn test_recover_in_progress() {
        let queue = setup();

        let id = queue.push(activities_task()).unwrap();
        queue.mark_in_progress(id).unwrap();

        // Simulate crash recovery: the in-progress task is recovered...
        assert_eq!(queue.recover_in_progress().unwrap(), 1);

        // ...and can be popped again.
        assert!(queue.pop().unwrap().is_some());
    }

    #[test]
    fn test_profile_id_update_affects_pending_count() {
        let mut queue = setup();

        // The task belongs to profile 2 while the queue is scoped to profile 1.
        let other_profile_task = SyncTask::new(
            2,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: january_2025(2),
            },
        );
        queue.push(other_profile_task).unwrap();
        assert_eq!(queue.pending_count().unwrap(), 0);

        queue.set_profile_id(2);
        assert_eq!(queue.pending_count().unwrap(), 1);
    }

    #[test]
    fn test_pop_round_robin_prefers_activity_first() {
        let mut queue = setup();

        // Push the health task first so insertion order alone would favour it.
        queue
            .push(daily_health_task(SyncPipeline::Frontier, 1))
            .unwrap();
        queue.push(activities_task()).unwrap();

        let first = queue.pop_round_robin().unwrap().unwrap();
        assert!(matches!(first.task_type, SyncTaskType::Activities { .. }));
    }

    #[test]
    fn test_pop_with_pipeline_filters() {
        let queue = setup();

        let id_frontier = queue
            .push(daily_health_task(SyncPipeline::Frontier, 1))
            .unwrap();
        let id_backfill = queue
            .push(daily_health_task(SyncPipeline::Backfill, 2))
            .unwrap();

        let popped_backfill = queue
            .pop_with_pipeline(Some(SyncPipeline::Backfill))
            .unwrap()
            .unwrap();
        assert_eq!(popped_backfill.id, Some(id_backfill));

        let popped_frontier = queue
            .pop_with_pipeline(Some(SyncPipeline::Frontier))
            .unwrap()
            .unwrap();
        assert_eq!(popped_frontier.id, Some(id_frontier));
    }
}
484            .unwrap()
485            .unwrap();
486        assert_eq!(popped_frontier.id, Some(id_frontier));
487    }
488}