1use chrono::{Duration, Utc};
4use duckdb::params;
5
6use crate::db::models::{SyncTask, SyncTaskType, TaskStatus};
7use crate::{Database, Result};
8
9pub struct TaskQueue {
11 db: Database,
12}
13
14impl TaskQueue {
15 pub fn new(db: Database) -> Self {
17 Self { db }
18 }
19
20 pub fn push(&self, task: SyncTask) -> Result<i64> {
22 let conn = self.db.connection();
23 let conn = conn.lock().unwrap();
24
25 let task_data = serde_json::to_string(&task.task_type)
26 .map_err(|e| crate::GarminError::Database(e.to_string()))?;
27
28 let id: i64 = conn
30 .query_row(
31 "INSERT INTO sync_tasks (profile_id, task_type, task_data, status, attempts)
32 VALUES (?, ?, ?, ?, ?)
33 RETURNING id",
34 params![
35 task.profile_id,
36 task_type_name(&task.task_type),
37 task_data,
38 task.status.to_string(),
39 task.attempts,
40 ],
41 |row| row.get(0),
42 )
43 .map_err(|e| crate::GarminError::Database(e.to_string()))?;
44
45 Ok(id)
46 }
47
48 pub fn pop(&self) -> Result<Option<SyncTask>> {
50 let conn = self.db.connection();
51 let conn = conn.lock().unwrap();
52
53 let result = conn.query_row(
55 "SELECT id, profile_id, task_type, task_data, status, attempts, last_error,
56 created_at, next_retry_at, completed_at
57 FROM sync_tasks
58 WHERE status IN ('pending', 'failed')
59 AND (next_retry_at IS NULL OR next_retry_at <= CURRENT_TIMESTAMP)
60 ORDER BY
61 CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
62 created_at
63 LIMIT 1",
64 [],
65 |row| {
66 let id: i64 = row.get(0)?;
67 let profile_id: i32 = row.get(1)?;
68 let _task_type_name: String = row.get(2)?;
69 let task_data: String = row.get(3)?;
70 let status_str: String = row.get(4)?;
71 let attempts: i32 = row.get(5)?;
72 let last_error: Option<String> = row.get(6)?;
73
74 Ok((id, profile_id, task_data, status_str, attempts, last_error))
75 },
76 );
77
78 match result {
79 Ok((id, profile_id, task_data, status_str, attempts, last_error)) => {
80 let task_type: SyncTaskType = serde_json::from_str(&task_data)
81 .map_err(|e| crate::GarminError::Database(e.to_string()))?;
82
83 let status = match status_str.as_str() {
84 "pending" => TaskStatus::Pending,
85 "in_progress" => TaskStatus::InProgress,
86 "completed" => TaskStatus::Completed,
87 "failed" => TaskStatus::Failed,
88 _ => TaskStatus::Pending,
89 };
90
91 Ok(Some(SyncTask {
92 id: Some(id),
93 profile_id,
94 task_type,
95 status,
96 attempts,
97 last_error,
98 created_at: None,
99 next_retry_at: None,
100 completed_at: None,
101 }))
102 }
103 Err(duckdb::Error::QueryReturnedNoRows) => Ok(None),
104 Err(e) => Err(crate::GarminError::Database(e.to_string())),
105 }
106 }
107
108 pub fn mark_in_progress(&self, task_id: i64) -> Result<()> {
110 self.db.execute_params(
111 "UPDATE sync_tasks SET status = 'in_progress' WHERE id = ?",
112 params![task_id],
113 )?;
114 Ok(())
115 }
116
117 pub fn mark_completed(&self, task_id: i64) -> Result<()> {
119 self.db.execute_params(
120 "UPDATE sync_tasks SET status = 'completed', completed_at = CURRENT_TIMESTAMP WHERE id = ?",
121 params![task_id],
122 )?;
123 Ok(())
124 }
125
126 pub fn mark_failed(&self, task_id: i64, error: &str, retry_after: Duration) -> Result<()> {
128 let retry_at = Utc::now() + retry_after;
129
130 let conn = self.db.connection();
131 let conn = conn.lock().unwrap();
132
133 conn.execute(
134 "UPDATE sync_tasks
135 SET status = 'failed',
136 attempts = attempts + 1,
137 last_error = ?,
138 next_retry_at = ?
139 WHERE id = ?",
140 params![error, retry_at.to_rfc3339(), task_id],
141 )
142 .map_err(|e| crate::GarminError::Database(e.to_string()))?;
143
144 Ok(())
145 }
146
147 pub fn recover_in_progress(&self) -> Result<u32> {
149 let count = self
150 .db
151 .execute("UPDATE sync_tasks SET status = 'pending' WHERE status = 'in_progress'")?;
152 Ok(count as u32)
153 }
154
155 pub fn pending_count(&self) -> Result<i64> {
157 let conn = self.db.connection();
158 let conn = conn.lock().unwrap();
159
160 let count: i64 = conn
161 .query_row(
162 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed')",
163 [],
164 |row| row.get(0),
165 )
166 .map_err(|e| crate::GarminError::Database(e.to_string()))?;
167
168 Ok(count)
169 }
170
171 pub fn cleanup(&self, days: i32) -> Result<u32> {
173 let sql = format!(
176 "DELETE FROM sync_tasks
177 WHERE status = 'completed'
178 AND completed_at < CURRENT_TIMESTAMP - INTERVAL {} DAY",
179 days
180 );
181 let count = self.db.execute(&sql)?;
182 Ok(count as u32)
183 }
184
185 pub fn reset_failed(&self) -> Result<u32> {
187 let count = self.db.execute(
188 "UPDATE sync_tasks SET status = 'pending', next_retry_at = NULL, attempts = 0
189 WHERE status = 'failed'",
190 )?;
191 Ok(count as u32)
192 }
193
194 pub fn clear_pending(&self) -> Result<u32> {
196 let count = self
197 .db
198 .execute("DELETE FROM sync_tasks WHERE status IN ('pending', 'failed')")?;
199 Ok(count as u32)
200 }
201}
202
203fn task_type_name(task_type: &SyncTaskType) -> &'static str {
205 match task_type {
206 SyncTaskType::Activities { .. } => "activities",
207 SyncTaskType::ActivityDetail { .. } => "activity_detail",
208 SyncTaskType::DownloadGpx { .. } => "download_gpx",
209 SyncTaskType::DailyHealth { .. } => "daily_health",
210 SyncTaskType::Performance { .. } => "performance",
211 SyncTaskType::Weight { .. } => "weight",
212 SyncTaskType::GenerateEmbeddings { .. } => "generate_embeddings",
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::NaiveDate;

    /// Builds a queue on top of a fresh in-memory database.
    fn setup() -> TaskQueue {
        TaskQueue::new(Database::in_memory().unwrap())
    }

    /// The activities task for profile 1 that most tests enqueue.
    fn activities_task() -> SyncTask {
        SyncTask::new(
            1,
            SyncTaskType::Activities {
                start: 0,
                limit: 50,
            },
        )
    }

    /// A pushed task gets a positive id and comes back from `pop`.
    #[test]
    fn test_push_and_pop() {
        let queue = setup();

        let id = queue.push(activities_task()).unwrap();
        assert!(id > 0);

        let popped = queue.pop().unwrap();
        assert!(popped.is_some());
        assert_eq!(popped.unwrap().profile_id, 1);
    }

    /// A completed task is no longer offered by `pop`.
    #[test]
    fn test_mark_completed() {
        let queue = setup();
        let id = queue.push(activities_task()).unwrap();

        queue.mark_in_progress(id).unwrap();
        queue.mark_completed(id).unwrap();

        assert!(queue.pop().unwrap().is_none());
    }

    /// `pending_count` goes from zero to the number of pushed tasks.
    #[test]
    fn test_pending_count() {
        let queue = setup();
        assert_eq!(queue.pending_count().unwrap(), 0);

        let daily_health = SyncTask::new(
            1,
            SyncTaskType::DailyHealth {
                date: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
            },
        );
        queue.push(activities_task()).unwrap();
        queue.push(daily_health).unwrap();

        assert_eq!(queue.pending_count().unwrap(), 2);
    }

    /// An in-progress task is recovered and becomes poppable again.
    #[test]
    fn test_recover_in_progress() {
        let queue = setup();
        let id = queue.push(activities_task()).unwrap();
        queue.mark_in_progress(id).unwrap();

        let recovered = queue.recover_in_progress().unwrap();
        assert_eq!(recovered, 1);

        assert!(queue.pop().unwrap().is_some());
    }
}