garmin_cli/sync/
task_queue.rs

1//! Task queue for sync operations with crash recovery
2
3use chrono::{Duration, Utc};
4use duckdb::params;
5
6use crate::db::models::{SyncTask, SyncTaskType, TaskStatus};
7use crate::{Database, Result};
8
9/// Task queue backed by DuckDB for persistence
10pub struct TaskQueue {
11    db: Database,
12}
13
14impl TaskQueue {
15    /// Create a new task queue
16    pub fn new(db: Database) -> Self {
17        Self { db }
18    }
19
20    /// Add a task to the queue
21    pub fn push(&self, task: SyncTask) -> Result<i64> {
22        let conn = self.db.connection();
23        let conn = conn.lock().unwrap();
24
25        let task_data = serde_json::to_string(&task.task_type)
26            .map_err(|e| crate::GarminError::Database(e.to_string()))?;
27
28        // Use RETURNING to get the inserted id
29        let id: i64 = conn
30            .query_row(
31                "INSERT INTO sync_tasks (profile_id, task_type, task_data, status, attempts)
32                 VALUES (?, ?, ?, ?, ?)
33                 RETURNING id",
34                params![
35                    task.profile_id,
36                    task_type_name(&task.task_type),
37                    task_data,
38                    task.status.to_string(),
39                    task.attempts,
40                ],
41                |row| row.get(0),
42            )
43            .map_err(|e| crate::GarminError::Database(e.to_string()))?;
44
45        Ok(id)
46    }
47
48    /// Get the next pending task
49    pub fn pop(&self) -> Result<Option<SyncTask>> {
50        let conn = self.db.connection();
51        let conn = conn.lock().unwrap();
52
53        // Priority: failed tasks ready for retry > pending tasks
54        let result = conn.query_row(
55            "SELECT id, profile_id, task_type, task_data, status, attempts, last_error,
56                    created_at, next_retry_at, completed_at
57             FROM sync_tasks
58             WHERE status IN ('pending', 'failed')
59               AND (next_retry_at IS NULL OR next_retry_at <= CURRENT_TIMESTAMP)
60             ORDER BY
61               CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
62               created_at
63             LIMIT 1",
64            [],
65            |row| {
66                let id: i64 = row.get(0)?;
67                let profile_id: i32 = row.get(1)?;
68                let _task_type_name: String = row.get(2)?;
69                let task_data: String = row.get(3)?;
70                let status_str: String = row.get(4)?;
71                let attempts: i32 = row.get(5)?;
72                let last_error: Option<String> = row.get(6)?;
73
74                Ok((id, profile_id, task_data, status_str, attempts, last_error))
75            },
76        );
77
78        match result {
79            Ok((id, profile_id, task_data, status_str, attempts, last_error)) => {
80                let task_type: SyncTaskType = serde_json::from_str(&task_data)
81                    .map_err(|e| crate::GarminError::Database(e.to_string()))?;
82
83                let status = match status_str.as_str() {
84                    "pending" => TaskStatus::Pending,
85                    "in_progress" => TaskStatus::InProgress,
86                    "completed" => TaskStatus::Completed,
87                    "failed" => TaskStatus::Failed,
88                    _ => TaskStatus::Pending,
89                };
90
91                Ok(Some(SyncTask {
92                    id: Some(id),
93                    profile_id,
94                    task_type,
95                    status,
96                    attempts,
97                    last_error,
98                    created_at: None,
99                    next_retry_at: None,
100                    completed_at: None,
101                }))
102            }
103            Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
104            Err(e) => Err(crate::GarminError::Database(e.to_string())),
105        }
106    }
107
108    /// Mark a task as in progress
109    pub fn mark_in_progress(&self, task_id: i64) -> Result<()> {
110        self.db.execute_params(
111            "UPDATE sync_tasks SET status = 'in_progress' WHERE id = ?",
112            params![task_id],
113        )?;
114        Ok(())
115    }
116
117    /// Mark a task as completed
118    pub fn mark_completed(&self, task_id: i64) -> Result<()> {
119        self.db.execute_params(
120            "UPDATE sync_tasks SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?",
121            params![task_id],
122        )?;
123        Ok(())
124    }
125
126    /// Mark a task as failed with retry
127    pub fn mark_failed(&self, task_id: i64, error: &str, retry_after: Duration) -> Result<()> {
128        let retry_at = Utc::now() + retry_after;
129
130        let conn = self.db.connection();
131        let conn = conn.lock().unwrap();
132
133        conn.execute(
134            "UPDATE sync_tasks
135             SET status = 'failed',
136                 attempts = attempts + 1,
137                 last_error = ?,
138                 next_retry_at = ?
139             WHERE id = ?",
140            params![error, retry_at.to_rfc3339(), task_id],
141        )
142        .map_err(|e| crate::GarminError::Database(e.to_string()))?;
143
144        Ok(())
145    }
146
147    /// Recover tasks that were in progress (crashed)
148    pub fn recover_in_progress(&self) -> Result<u32> {
149        let count = self.db.execute(
150            "UPDATE sync_tasks SET status = 'pending' WHERE status = 'in_progress'",
151        )?;
152        Ok(count as u32)
153    }
154
155    /// Get count of pending tasks
156    pub fn pending_count(&self) -> Result<i64> {
157        let conn = self.db.connection();
158        let conn = conn.lock().unwrap();
159
160        let count: i64 = conn
161            .query_row(
162                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed')",
163                [],
164                |row| row.get(0),
165            )
166            .map_err(|e| crate::GarminError::Database(e.to_string()))?;
167
168        Ok(count)
169    }
170
171    /// Clear completed tasks older than given days
172    pub fn cleanup(&self, days: i32) -> Result<u32> {
173        // DuckDB doesn't support parameterized INTERVAL, so format directly
174        // (days is a trusted i32 from code, not user input)
175        let sql = format!(
176            "DELETE FROM sync_tasks
177             WHERE status = 'completed'
178               AND completed_at < CURRENT_TIMESTAMP - INTERVAL {} DAY",
179            days
180        );
181        let count = self.db.execute(&sql)?;
182        Ok(count as u32)
183    }
184
185    /// Reset all failed tasks to pending (clear retry delays)
186    pub fn reset_failed(&self) -> Result<u32> {
187        let count = self.db.execute(
188            "UPDATE sync_tasks SET status = 'pending', next_retry_at = NULL, attempts = 0
189             WHERE status = 'failed'",
190        )?;
191        Ok(count as u32)
192    }
193
194    /// Clear all pending and failed tasks
195    pub fn clear_pending(&self) -> Result<u32> {
196        let count = self.db.execute(
197            "DELETE FROM sync_tasks WHERE status IN ('pending', 'failed')",
198        )?;
199        Ok(count as u32)
200    }
201}
202
203/// Get task type name for storage
204fn task_type_name(task_type: &SyncTaskType) -> &'static str {
205    match task_type {
206        SyncTaskType::Activities { .. } => "activities",
207        SyncTaskType::ActivityDetail { .. } => "activity_detail",
208        SyncTaskType::DownloadGpx { .. } => "download_gpx",
209        SyncTaskType::DailyHealth { .. } => "daily_health",
210        SyncTaskType::Performance { .. } => "performance",
211        SyncTaskType::Weight { .. } => "weight",
212        SyncTaskType::GenerateEmbeddings { .. } => "generate_embeddings",
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use chrono::NaiveDate;
220
221    fn setup() -> TaskQueue {
222        let db = Database::in_memory().unwrap();
223        TaskQueue::new(db)
224    }
225
226    #[test]
227    fn test_push_and_pop() {
228        let queue = setup();
229
230        let task = SyncTask::new(1, SyncTaskType::Activities { start: 0, limit: 50 });
231        let id = queue.push(task).unwrap();
232        assert!(id > 0);
233
234        let popped = queue.pop().unwrap();
235        assert!(popped.is_some());
236        let popped = popped.unwrap();
237        assert_eq!(popped.profile_id, 1);
238    }
239
240    #[test]
241    fn test_mark_completed() {
242        let queue = setup();
243
244        let task = SyncTask::new(1, SyncTaskType::Activities { start: 0, limit: 50 });
245        let id = queue.push(task).unwrap();
246
247        queue.mark_in_progress(id).unwrap();
248        queue.mark_completed(id).unwrap();
249
250        // Should not pop completed tasks
251        let popped = queue.pop().unwrap();
252        assert!(popped.is_none());
253    }
254
255    #[test]
256    fn test_pending_count() {
257        let queue = setup();
258
259        assert_eq!(queue.pending_count().unwrap(), 0);
260
261        queue.push(SyncTask::new(1, SyncTaskType::Activities { start: 0, limit: 50 })).unwrap();
262        queue.push(SyncTask::new(1, SyncTaskType::DailyHealth {
263            date: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap()
264        })).unwrap();
265
266        assert_eq!(queue.pending_count().unwrap(), 2);
267    }
268
269    #[test]
270    fn test_recover_in_progress() {
271        let queue = setup();
272
273        let task = SyncTask::new(1, SyncTaskType::Activities { start: 0, limit: 50 });
274        let id = queue.push(task).unwrap();
275        queue.mark_in_progress(id).unwrap();
276
277        // Simulate crash recovery
278        let recovered = queue.recover_in_progress().unwrap();
279        assert_eq!(recovered, 1);
280
281        // Should be able to pop again
282        let popped = queue.pop().unwrap();
283        assert!(popped.is_some());
284    }
285}