1use std::sync::Arc;
6
7use chrono::Duration;
8use tokio::sync::Mutex;
9
10#[cfg(test)]
11use crate::db::models::SyncTaskType;
12use crate::db::models::{SyncPipeline, SyncTask};
13use crate::storage::SyncDb;
14use crate::Result;
15
16pub struct TaskQueue {
18 sync_db: SyncDb,
19 profile_id: i32,
20 pipeline: Option<SyncPipeline>,
21 rr_index: usize,
22}
23
24impl TaskQueue {
25 pub fn new(sync_db: SyncDb, profile_id: i32, pipeline: Option<SyncPipeline>) -> Self {
27 Self {
28 sync_db,
29 profile_id,
30 pipeline,
31 rr_index: 0,
32 }
33 }
34
35 pub fn push(&self, task: SyncTask) -> Result<i64> {
37 self.sync_db.push_task(&task)
38 }
39
40 pub fn pop(&self) -> Result<Option<SyncTask>> {
42 self.sync_db.pop_task(self.profile_id, self.pipeline)
43 }
44
45 pub fn pop_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<Option<SyncTask>> {
47 self.sync_db.pop_task(self.profile_id, pipeline)
48 }
49
50 pub fn pop_round_robin(&mut self) -> Result<Option<SyncTask>> {
52 const TASK_TYPES: [&str; 4] = ["activities", "download_gpx", "performance", "daily_health"];
53
54 for _ in 0..TASK_TYPES.len() {
55 let idx = self.rr_index % TASK_TYPES.len();
56 self.rr_index = self.rr_index.wrapping_add(1);
57 if let Some(task) =
58 self.sync_db
59 .pop_task_by_type(self.profile_id, TASK_TYPES[idx], self.pipeline)?
60 {
61 return Ok(Some(task));
62 }
63 }
64
65 self.sync_db.pop_task(self.profile_id, self.pipeline)
67 }
68
69 pub fn pop_round_robin_with_pipeline(
71 &mut self,
72 pipeline: Option<SyncPipeline>,
73 ) -> Result<Option<SyncTask>> {
74 const TASK_TYPES: [&str; 4] = ["activities", "download_gpx", "performance", "daily_health"];
75
76 for _ in 0..TASK_TYPES.len() {
77 let idx = self.rr_index % TASK_TYPES.len();
78 self.rr_index = self.rr_index.wrapping_add(1);
79 if let Some(task) =
80 self.sync_db
81 .pop_task_by_type(self.profile_id, TASK_TYPES[idx], pipeline)?
82 {
83 return Ok(Some(task));
84 }
85 }
86
87 self.sync_db.pop_task(self.profile_id, pipeline)
88 }
89
90 pub fn mark_in_progress(&self, task_id: i64) -> Result<()> {
92 self.sync_db.mark_task_in_progress(task_id)
93 }
94
95 pub fn mark_completed(&self, task_id: i64) -> Result<()> {
97 self.sync_db.mark_task_completed(task_id)
98 }
99
100 pub fn mark_failed(&self, task_id: i64, error: &str, retry_after: Duration) -> Result<()> {
102 self.sync_db
103 .mark_task_failed(task_id, error, retry_after.num_seconds())
104 }
105
106 pub fn recover_in_progress(&self) -> Result<u32> {
108 self.sync_db.recover_in_progress_tasks()
109 }
110
111 pub fn pending_count(&self) -> Result<u32> {
113 self.sync_db
114 .count_pending_tasks(self.profile_id, self.pipeline)
115 }
116
117 pub fn pending_count_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<u32> {
119 self.sync_db.count_pending_tasks(self.profile_id, pipeline)
120 }
121
122 pub fn set_profile_id(&mut self, profile_id: i32) {
124 self.profile_id = profile_id;
125 }
126
127 pub fn set_pipeline(&mut self, pipeline: Option<SyncPipeline>) {
129 self.pipeline = pipeline;
130 }
131
132 pub fn count_by_status(&self) -> Result<(u32, u32, u32, u32)> {
134 self.sync_db.count_tasks_by_status(self.profile_id)
135 }
136
137 pub fn count_by_type(&self) -> Result<(u32, u32, u32, u32)> {
139 self.sync_db
140 .count_tasks_by_type(self.profile_id, self.pipeline)
141 }
142
143 pub fn cleanup(&self, days: i32) -> Result<u32> {
145 self.sync_db.cleanup_completed_tasks(days)
146 }
147
148 pub fn sync_db(&self) -> &SyncDb {
150 &self.sync_db
151 }
152
153 pub fn reset_failed(&self) -> Result<u32> {
155 self.sync_db.reset_failed_tasks()
156 }
157
158 pub fn clear_pending(&self) -> Result<u32> {
160 self.sync_db.clear_pending_tasks()
161 }
162}
163
164pub struct SharedTaskQueue {
166 inner: Arc<Mutex<TaskQueue>>,
167}
168
169impl SharedTaskQueue {
170 pub fn new(queue: TaskQueue) -> Self {
172 Self {
173 inner: Arc::new(Mutex::new(queue)),
174 }
175 }
176
177 pub async fn pop(&self) -> Result<Option<SyncTask>> {
179 let guard = self.inner.lock().await;
180 guard.pop()
181 }
182
183 pub async fn pop_round_robin(&self) -> Result<Option<SyncTask>> {
185 let mut guard = self.inner.lock().await;
186 guard.pop_round_robin()
187 }
188
189 pub async fn pop_with_pipeline(
191 &self,
192 pipeline: Option<SyncPipeline>,
193 ) -> Result<Option<SyncTask>> {
194 let guard = self.inner.lock().await;
195 guard.pop_with_pipeline(pipeline)
196 }
197
198 pub async fn pop_round_robin_with_pipeline(
200 &self,
201 pipeline: Option<SyncPipeline>,
202 ) -> Result<Option<SyncTask>> {
203 let mut guard = self.inner.lock().await;
204 guard.pop_round_robin_with_pipeline(pipeline)
205 }
206
207 pub async fn push(&self, task: SyncTask) -> Result<i64> {
209 let guard = self.inner.lock().await;
210 guard.push(task)
211 }
212
213 pub async fn mark_in_progress(&self, task_id: i64) -> Result<()> {
215 let guard = self.inner.lock().await;
216 guard.mark_in_progress(task_id)
217 }
218
219 pub async fn mark_completed(&self, task_id: i64) -> Result<()> {
221 let guard = self.inner.lock().await;
222 guard.mark_completed(task_id)
223 }
224
225 pub async fn mark_failed(
227 &self,
228 task_id: i64,
229 error: &str,
230 retry_after: Duration,
231 ) -> Result<()> {
232 let guard = self.inner.lock().await;
233 guard.mark_failed(task_id, error, retry_after)
234 }
235
236 pub async fn pending_count(&self) -> Result<u32> {
238 let guard = self.inner.lock().await;
239 guard.pending_count()
240 }
241
242 pub async fn pending_count_with_pipeline(&self, pipeline: Option<SyncPipeline>) -> Result<u32> {
244 let guard = self.inner.lock().await;
245 guard.pending_count_with_pipeline(pipeline)
246 }
247
248 pub async fn set_profile_id(&self, profile_id: i32) {
250 let mut guard = self.inner.lock().await;
251 guard.set_profile_id(profile_id);
252 }
253
254 pub async fn set_pipeline(&self, pipeline: Option<SyncPipeline>) {
256 let mut guard = self.inner.lock().await;
257 guard.set_pipeline(pipeline);
258 }
259
260 pub async fn count_by_status(&self) -> Result<(u32, u32, u32, u32)> {
262 let guard = self.inner.lock().await;
263 guard.count_by_status()
264 }
265
266 pub async fn recover_in_progress(&self) -> Result<u32> {
268 let guard = self.inner.lock().await;
269 guard.recover_in_progress()
270 }
271
272 pub async fn cleanup(&self, days: i32) -> Result<u32> {
274 let guard = self.inner.lock().await;
275 guard.cleanup(days)
276 }
277}
278
279impl Clone for SharedTaskQueue {
280 fn clone(&self) -> Self {
281 Self {
282 inner: Arc::clone(&self.inner),
283 }
284 }
285}
286
287#[cfg(test)]
288mod tests {
289 use super::*;
290 use chrono::NaiveDate;
291
292 fn setup() -> TaskQueue {
293 let sync_db = SyncDb::open_in_memory().unwrap();
294 TaskQueue::new(sync_db, 1, None)
295 }
296
297 #[test]
298 fn test_push_and_pop() {
299 let queue = setup();
300
301 let task = SyncTask::new(
302 1,
303 SyncPipeline::Frontier,
304 SyncTaskType::Activities {
305 start: 0,
306 limit: 50,
307 min_date: None,
308 max_date: None,
309 },
310 );
311 let id = queue.push(task).unwrap();
312 assert!(id > 0);
313
314 let popped = queue.pop().unwrap();
315 assert!(popped.is_some());
316 let popped = popped.unwrap();
317 assert_eq!(popped.profile_id, 1);
318 }
319
320 #[test]
321 fn test_mark_completed() {
322 let queue = setup();
323
324 let task = SyncTask::new(
325 1,
326 SyncPipeline::Frontier,
327 SyncTaskType::Activities {
328 start: 0,
329 limit: 50,
330 min_date: None,
331 max_date: None,
332 },
333 );
334 let id = queue.push(task).unwrap();
335
336 queue.mark_in_progress(id).unwrap();
337 queue.mark_completed(id).unwrap();
338
339 let popped = queue.pop().unwrap();
341 assert!(popped.is_none());
342 }
343
344 #[test]
345 fn test_pending_count() {
346 let queue = setup();
347
348 assert_eq!(queue.pending_count().unwrap(), 0);
349
350 queue
351 .push(SyncTask::new(
352 1,
353 SyncPipeline::Frontier,
354 SyncTaskType::Activities {
355 start: 0,
356 limit: 50,
357 min_date: None,
358 max_date: None,
359 },
360 ))
361 .unwrap();
362 queue
363 .push(SyncTask::new(
364 1,
365 SyncPipeline::Frontier,
366 SyncTaskType::DailyHealth {
367 date: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
368 },
369 ))
370 .unwrap();
371
372 assert_eq!(queue.pending_count().unwrap(), 2);
373 }
374
375 #[test]
376 fn test_recover_in_progress() {
377 let queue = setup();
378
379 let task = SyncTask::new(
380 1,
381 SyncPipeline::Frontier,
382 SyncTaskType::Activities {
383 start: 0,
384 limit: 50,
385 min_date: None,
386 max_date: None,
387 },
388 );
389 let id = queue.push(task).unwrap();
390 queue.mark_in_progress(id).unwrap();
391
392 let recovered = queue.recover_in_progress().unwrap();
394 assert_eq!(recovered, 1);
395
396 let popped = queue.pop().unwrap();
398 assert!(popped.is_some());
399 }
400
401 #[test]
402 fn test_profile_id_update_affects_pending_count() {
403 let sync_db = SyncDb::open_in_memory().unwrap();
404 let mut queue = TaskQueue::new(sync_db, 1, None);
405
406 queue
407 .push(SyncTask::new(
408 2,
409 SyncPipeline::Frontier,
410 SyncTaskType::DailyHealth {
411 date: NaiveDate::from_ymd_opt(2025, 1, 2).unwrap(),
412 },
413 ))
414 .unwrap();
415
416 assert_eq!(queue.pending_count().unwrap(), 0);
417
418 queue.set_profile_id(2);
419 assert_eq!(queue.pending_count().unwrap(), 1);
420 }
421
422 #[test]
423 fn test_pop_round_robin_prefers_activity_first() {
424 let sync_db = SyncDb::open_in_memory().unwrap();
425 let mut queue = TaskQueue::new(sync_db, 1, None);
426
427 queue
428 .push(SyncTask::new(
429 1,
430 SyncPipeline::Frontier,
431 SyncTaskType::DailyHealth {
432 date: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
433 },
434 ))
435 .unwrap();
436 queue
437 .push(SyncTask::new(
438 1,
439 SyncPipeline::Frontier,
440 SyncTaskType::Activities {
441 start: 0,
442 limit: 50,
443 min_date: None,
444 max_date: None,
445 },
446 ))
447 .unwrap();
448
449 let first = queue.pop_round_robin().unwrap().unwrap();
450 assert!(matches!(first.task_type, SyncTaskType::Activities { .. }));
451 }
452
453 #[test]
454 fn test_pop_with_pipeline_filters() {
455 let sync_db = SyncDb::open_in_memory().unwrap();
456 let queue = TaskQueue::new(sync_db, 1, None);
457
458 let frontier_task = SyncTask::new(
459 1,
460 SyncPipeline::Frontier,
461 SyncTaskType::DailyHealth {
462 date: NaiveDate::from_ymd_opt(2025, 1, 1).unwrap(),
463 },
464 );
465 let backfill_task = SyncTask::new(
466 1,
467 SyncPipeline::Backfill,
468 SyncTaskType::DailyHealth {
469 date: NaiveDate::from_ymd_opt(2025, 1, 2).unwrap(),
470 },
471 );
472
473 let id_frontier = queue.push(frontier_task).unwrap();
474 let id_backfill = queue.push(backfill_task).unwrap();
475
476 let popped_backfill = queue
477 .pop_with_pipeline(Some(SyncPipeline::Backfill))
478 .unwrap()
479 .unwrap();
480 assert_eq!(popped_backfill.id, Some(id_backfill));
481
482 let popped_frontier = queue
483 .pop_with_pipeline(Some(SyncPipeline::Frontier))
484 .unwrap()
485 .unwrap();
486 assert_eq!(popped_frontier.id, Some(id_frontier));
487 }
488}