1use std::path::Path;
8
9use chrono::{DateTime, NaiveDate, Utc};
10use rusqlite::{params, Connection, OptionalExtension};
11
12use crate::db::models::{SyncPipeline, SyncState, SyncTask, SyncTaskType, TaskStatus};
13use crate::error::{GarminError, Result};
14
15pub struct SyncDb {
17 conn: Connection,
18}
19
20impl SyncDb {
21 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
23 let conn = Connection::open(path.as_ref())
24 .map_err(|e| GarminError::Database(format!("Failed to open sync database: {}", e)))?;
25
26 let db = Self { conn };
27 db.migrate()?;
28 Ok(db)
29 }
30
31 pub fn open_in_memory() -> Result<Self> {
33 let conn = Connection::open_in_memory().map_err(|e| {
34 GarminError::Database(format!("Failed to open in-memory database: {}", e))
35 })?;
36
37 let db = Self { conn };
38 db.migrate()?;
39 Ok(db)
40 }
41
42 fn migrate(&self) -> Result<()> {
44 self.conn
45 .execute_batch(
46 r#"
47 CREATE TABLE IF NOT EXISTS sync_state (
48 profile_id INTEGER NOT NULL,
49 data_type TEXT NOT NULL,
50 last_sync_date TEXT,
51 last_activity_id INTEGER,
52 PRIMARY KEY (profile_id, data_type)
53 );
54
55 CREATE TABLE IF NOT EXISTS sync_tasks (
56 id INTEGER PRIMARY KEY AUTOINCREMENT,
57 profile_id INTEGER NOT NULL,
58 task_type TEXT NOT NULL,
59 task_data TEXT NOT NULL,
60 pipeline TEXT NOT NULL DEFAULT 'frontier',
61 status TEXT NOT NULL DEFAULT 'pending',
62 attempts INTEGER NOT NULL DEFAULT 0,
63 last_error TEXT,
64 created_at TEXT NOT NULL DEFAULT (datetime('now')),
65 next_retry_at TEXT,
66 completed_at TEXT
67 );
68
69 CREATE INDEX IF NOT EXISTS idx_sync_tasks_status
70 ON sync_tasks(status, next_retry_at);
71
72 CREATE TABLE IF NOT EXISTS profiles (
73 profile_id INTEGER PRIMARY KEY AUTOINCREMENT,
74 display_name TEXT NOT NULL UNIQUE,
75 user_id INTEGER,
76 created_at TEXT NOT NULL DEFAULT (datetime('now')),
77 last_sync_at TEXT
78 );
79
80 -- Backfill tracking table
81 CREATE TABLE IF NOT EXISTS backfill_state (
82 profile_id INTEGER NOT NULL,
83 data_type TEXT NOT NULL,
84 frontier_date TEXT NOT NULL,
85 target_date TEXT NOT NULL,
86 is_complete INTEGER NOT NULL DEFAULT 0,
87 updated_at TEXT NOT NULL DEFAULT (datetime('now')),
88 PRIMARY KEY (profile_id, data_type)
89 );
90 "#,
91 )
92 .map_err(|e| GarminError::Database(format!("Failed to run migrations: {}", e)))?;
93
94 if !self.column_exists("sync_tasks", "pipeline")? {
95 self.conn
96 .execute(
97 "ALTER TABLE sync_tasks ADD COLUMN pipeline TEXT NOT NULL DEFAULT 'frontier'",
98 [],
99 )
100 .map_err(|e| {
101 GarminError::Database(format!(
102 "Failed to add pipeline column to sync_tasks: {}",
103 e
104 ))
105 })?;
106 }
107
108 self.conn
109 .execute(
110 "CREATE INDEX IF NOT EXISTS idx_sync_tasks_pipeline_status
111 ON sync_tasks(pipeline, status, next_retry_at)",
112 [],
113 )
114 .map_err(|e| {
115 GarminError::Database(format!("Failed to create pipeline index: {}", e))
116 })?;
117
118 Ok(())
119 }
120
121 fn column_exists(&self, table: &str, column: &str) -> Result<bool> {
122 let query = format!("PRAGMA table_info({})", table);
123 let mut stmt = self
124 .conn
125 .prepare(&query)
126 .map_err(|e| GarminError::Database(format!("Failed to inspect table: {}", e)))?;
127
128 let rows = stmt
129 .query_map([], |row| row.get::<_, String>(1))
130 .map_err(|e| GarminError::Database(format!("Failed to read table info: {}", e)))?;
131
132 for row in rows {
133 if row.map_err(|e| GarminError::Database(e.to_string()))? == column {
134 return Ok(true);
135 }
136 }
137
138 Ok(false)
139 }
140
141 pub fn get_or_create_profile(&self, display_name: &str) -> Result<i32> {
147 if let Some(id) = self.get_profile_id(display_name)? {
149 return Ok(id);
150 }
151
152 self.conn
154 .execute(
155 "INSERT INTO profiles (display_name) VALUES (?)",
156 params![display_name],
157 )
158 .map_err(|e| GarminError::Database(format!("Failed to create profile: {}", e)))?;
159
160 Ok(self.conn.last_insert_rowid() as i32)
161 }
162
163 pub fn get_profile_id(&self, display_name: &str) -> Result<Option<i32>> {
165 self.conn
166 .query_row(
167 "SELECT profile_id FROM profiles WHERE display_name = ?",
168 params![display_name],
169 |row| row.get(0),
170 )
171 .optional()
172 .map_err(|e| GarminError::Database(format!("Failed to get profile: {}", e)))
173 }
174
175 pub fn get_latest_profile(&self) -> Result<Option<(i32, String)>> {
179 self.conn
180 .query_row(
181 "SELECT profile_id, display_name
182 FROM profiles
183 ORDER BY
184 CASE WHEN last_sync_at IS NULL THEN 1 ELSE 0 END,
185 last_sync_at DESC,
186 created_at DESC,
187 profile_id DESC
188 LIMIT 1",
189 [],
190 |row| Ok((row.get(0)?, row.get(1)?)),
191 )
192 .optional()
193 .map_err(|e| GarminError::Database(format!("Failed to get latest profile: {}", e)))
194 }
195
196 pub fn update_profile_sync_time(&self, profile_id: i32) -> Result<()> {
198 self.conn
199 .execute(
200 "UPDATE profiles SET last_sync_at = datetime('now') WHERE profile_id = ?",
201 params![profile_id],
202 )
203 .map_err(|e| GarminError::Database(format!("Failed to update profile: {}", e)))?;
204
205 Ok(())
206 }
207
208 pub fn get_sync_state(&self, profile_id: i32, data_type: &str) -> Result<Option<SyncState>> {
214 self.conn
215 .query_row(
216 "SELECT profile_id, data_type, last_sync_date, last_activity_id
217 FROM sync_state
218 WHERE profile_id = ? AND data_type = ?",
219 params![profile_id, data_type],
220 |row| {
221 Ok(SyncState {
222 profile_id: row.get(0)?,
223 data_type: row.get(1)?,
224 last_sync_date: row
225 .get::<_, Option<String>>(2)?
226 .and_then(|s| NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
227 last_activity_id: row.get(3)?,
228 })
229 },
230 )
231 .optional()
232 .map_err(|e| GarminError::Database(format!("Failed to get sync state: {}", e)))
233 }
234
235 pub fn update_sync_state(&self, state: &SyncState) -> Result<()> {
237 let date_str = state
238 .last_sync_date
239 .map(|d| d.format("%Y-%m-%d").to_string());
240
241 self.conn
242 .execute(
243 "INSERT INTO sync_state (profile_id, data_type, last_sync_date, last_activity_id)
244 VALUES (?, ?, ?, ?)
245 ON CONFLICT (profile_id, data_type) DO UPDATE SET
246 last_sync_date = excluded.last_sync_date,
247 last_activity_id = excluded.last_activity_id",
248 params![
249 state.profile_id,
250 state.data_type,
251 date_str,
252 state.last_activity_id
253 ],
254 )
255 .map_err(|e| GarminError::Database(format!("Failed to update sync state: {}", e)))?;
256
257 Ok(())
258 }
259
260 pub fn get_backfill_state(
267 &self,
268 profile_id: i32,
269 data_type: &str,
270 ) -> Result<Option<(NaiveDate, NaiveDate, bool)>> {
271 self.conn
272 .query_row(
273 "SELECT frontier_date, target_date, is_complete
274 FROM backfill_state
275 WHERE profile_id = ? AND data_type = ?",
276 params![profile_id, data_type],
277 |row| {
278 let frontier: String = row.get(0)?;
279 let target: String = row.get(1)?;
280 let is_complete: bool = row.get(2)?;
281 Ok((
282 NaiveDate::parse_from_str(&frontier, "%Y-%m-%d").unwrap(),
283 NaiveDate::parse_from_str(&target, "%Y-%m-%d").unwrap(),
284 is_complete,
285 ))
286 },
287 )
288 .optional()
289 .map_err(|e| GarminError::Database(format!("Failed to get backfill state: {}", e)))
290 }
291
292 pub fn set_backfill_state(
294 &self,
295 profile_id: i32,
296 data_type: &str,
297 frontier_date: NaiveDate,
298 target_date: NaiveDate,
299 is_complete: bool,
300 ) -> Result<()> {
301 let frontier_str = frontier_date.format("%Y-%m-%d").to_string();
302 let target_str = target_date.format("%Y-%m-%d").to_string();
303
304 self.conn
305 .execute(
306 "INSERT INTO backfill_state (profile_id, data_type, frontier_date, target_date, is_complete, updated_at)
307 VALUES (?, ?, ?, ?, ?, datetime('now'))
308 ON CONFLICT (profile_id, data_type) DO UPDATE SET
309 frontier_date = excluded.frontier_date,
310 target_date = excluded.target_date,
311 is_complete = excluded.is_complete,
312 updated_at = datetime('now')",
313 params![profile_id, data_type, frontier_str, target_str, is_complete],
314 )
315 .map_err(|e| GarminError::Database(format!("Failed to set backfill state: {}", e)))?;
316
317 Ok(())
318 }
319
320 pub fn update_backfill_frontier(
322 &self,
323 profile_id: i32,
324 data_type: &str,
325 new_frontier: NaiveDate,
326 ) -> Result<()> {
327 let frontier_str = new_frontier.format("%Y-%m-%d").to_string();
328
329 self.conn
330 .execute(
331 "UPDATE backfill_state
332 SET frontier_date = ?, updated_at = datetime('now')
333 WHERE profile_id = ? AND data_type = ?",
334 params![frontier_str, profile_id, data_type],
335 )
336 .map_err(|e| {
337 GarminError::Database(format!("Failed to update backfill frontier: {}", e))
338 })?;
339
340 Ok(())
341 }
342
343 pub fn mark_backfill_complete(&self, profile_id: i32, data_type: &str) -> Result<()> {
345 self.conn
346 .execute(
347 "UPDATE backfill_state
348 SET is_complete = 1, updated_at = datetime('now')
349 WHERE profile_id = ? AND data_type = ?",
350 params![profile_id, data_type],
351 )
352 .map_err(|e| {
353 GarminError::Database(format!("Failed to mark backfill complete: {}", e))
354 })?;
355
356 Ok(())
357 }
358
359 pub fn is_backfill_complete(&self, profile_id: i32, data_type: &str) -> Result<bool> {
361 self.conn
362 .query_row(
363 "SELECT is_complete FROM backfill_state WHERE profile_id = ? AND data_type = ?",
364 params![profile_id, data_type],
365 |row| row.get(0),
366 )
367 .optional()
368 .map(|opt| opt.unwrap_or(false))
369 .map_err(|e| GarminError::Database(format!("Failed to check backfill status: {}", e)))
370 }
371
372 pub fn push_task(&self, task: &SyncTask) -> Result<i64> {
378 let task_data = serde_json::to_string(&task.task_type)
379 .map_err(|e| GarminError::Database(format!("Failed to serialize task: {}", e)))?;
380
381 self.conn
382 .execute(
383 "INSERT INTO sync_tasks (profile_id, task_type, task_data, pipeline, status, attempts, last_error)
384 VALUES (?, ?, ?, ?, ?, ?, ?)",
385 params![
386 task.profile_id,
387 task_type_name(&task.task_type),
388 task_data,
389 pipeline_name(task.pipeline),
390 task.status.to_string(),
391 task.attempts,
392 task.last_error,
393 ],
394 )
395 .map_err(|e| GarminError::Database(format!("Failed to push task: {}", e)))?;
396
397 Ok(self.conn.last_insert_rowid())
398 }
399
400 pub fn pop_task(
402 &self,
403 profile_id: i32,
404 pipeline: Option<SyncPipeline>,
405 ) -> Result<Option<SyncTask>> {
406 let (query, params) = if let Some(pipeline) = pipeline {
407 (
408 "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
409 created_at, next_retry_at, completed_at
410 FROM sync_tasks
411 WHERE profile_id = ?
412 AND pipeline = ?
413 AND status IN ('pending', 'failed')
414 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
415 ORDER BY
416 CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
417 created_at ASC
418 LIMIT 1",
419 params![profile_id, pipeline_name(pipeline)],
420 )
421 } else {
422 (
423 "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
424 created_at, next_retry_at, completed_at
425 FROM sync_tasks
426 WHERE profile_id = ?
427 AND status IN ('pending', 'failed')
428 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
429 ORDER BY
430 CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
431 created_at ASC
432 LIMIT 1",
433 params![profile_id],
434 )
435 };
436
437 self.conn
438 .query_row(query, params, |row| {
439 let task_data: String = row.get(3)?;
440 let task_type: SyncTaskType = serde_json::from_str(&task_data).unwrap();
441 let pipeline_str: String = row.get(4)?;
442 let status_str: String = row.get(5)?;
443
444 Ok(SyncTask {
445 id: Some(row.get(0)?),
446 profile_id: row.get(1)?,
447 task_type,
448 pipeline: parse_pipeline(&pipeline_str),
449 status: parse_status(&status_str),
450 attempts: row.get(6)?,
451 last_error: row.get(7)?,
452 created_at: row
453 .get::<_, Option<String>>(8)?
454 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
455 .map(|dt| dt.with_timezone(&Utc)),
456 next_retry_at: row
457 .get::<_, Option<String>>(9)?
458 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
459 .map(|dt| dt.with_timezone(&Utc)),
460 completed_at: row
461 .get::<_, Option<String>>(10)?
462 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
463 .map(|dt| dt.with_timezone(&Utc)),
464 })
465 })
466 .optional()
467 .map_err(|e| GarminError::Database(format!("Failed to pop task: {}", e)))
468 }
469
470 pub fn pop_task_by_type(
472 &self,
473 profile_id: i32,
474 task_type: &str,
475 pipeline: Option<SyncPipeline>,
476 ) -> Result<Option<SyncTask>> {
477 let (query, params) = if let Some(pipeline) = pipeline {
478 (
479 "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
480 created_at, next_retry_at, completed_at
481 FROM sync_tasks
482 WHERE profile_id = ?
483 AND task_type = ?
484 AND pipeline = ?
485 AND status IN ('pending', 'failed')
486 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
487 ORDER BY
488 CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
489 created_at ASC
490 LIMIT 1",
491 params![profile_id, task_type, pipeline_name(pipeline)],
492 )
493 } else {
494 (
495 "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
496 created_at, next_retry_at, completed_at
497 FROM sync_tasks
498 WHERE profile_id = ?
499 AND task_type = ?
500 AND status IN ('pending', 'failed')
501 AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
502 ORDER BY
503 CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
504 created_at ASC
505 LIMIT 1",
506 params![profile_id, task_type],
507 )
508 };
509
510 self.conn
511 .query_row(query, params, |row| {
512 let task_data: String = row.get(3)?;
513 let task_type: SyncTaskType = serde_json::from_str(&task_data).unwrap();
514 let pipeline_str: String = row.get(4)?;
515 let status_str: String = row.get(5)?;
516
517 Ok(SyncTask {
518 id: Some(row.get(0)?),
519 profile_id: row.get(1)?,
520 task_type,
521 pipeline: parse_pipeline(&pipeline_str),
522 status: parse_status(&status_str),
523 attempts: row.get(6)?,
524 last_error: row.get(7)?,
525 created_at: row
526 .get::<_, Option<String>>(8)?
527 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
528 .map(|dt| dt.with_timezone(&Utc)),
529 next_retry_at: row
530 .get::<_, Option<String>>(9)?
531 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
532 .map(|dt| dt.with_timezone(&Utc)),
533 completed_at: row
534 .get::<_, Option<String>>(10)?
535 .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
536 .map(|dt| dt.with_timezone(&Utc)),
537 })
538 })
539 .optional()
540 .map_err(|e| GarminError::Database(format!("Failed to pop task by type: {}", e)))
541 }
542
543 pub fn mark_task_in_progress(&self, task_id: i64) -> Result<()> {
545 self.conn
546 .execute(
547 "UPDATE sync_tasks SET status = 'in_progress', attempts = attempts + 1 WHERE id = ?",
548 params![task_id],
549 )
550 .map_err(|e| GarminError::Database(format!("Failed to mark task in progress: {}", e)))?;
551
552 Ok(())
553 }
554
555 pub fn mark_task_completed(&self, task_id: i64) -> Result<()> {
557 self.conn
558 .execute(
559 "UPDATE sync_tasks SET status = 'completed', completed_at = datetime('now') WHERE id = ?",
560 params![task_id],
561 )
562 .map_err(|e| GarminError::Database(format!("Failed to mark task completed: {}", e)))?;
563
564 Ok(())
565 }
566
567 pub fn mark_task_failed(&self, task_id: i64, error: &str, retry_delay_secs: i64) -> Result<()> {
569 self.conn
570 .execute(
571 "UPDATE sync_tasks SET
572 status = 'failed',
573 last_error = ?,
574 next_retry_at = datetime('now', '+' || ? || ' seconds')
575 WHERE id = ?",
576 params![error, retry_delay_secs, task_id],
577 )
578 .map_err(|e| GarminError::Database(format!("Failed to mark task failed: {}", e)))?;
579
580 Ok(())
581 }
582
583 pub fn recover_in_progress_tasks(&self) -> Result<u32> {
585 let count = self
586 .conn
587 .execute(
588 "UPDATE sync_tasks SET status = 'pending' WHERE status = 'in_progress'",
589 [],
590 )
591 .map_err(|e| GarminError::Database(format!("Failed to recover tasks: {}", e)))?;
592
593 Ok(count as u32)
594 }
595
596 pub fn count_pending_tasks(
598 &self,
599 profile_id: i32,
600 pipeline: Option<SyncPipeline>,
601 ) -> Result<u32> {
602 let (query, params) = if let Some(pipeline) = pipeline {
603 (
604 "SELECT COUNT(*) FROM sync_tasks
605 WHERE profile_id = ? AND pipeline = ? AND status IN ('pending', 'in_progress', 'failed')",
606 params![profile_id, pipeline_name(pipeline)],
607 )
608 } else {
609 (
610 "SELECT COUNT(*) FROM sync_tasks
611 WHERE profile_id = ? AND status IN ('pending', 'in_progress', 'failed')",
612 params![profile_id],
613 )
614 };
615
616 self.conn
617 .query_row(query, params, |row| row.get(0))
618 .map_err(|e| GarminError::Database(format!("Failed to count tasks: {}", e)))
619 }
620
621 pub fn count_tasks_by_status(&self, profile_id: i32) -> Result<(u32, u32, u32, u32)> {
623 let mut stmt = self
624 .conn
625 .prepare("SELECT status, COUNT(*) FROM sync_tasks WHERE profile_id = ? GROUP BY status")
626 .map_err(|e| GarminError::Database(format!("Failed to prepare query: {}", e)))?;
627
628 let mut pending = 0u32;
629 let mut in_progress = 0u32;
630 let mut completed = 0u32;
631 let mut failed = 0u32;
632
633 let rows = stmt
634 .query_map(params![profile_id], |row| {
635 Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?))
636 })
637 .map_err(|e| GarminError::Database(format!("Failed to query tasks: {}", e)))?;
638
639 for row in rows {
640 let (status, count) = row.map_err(|e| GarminError::Database(e.to_string()))?;
641 match status.as_str() {
642 "pending" => pending = count,
643 "in_progress" => in_progress = count,
644 "completed" => completed = count,
645 "failed" => failed = count,
646 _ => {}
647 }
648 }
649
650 Ok((pending, in_progress, completed, failed))
651 }
652
653 pub fn count_tasks_by_type(
657 &self,
658 profile_id: i32,
659 pipeline: Option<SyncPipeline>,
660 ) -> Result<(u32, u32, u32, u32)> {
661 let (query, params) = if let Some(pipeline) = pipeline {
662 (
663 "SELECT task_type, COUNT(*) FROM sync_tasks
664 WHERE profile_id = ? AND pipeline = ? AND status IN ('pending', 'in_progress', 'failed')
665 GROUP BY task_type",
666 params![profile_id, pipeline_name(pipeline)],
667 )
668 } else {
669 (
670 "SELECT task_type, COUNT(*) FROM sync_tasks
671 WHERE profile_id = ? AND status IN ('pending', 'in_progress', 'failed')
672 GROUP BY task_type",
673 params![profile_id],
674 )
675 };
676
677 let mut stmt = self
678 .conn
679 .prepare(query)
680 .map_err(|e| GarminError::Database(format!("Failed to prepare query: {}", e)))?;
681
682 let mut activities = 0u32;
683 let mut gpx = 0u32;
684 let mut health = 0u32;
685 let mut performance = 0u32;
686
687 let rows = stmt
688 .query_map(params, |row| {
689 Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?))
690 })
691 .map_err(|e| GarminError::Database(format!("Failed to query tasks: {}", e)))?;
692
693 for row in rows {
694 let (task_type, count) = row.map_err(|e| GarminError::Database(e.to_string()))?;
695 match task_type.as_str() {
696 "activities" => activities = count,
697 "download_gpx" => gpx = count,
698 "daily_health" => health = count,
699 "performance" => performance = count,
700 _ => {}
701 }
702 }
703
704 Ok((activities, gpx, health, performance))
705 }
706
707 pub fn cleanup_completed_tasks(&self, max_age_days: i32) -> Result<u32> {
709 let count = self
710 .conn
711 .execute(
712 "DELETE FROM sync_tasks
713 WHERE status = 'completed'
714 AND completed_at < datetime('now', '-' || ? || ' days')",
715 params![max_age_days],
716 )
717 .map_err(|e| GarminError::Database(format!("Failed to cleanup tasks: {}", e)))?;
718
719 Ok(count as u32)
720 }
721
722 pub fn has_health_data(&self, _profile_id: i32, _date: NaiveDate) -> Result<bool> {
724 Ok(false)
727 }
728
729 pub fn has_performance_data(&self, _profile_id: i32, _date: NaiveDate) -> Result<bool> {
731 Ok(false)
734 }
735
736 pub fn reset_failed_tasks(&self) -> Result<u32> {
738 let count = self
739 .conn
740 .execute(
741 "UPDATE sync_tasks SET status = 'pending', next_retry_at = NULL, attempts = 0
742 WHERE status = 'failed'",
743 [],
744 )
745 .map_err(|e| GarminError::Database(format!("Failed to reset tasks: {}", e)))?;
746
747 Ok(count as u32)
748 }
749
750 pub fn clear_pending_tasks(&self) -> Result<u32> {
752 let count = self
753 .conn
754 .execute(
755 "DELETE FROM sync_tasks WHERE status IN ('pending', 'failed')",
756 [],
757 )
758 .map_err(|e| GarminError::Database(format!("Failed to clear tasks: {}", e)))?;
759
760 Ok(count as u32)
761 }
762}
763
764fn task_type_name(task_type: &SyncTaskType) -> &'static str {
765 match task_type {
766 SyncTaskType::Activities { .. } => "activities",
767 SyncTaskType::ActivityDetail { .. } => "activity_detail",
768 SyncTaskType::DownloadGpx { .. } => "download_gpx",
769 SyncTaskType::DailyHealth { .. } => "daily_health",
770 SyncTaskType::Performance { .. } => "performance",
771 SyncTaskType::Weight { .. } => "weight",
772 SyncTaskType::GenerateEmbeddings { .. } => "generate_embeddings",
773 }
774}
775
776fn pipeline_name(pipeline: SyncPipeline) -> &'static str {
777 match pipeline {
778 SyncPipeline::Frontier => "frontier",
779 SyncPipeline::Backfill => "backfill",
780 }
781}
782
783fn parse_pipeline(s: &str) -> SyncPipeline {
784 match s {
785 "backfill" => SyncPipeline::Backfill,
786 _ => SyncPipeline::Frontier,
787 }
788}
789
790fn parse_status(s: &str) -> TaskStatus {
791 match s {
792 "pending" => TaskStatus::Pending,
793 "in_progress" => TaskStatus::InProgress,
794 "completed" => TaskStatus::Completed,
795 "failed" => TaskStatus::Failed,
796 _ => TaskStatus::Pending,
797 }
798}
799
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh, fully migrated in-memory database for a single test.
    fn memory_db() -> SyncDb {
        SyncDb::open_in_memory().unwrap()
    }

    /// Shorthand for a calendar date that is known to be valid.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    #[test]
    fn test_profile_management() {
        let db = memory_db();

        // Asking twice for the same name must yield the same profile.
        let first = db.get_or_create_profile("test_user").unwrap();
        let again = db.get_or_create_profile("test_user").unwrap();
        assert_eq!(first, again);

        // A different name gets a different profile.
        let other = db.get_or_create_profile("another_user").unwrap();
        assert_ne!(first, other);
    }

    #[test]
    fn test_sync_state() {
        let db = memory_db();

        let state = SyncState {
            profile_id: 1,
            data_type: "health".to_string(),
            last_sync_date: Some(ymd(2024, 12, 15)),
            last_activity_id: None,
        };
        db.update_sync_state(&state).unwrap();

        let retrieved = db.get_sync_state(1, "health").unwrap().unwrap();
        assert_eq!(retrieved.last_sync_date, state.last_sync_date);
    }

    #[test]
    fn test_task_queue() {
        let db = memory_db();

        let task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );

        let id = db.push_task(&task).unwrap();
        assert!(id > 0);

        let popped = db.pop_task(1, None).unwrap().unwrap();
        assert_eq!(popped.id, Some(id));

        db.mark_task_in_progress(id).unwrap();
        db.mark_task_completed(id).unwrap();

        // A completed task must no longer be offered.
        let next = db.pop_task(1, None).unwrap();
        assert!(next.is_none());
    }

    #[test]
    fn test_recover_in_progress() {
        let db = memory_db();

        let task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );

        let id = db.push_task(&task).unwrap();
        db.mark_task_in_progress(id).unwrap();

        let recovered = db.recover_in_progress_tasks().unwrap();
        assert_eq!(recovered, 1);

        // After recovery the task is runnable again.
        let popped = db.pop_task(1, None).unwrap();
        assert!(popped.is_some());
    }

    #[test]
    fn test_pop_task_scoped_by_profile() {
        let db = memory_db();

        let task_profile_1 = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );
        let task_profile_2 = SyncTask::new(
            2,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 16),
            },
        );

        let id1 = db.push_task(&task_profile_1).unwrap();
        let id2 = db.push_task(&task_profile_2).unwrap();

        let popped_profile_2 = db.pop_task(2, None).unwrap().unwrap();
        assert_eq!(popped_profile_2.id, Some(id2));

        let popped_profile_1 = db.pop_task(1, None).unwrap().unwrap();
        assert_eq!(popped_profile_1.id, Some(id1));
    }

    #[test]
    fn test_pop_task_by_type() {
        let db = memory_db();

        let task_health = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );
        let task_perf = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::Performance {
                date: ymd(2024, 12, 22),
            },
        );

        let id_health = db.push_task(&task_health).unwrap();
        let id_perf = db.push_task(&task_perf).unwrap();

        let popped_perf = db.pop_task_by_type(1, "performance", None).unwrap().unwrap();
        assert_eq!(popped_perf.id, Some(id_perf));

        let popped_health = db.pop_task_by_type(1, "daily_health", None).unwrap().unwrap();
        assert_eq!(popped_health.id, Some(id_health));
    }

    #[test]
    fn test_pop_task_by_pipeline() {
        let db = memory_db();

        let task_frontier = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );
        let task_backfill = SyncTask::new(
            1,
            SyncPipeline::Backfill,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 16),
            },
        );

        let id_frontier = db.push_task(&task_frontier).unwrap();
        let id_backfill = db.push_task(&task_backfill).unwrap();

        let popped_backfill = db.pop_task(1, Some(SyncPipeline::Backfill)).unwrap().unwrap();
        assert_eq!(popped_backfill.id, Some(id_backfill));

        let popped_frontier = db.pop_task(1, Some(SyncPipeline::Frontier)).unwrap().unwrap();
        assert_eq!(popped_frontier.id, Some(id_frontier));
    }

    #[test]
    fn test_update_backfill_frontier() {
        let db = memory_db();

        let frontier = ymd(2025, 1, 31);
        let target = ymd(2025, 1, 1);
        db.set_backfill_state(1, "activities", frontier, target, false)
            .unwrap();

        let new_frontier = ymd(2025, 1, 15);
        db.update_backfill_frontier(1, "activities", new_frontier)
            .unwrap();

        // Only the frontier moves; target and completion flag stay as set.
        let (stored_frontier, stored_target, is_complete) =
            db.get_backfill_state(1, "activities").unwrap().unwrap();
        assert_eq!(stored_frontier, new_frontier);
        assert_eq!(stored_target, target);
        assert!(!is_complete);
    }

    #[test]
    fn test_count_tasks_by_type_includes_failed() {
        let db = memory_db();

        let task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::DailyHealth {
                date: ymd(2024, 12, 15),
            },
        );
        let id = db.push_task(&task).unwrap();

        db.mark_task_in_progress(id).unwrap();
        db.mark_task_failed(id, "boom", 60).unwrap();

        let (_activities, _gpx, health, _perf) = db.count_tasks_by_type(1, None).unwrap();
        assert_eq!(health, 1);
    }
}