Skip to main content

garmin_cli/storage/
sync_db.rs

1//! SQLite-based sync state and task queue storage
2//!
3//! This module handles the operational data for sync:
4//! - sync_state: tracks incremental sync progress
5//! - sync_tasks: persistent task queue for crash recovery
6
7use std::path::Path;
8
9use chrono::{DateTime, NaiveDate, Utc};
10use rusqlite::{params, Connection, OptionalExtension};
11
12use crate::db::models::{SyncPipeline, SyncState, SyncTask, SyncTaskType, TaskStatus};
13use crate::error::{GarminError, Result};
14
15/// SQLite database for sync state and task queue
16pub struct SyncDb {
17    conn: Connection,
18}
19
20impl SyncDb {
21    /// Open or create the sync database
22    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
23        let conn = Connection::open(path.as_ref())
24            .map_err(|e| GarminError::Database(format!("Failed to open sync database: {}", e)))?;
25
26        let db = Self { conn };
27        db.migrate()?;
28        Ok(db)
29    }
30
31    /// Open an in-memory database (for testing)
32    pub fn open_in_memory() -> Result<Self> {
33        let conn = Connection::open_in_memory().map_err(|e| {
34            GarminError::Database(format!("Failed to open in-memory database: {}", e))
35        })?;
36
37        let db = Self { conn };
38        db.migrate()?;
39        Ok(db)
40    }
41
42    /// Run migrations
43    fn migrate(&self) -> Result<()> {
44        self.conn
45            .execute_batch(
46                r#"
47                CREATE TABLE IF NOT EXISTS sync_state (
48                    profile_id INTEGER NOT NULL,
49                    data_type TEXT NOT NULL,
50                    last_sync_date TEXT,
51                    last_activity_id INTEGER,
52                    PRIMARY KEY (profile_id, data_type)
53                );
54
55                CREATE TABLE IF NOT EXISTS sync_tasks (
56                    id INTEGER PRIMARY KEY AUTOINCREMENT,
57                    profile_id INTEGER NOT NULL,
58                    task_type TEXT NOT NULL,
59                    task_data TEXT NOT NULL,
60                    pipeline TEXT NOT NULL DEFAULT 'frontier',
61                    status TEXT NOT NULL DEFAULT 'pending',
62                    attempts INTEGER NOT NULL DEFAULT 0,
63                    last_error TEXT,
64                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
65                    next_retry_at TEXT,
66                    completed_at TEXT
67                );
68
69                CREATE INDEX IF NOT EXISTS idx_sync_tasks_status
70                ON sync_tasks(status, next_retry_at);
71
72                CREATE TABLE IF NOT EXISTS profiles (
73                    profile_id INTEGER PRIMARY KEY AUTOINCREMENT,
74                    display_name TEXT NOT NULL UNIQUE,
75                    user_id INTEGER,
76                    created_at TEXT NOT NULL DEFAULT (datetime('now')),
77                    last_sync_at TEXT
78                );
79
80                -- Backfill tracking table
81                CREATE TABLE IF NOT EXISTS backfill_state (
82                    profile_id INTEGER NOT NULL,
83                    data_type TEXT NOT NULL,
84                    frontier_date TEXT NOT NULL,
85                    target_date TEXT NOT NULL,
86                    is_complete INTEGER NOT NULL DEFAULT 0,
87                    updated_at TEXT NOT NULL DEFAULT (datetime('now')),
88                    PRIMARY KEY (profile_id, data_type)
89                );
90                "#,
91            )
92            .map_err(|e| GarminError::Database(format!("Failed to run migrations: {}", e)))?;
93
94        if !self.column_exists("sync_tasks", "pipeline")? {
95            self.conn
96                .execute(
97                    "ALTER TABLE sync_tasks ADD COLUMN pipeline TEXT NOT NULL DEFAULT 'frontier'",
98                    [],
99                )
100                .map_err(|e| {
101                    GarminError::Database(format!(
102                        "Failed to add pipeline column to sync_tasks: {}",
103                        e
104                    ))
105                })?;
106        }
107
108        self.conn
109            .execute(
110                "CREATE INDEX IF NOT EXISTS idx_sync_tasks_pipeline_status
111                 ON sync_tasks(pipeline, status, next_retry_at)",
112                [],
113            )
114            .map_err(|e| {
115                GarminError::Database(format!("Failed to create pipeline index: {}", e))
116            })?;
117
118        Ok(())
119    }
120
121    fn column_exists(&self, table: &str, column: &str) -> Result<bool> {
122        let query = format!("PRAGMA table_info({})", table);
123        let mut stmt = self
124            .conn
125            .prepare(&query)
126            .map_err(|e| GarminError::Database(format!("Failed to inspect table: {}", e)))?;
127
128        let rows = stmt
129            .query_map([], |row| row.get::<_, String>(1))
130            .map_err(|e| GarminError::Database(format!("Failed to read table info: {}", e)))?;
131
132        for row in rows {
133            if row.map_err(|e| GarminError::Database(e.to_string()))? == column {
134                return Ok(true);
135            }
136        }
137
138        Ok(false)
139    }
140
141    // =========================================================================
142    // Profile Management
143    // =========================================================================
144
145    /// Get or create a profile by display name
146    pub fn get_or_create_profile(&self, display_name: &str) -> Result<i32> {
147        // Try to get existing
148        if let Some(id) = self.get_profile_id(display_name)? {
149            return Ok(id);
150        }
151
152        // Create new
153        self.conn
154            .execute(
155                "INSERT INTO profiles (display_name) VALUES (?)",
156                params![display_name],
157            )
158            .map_err(|e| GarminError::Database(format!("Failed to create profile: {}", e)))?;
159
160        Ok(self.conn.last_insert_rowid() as i32)
161    }
162
163    /// Get profile ID by display name
164    pub fn get_profile_id(&self, display_name: &str) -> Result<Option<i32>> {
165        self.conn
166            .query_row(
167                "SELECT profile_id FROM profiles WHERE display_name = ?",
168                params![display_name],
169                |row| row.get(0),
170            )
171            .optional()
172            .map_err(|e| GarminError::Database(format!("Failed to get profile: {}", e)))
173    }
174
175    /// Get the most recently synced profile, falling back to most recently created.
176    ///
177    /// Returns (profile_id, display_name) if at least one profile exists.
178    pub fn get_latest_profile(&self) -> Result<Option<(i32, String)>> {
179        self.conn
180            .query_row(
181                "SELECT profile_id, display_name
182                 FROM profiles
183                 ORDER BY
184                     CASE WHEN last_sync_at IS NULL THEN 1 ELSE 0 END,
185                     last_sync_at DESC,
186                     created_at DESC,
187                     profile_id DESC
188                 LIMIT 1",
189                [],
190                |row| Ok((row.get(0)?, row.get(1)?)),
191            )
192            .optional()
193            .map_err(|e| GarminError::Database(format!("Failed to get latest profile: {}", e)))
194    }
195
196    /// Update profile's last sync time
197    pub fn update_profile_sync_time(&self, profile_id: i32) -> Result<()> {
198        self.conn
199            .execute(
200                "UPDATE profiles SET last_sync_at = datetime('now') WHERE profile_id = ?",
201                params![profile_id],
202            )
203            .map_err(|e| GarminError::Database(format!("Failed to update profile: {}", e)))?;
204
205        Ok(())
206    }
207
208    // =========================================================================
209    // Sync State
210    // =========================================================================
211
212    /// Get sync state for a profile and data type
213    pub fn get_sync_state(&self, profile_id: i32, data_type: &str) -> Result<Option<SyncState>> {
214        self.conn
215            .query_row(
216                "SELECT profile_id, data_type, last_sync_date, last_activity_id
217                 FROM sync_state
218                 WHERE profile_id = ? AND data_type = ?",
219                params![profile_id, data_type],
220                |row| {
221                    Ok(SyncState {
222                        profile_id: row.get(0)?,
223                        data_type: row.get(1)?,
224                        last_sync_date: row
225                            .get::<_, Option<String>>(2)?
226                            .and_then(|s| NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
227                        last_activity_id: row.get(3)?,
228                    })
229                },
230            )
231            .optional()
232            .map_err(|e| GarminError::Database(format!("Failed to get sync state: {}", e)))
233    }
234
235    /// Update sync state
236    pub fn update_sync_state(&self, state: &SyncState) -> Result<()> {
237        let date_str = state
238            .last_sync_date
239            .map(|d| d.format("%Y-%m-%d").to_string());
240
241        self.conn
242            .execute(
243                "INSERT INTO sync_state (profile_id, data_type, last_sync_date, last_activity_id)
244                 VALUES (?, ?, ?, ?)
245                 ON CONFLICT (profile_id, data_type) DO UPDATE SET
246                     last_sync_date = excluded.last_sync_date,
247                     last_activity_id = excluded.last_activity_id",
248                params![
249                    state.profile_id,
250                    state.data_type,
251                    date_str,
252                    state.last_activity_id
253                ],
254            )
255            .map_err(|e| GarminError::Database(format!("Failed to update sync state: {}", e)))?;
256
257        Ok(())
258    }
259
260    // =========================================================================
261    // Backfill State
262    // =========================================================================
263
264    /// Get backfill state for a profile and data type
265    /// Returns (frontier_date, target_date, is_complete)
266    pub fn get_backfill_state(
267        &self,
268        profile_id: i32,
269        data_type: &str,
270    ) -> Result<Option<(NaiveDate, NaiveDate, bool)>> {
271        self.conn
272            .query_row(
273                "SELECT frontier_date, target_date, is_complete
274                 FROM backfill_state
275                 WHERE profile_id = ? AND data_type = ?",
276                params![profile_id, data_type],
277                |row| {
278                    let frontier: String = row.get(0)?;
279                    let target: String = row.get(1)?;
280                    let is_complete: bool = row.get(2)?;
281                    Ok((
282                        NaiveDate::parse_from_str(&frontier, "%Y-%m-%d").unwrap(),
283                        NaiveDate::parse_from_str(&target, "%Y-%m-%d").unwrap(),
284                        is_complete,
285                    ))
286                },
287            )
288            .optional()
289            .map_err(|e| GarminError::Database(format!("Failed to get backfill state: {}", e)))
290    }
291
292    /// Initialize or update backfill state
293    pub fn set_backfill_state(
294        &self,
295        profile_id: i32,
296        data_type: &str,
297        frontier_date: NaiveDate,
298        target_date: NaiveDate,
299        is_complete: bool,
300    ) -> Result<()> {
301        let frontier_str = frontier_date.format("%Y-%m-%d").to_string();
302        let target_str = target_date.format("%Y-%m-%d").to_string();
303
304        self.conn
305            .execute(
306                "INSERT INTO backfill_state (profile_id, data_type, frontier_date, target_date, is_complete, updated_at)
307                 VALUES (?, ?, ?, ?, ?, datetime('now'))
308                 ON CONFLICT (profile_id, data_type) DO UPDATE SET
309                     frontier_date = excluded.frontier_date,
310                     target_date = excluded.target_date,
311                     is_complete = excluded.is_complete,
312                     updated_at = datetime('now')",
313                params![profile_id, data_type, frontier_str, target_str, is_complete],
314            )
315            .map_err(|e| GarminError::Database(format!("Failed to set backfill state: {}", e)))?;
316
317        Ok(())
318    }
319
320    /// Update backfill frontier (moves the frontier date backward as we sync older data)
321    pub fn update_backfill_frontier(
322        &self,
323        profile_id: i32,
324        data_type: &str,
325        new_frontier: NaiveDate,
326    ) -> Result<()> {
327        let frontier_str = new_frontier.format("%Y-%m-%d").to_string();
328
329        self.conn
330            .execute(
331                "UPDATE backfill_state
332                 SET frontier_date = ?, updated_at = datetime('now')
333                 WHERE profile_id = ? AND data_type = ?",
334                params![frontier_str, profile_id, data_type],
335            )
336            .map_err(|e| {
337                GarminError::Database(format!("Failed to update backfill frontier: {}", e))
338            })?;
339
340        Ok(())
341    }
342
343    /// Mark backfill as complete
344    pub fn mark_backfill_complete(&self, profile_id: i32, data_type: &str) -> Result<()> {
345        self.conn
346            .execute(
347                "UPDATE backfill_state
348                 SET is_complete = 1, updated_at = datetime('now')
349                 WHERE profile_id = ? AND data_type = ?",
350                params![profile_id, data_type],
351            )
352            .map_err(|e| {
353                GarminError::Database(format!("Failed to mark backfill complete: {}", e))
354            })?;
355
356        Ok(())
357    }
358
359    /// Check if backfill is complete for a data type
360    pub fn is_backfill_complete(&self, profile_id: i32, data_type: &str) -> Result<bool> {
361        self.conn
362            .query_row(
363                "SELECT is_complete FROM backfill_state WHERE profile_id = ? AND data_type = ?",
364                params![profile_id, data_type],
365                |row| row.get(0),
366            )
367            .optional()
368            .map(|opt| opt.unwrap_or(false))
369            .map_err(|e| GarminError::Database(format!("Failed to check backfill status: {}", e)))
370    }
371
372    // =========================================================================
373    // Task Queue
374    // =========================================================================
375
376    /// Push a task to the queue
377    pub fn push_task(&self, task: &SyncTask) -> Result<i64> {
378        let task_data = serde_json::to_string(&task.task_type)
379            .map_err(|e| GarminError::Database(format!("Failed to serialize task: {}", e)))?;
380
381        self.conn
382            .execute(
383                "INSERT INTO sync_tasks (profile_id, task_type, task_data, pipeline, status, attempts, last_error)
384                 VALUES (?, ?, ?, ?, ?, ?, ?)",
385                params![
386                    task.profile_id,
387                    task_type_name(&task.task_type),
388                    task_data,
389                    pipeline_name(task.pipeline),
390                    task.status.to_string(),
391                    task.attempts,
392                    task.last_error,
393                ],
394            )
395            .map_err(|e| GarminError::Database(format!("Failed to push task: {}", e)))?;
396
397        Ok(self.conn.last_insert_rowid())
398    }
399
400    /// Pop the next task from the queue for a profile
401    pub fn pop_task(
402        &self,
403        profile_id: i32,
404        pipeline: Option<SyncPipeline>,
405    ) -> Result<Option<SyncTask>> {
406        let (query, params) = if let Some(pipeline) = pipeline {
407            (
408                "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
409                        created_at, next_retry_at, completed_at
410                 FROM sync_tasks
411                 WHERE profile_id = ?
412                   AND pipeline = ?
413                   AND status IN ('pending', 'failed')
414                   AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
415                 ORDER BY
416                     CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
417                     created_at ASC
418                 LIMIT 1",
419                params![profile_id, pipeline_name(pipeline)],
420            )
421        } else {
422            (
423                "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
424                        created_at, next_retry_at, completed_at
425                 FROM sync_tasks
426                 WHERE profile_id = ?
427                   AND status IN ('pending', 'failed')
428                   AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
429                 ORDER BY
430                     CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
431                     created_at ASC
432                 LIMIT 1",
433                params![profile_id],
434            )
435        };
436
437        self.conn
438            .query_row(query, params, |row| {
439                let task_data: String = row.get(3)?;
440                let task_type: SyncTaskType = serde_json::from_str(&task_data).unwrap();
441                let pipeline_str: String = row.get(4)?;
442                let status_str: String = row.get(5)?;
443
444                Ok(SyncTask {
445                    id: Some(row.get(0)?),
446                    profile_id: row.get(1)?,
447                    task_type,
448                    pipeline: parse_pipeline(&pipeline_str),
449                    status: parse_status(&status_str),
450                    attempts: row.get(6)?,
451                    last_error: row.get(7)?,
452                    created_at: row
453                        .get::<_, Option<String>>(8)?
454                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
455                        .map(|dt| dt.with_timezone(&Utc)),
456                    next_retry_at: row
457                        .get::<_, Option<String>>(9)?
458                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
459                        .map(|dt| dt.with_timezone(&Utc)),
460                    completed_at: row
461                        .get::<_, Option<String>>(10)?
462                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
463                        .map(|dt| dt.with_timezone(&Utc)),
464                })
465            })
466            .optional()
467            .map_err(|e| GarminError::Database(format!("Failed to pop task: {}", e)))
468    }
469
470    /// Pop the next task for a profile and task type
471    pub fn pop_task_by_type(
472        &self,
473        profile_id: i32,
474        task_type: &str,
475        pipeline: Option<SyncPipeline>,
476    ) -> Result<Option<SyncTask>> {
477        let (query, params) = if let Some(pipeline) = pipeline {
478            (
479                "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
480                        created_at, next_retry_at, completed_at
481                 FROM sync_tasks
482                 WHERE profile_id = ?
483                   AND task_type = ?
484                   AND pipeline = ?
485                   AND status IN ('pending', 'failed')
486                   AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
487                 ORDER BY
488                     CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
489                     created_at ASC
490                 LIMIT 1",
491                params![profile_id, task_type, pipeline_name(pipeline)],
492            )
493        } else {
494            (
495                "SELECT id, profile_id, task_type, task_data, pipeline, status, attempts, last_error,
496                        created_at, next_retry_at, completed_at
497                 FROM sync_tasks
498                 WHERE profile_id = ?
499                   AND task_type = ?
500                   AND status IN ('pending', 'failed')
501                   AND (next_retry_at IS NULL OR next_retry_at <= datetime('now'))
502                 ORDER BY
503                     CASE WHEN status = 'failed' THEN 0 ELSE 1 END,
504                     created_at ASC
505                 LIMIT 1",
506                params![profile_id, task_type],
507            )
508        };
509
510        self.conn
511            .query_row(query, params, |row| {
512                let task_data: String = row.get(3)?;
513                let task_type: SyncTaskType = serde_json::from_str(&task_data).unwrap();
514                let pipeline_str: String = row.get(4)?;
515                let status_str: String = row.get(5)?;
516
517                Ok(SyncTask {
518                    id: Some(row.get(0)?),
519                    profile_id: row.get(1)?,
520                    task_type,
521                    pipeline: parse_pipeline(&pipeline_str),
522                    status: parse_status(&status_str),
523                    attempts: row.get(6)?,
524                    last_error: row.get(7)?,
525                    created_at: row
526                        .get::<_, Option<String>>(8)?
527                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
528                        .map(|dt| dt.with_timezone(&Utc)),
529                    next_retry_at: row
530                        .get::<_, Option<String>>(9)?
531                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
532                        .map(|dt| dt.with_timezone(&Utc)),
533                    completed_at: row
534                        .get::<_, Option<String>>(10)?
535                        .and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
536                        .map(|dt| dt.with_timezone(&Utc)),
537                })
538            })
539            .optional()
540            .map_err(|e| GarminError::Database(format!("Failed to pop task by type: {}", e)))
541    }
542
543    /// Mark a task as in progress
544    pub fn mark_task_in_progress(&self, task_id: i64) -> Result<()> {
545        self.conn
546            .execute(
547                "UPDATE sync_tasks SET status = 'in_progress', attempts = attempts + 1 WHERE id = ?",
548                params![task_id],
549            )
550            .map_err(|e| GarminError::Database(format!("Failed to mark task in progress: {}", e)))?;
551
552        Ok(())
553    }
554
555    /// Mark a task as completed
556    pub fn mark_task_completed(&self, task_id: i64) -> Result<()> {
557        self.conn
558            .execute(
559                "UPDATE sync_tasks SET status = 'completed', completed_at = datetime('now') WHERE id = ?",
560                params![task_id],
561            )
562            .map_err(|e| GarminError::Database(format!("Failed to mark task completed: {}", e)))?;
563
564        Ok(())
565    }
566
567    /// Mark a task as failed
568    pub fn mark_task_failed(&self, task_id: i64, error: &str, retry_delay_secs: i64) -> Result<()> {
569        self.conn
570            .execute(
571                "UPDATE sync_tasks SET
572                     status = 'failed',
573                     last_error = ?,
574                     next_retry_at = datetime('now', '+' || ? || ' seconds')
575                 WHERE id = ?",
576                params![error, retry_delay_secs, task_id],
577            )
578            .map_err(|e| GarminError::Database(format!("Failed to mark task failed: {}", e)))?;
579
580        Ok(())
581    }
582
583    /// Recover in-progress tasks (after crash)
584    pub fn recover_in_progress_tasks(&self) -> Result<u32> {
585        let count = self
586            .conn
587            .execute(
588                "UPDATE sync_tasks SET status = 'pending' WHERE status = 'in_progress'",
589                [],
590            )
591            .map_err(|e| GarminError::Database(format!("Failed to recover tasks: {}", e)))?;
592
593        Ok(count as u32)
594    }
595
596    /// Count pending tasks
597    pub fn count_pending_tasks(
598        &self,
599        profile_id: i32,
600        pipeline: Option<SyncPipeline>,
601    ) -> Result<u32> {
602        let (query, params) = if let Some(pipeline) = pipeline {
603            (
604                "SELECT COUNT(*) FROM sync_tasks
605                 WHERE profile_id = ? AND pipeline = ? AND status IN ('pending', 'in_progress', 'failed')",
606                params![profile_id, pipeline_name(pipeline)],
607            )
608        } else {
609            (
610                "SELECT COUNT(*) FROM sync_tasks
611                 WHERE profile_id = ? AND status IN ('pending', 'in_progress', 'failed')",
612                params![profile_id],
613            )
614        };
615
616        self.conn
617            .query_row(query, params, |row| row.get(0))
618            .map_err(|e| GarminError::Database(format!("Failed to count tasks: {}", e)))
619    }
620
621    /// Count tasks by status
622    pub fn count_tasks_by_status(&self, profile_id: i32) -> Result<(u32, u32, u32, u32)> {
623        let mut stmt = self
624            .conn
625            .prepare("SELECT status, COUNT(*) FROM sync_tasks WHERE profile_id = ? GROUP BY status")
626            .map_err(|e| GarminError::Database(format!("Failed to prepare query: {}", e)))?;
627
628        let mut pending = 0u32;
629        let mut in_progress = 0u32;
630        let mut completed = 0u32;
631        let mut failed = 0u32;
632
633        let rows = stmt
634            .query_map(params![profile_id], |row| {
635                Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?))
636            })
637            .map_err(|e| GarminError::Database(format!("Failed to query tasks: {}", e)))?;
638
639        for row in rows {
640            let (status, count) = row.map_err(|e| GarminError::Database(e.to_string()))?;
641            match status.as_str() {
642                "pending" => pending = count,
643                "in_progress" => in_progress = count,
644                "completed" => completed = count,
645                "failed" => failed = count,
646                _ => {}
647            }
648        }
649
650        Ok((pending, in_progress, completed, failed))
651    }
652
653    /// Count pending/in_progress/failed tasks by type
654    ///
655    /// Returns (activities, gpx, health, performance) counts
656    pub fn count_tasks_by_type(
657        &self,
658        profile_id: i32,
659        pipeline: Option<SyncPipeline>,
660    ) -> Result<(u32, u32, u32, u32)> {
661        let (query, params) = if let Some(pipeline) = pipeline {
662            (
663                "SELECT task_type, COUNT(*) FROM sync_tasks
664                 WHERE profile_id = ? AND pipeline = ? AND status IN ('pending', 'in_progress', 'failed')
665                 GROUP BY task_type",
666                params![profile_id, pipeline_name(pipeline)],
667            )
668        } else {
669            (
670                "SELECT task_type, COUNT(*) FROM sync_tasks
671                 WHERE profile_id = ? AND status IN ('pending', 'in_progress', 'failed')
672                 GROUP BY task_type",
673                params![profile_id],
674            )
675        };
676
677        let mut stmt = self
678            .conn
679            .prepare(query)
680            .map_err(|e| GarminError::Database(format!("Failed to prepare query: {}", e)))?;
681
682        let mut activities = 0u32;
683        let mut gpx = 0u32;
684        let mut health = 0u32;
685        let mut performance = 0u32;
686
687        let rows = stmt
688            .query_map(params, |row| {
689                Ok((row.get::<_, String>(0)?, row.get::<_, u32>(1)?))
690            })
691            .map_err(|e| GarminError::Database(format!("Failed to query tasks: {}", e)))?;
692
693        for row in rows {
694            let (task_type, count) = row.map_err(|e| GarminError::Database(e.to_string()))?;
695            match task_type.as_str() {
696                "activities" => activities = count,
697                "download_gpx" => gpx = count,
698                "daily_health" => health = count,
699                "performance" => performance = count,
700                _ => {}
701            }
702        }
703
704        Ok((activities, gpx, health, performance))
705    }
706
707    /// Clean up old completed tasks
708    pub fn cleanup_completed_tasks(&self, max_age_days: i32) -> Result<u32> {
709        let count = self
710            .conn
711            .execute(
712                "DELETE FROM sync_tasks
713                 WHERE status = 'completed'
714                   AND completed_at < datetime('now', '-' || ? || ' days')",
715                params![max_age_days],
716            )
717            .map_err(|e| GarminError::Database(format!("Failed to cleanup tasks: {}", e)))?;
718
719        Ok(count as u32)
720    }
721
722    /// Check if health data exists for a date
723    pub fn has_health_data(&self, _profile_id: i32, _date: NaiveDate) -> Result<bool> {
724        // For now, always return false since we're not tracking this in SQLite
725        // The actual data check will be done against Parquet files
726        Ok(false)
727    }
728
729    /// Check if performance data exists for a date
730    pub fn has_performance_data(&self, _profile_id: i32, _date: NaiveDate) -> Result<bool> {
731        // For now, always return false since we're not tracking this in SQLite
732        // The actual data check will be done against Parquet files
733        Ok(false)
734    }
735
736    /// Reset all failed tasks to pending (clear retry delays)
737    pub fn reset_failed_tasks(&self) -> Result<u32> {
738        let count = self
739            .conn
740            .execute(
741                "UPDATE sync_tasks SET status = 'pending', next_retry_at = NULL, attempts = 0
742                 WHERE status = 'failed'",
743                [],
744            )
745            .map_err(|e| GarminError::Database(format!("Failed to reset tasks: {}", e)))?;
746
747        Ok(count as u32)
748    }
749
750    /// Clear all pending and failed tasks
751    pub fn clear_pending_tasks(&self) -> Result<u32> {
752        let count = self
753            .conn
754            .execute(
755                "DELETE FROM sync_tasks WHERE status IN ('pending', 'failed')",
756                [],
757            )
758            .map_err(|e| GarminError::Database(format!("Failed to clear tasks: {}", e)))?;
759
760        Ok(count as u32)
761    }
762}
763
764fn task_type_name(task_type: &SyncTaskType) -> &'static str {
765    match task_type {
766        SyncTaskType::Activities { .. } => "activities",
767        SyncTaskType::ActivityDetail { .. } => "activity_detail",
768        SyncTaskType::DownloadGpx { .. } => "download_gpx",
769        SyncTaskType::DailyHealth { .. } => "daily_health",
770        SyncTaskType::Performance { .. } => "performance",
771        SyncTaskType::Weight { .. } => "weight",
772        SyncTaskType::GenerateEmbeddings { .. } => "generate_embeddings",
773    }
774}
775
776fn pipeline_name(pipeline: SyncPipeline) -> &'static str {
777    match pipeline {
778        SyncPipeline::Frontier => "frontier",
779        SyncPipeline::Backfill => "backfill",
780    }
781}
782
783fn parse_pipeline(s: &str) -> SyncPipeline {
784    match s {
785        "backfill" => SyncPipeline::Backfill,
786        _ => SyncPipeline::Frontier,
787    }
788}
789
790fn parse_status(s: &str) -> TaskStatus {
791    match s {
792        "pending" => TaskStatus::Pending,
793        "in_progress" => TaskStatus::InProgress,
794        "completed" => TaskStatus::Completed,
795        "failed" => TaskStatus::Failed,
796        _ => TaskStatus::Pending,
797    }
798}
799
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a calendar date, panicking if the components do not form a real date.
    fn ymd(year: i32, month: u32, day: u32) -> NaiveDate {
        NaiveDate::from_ymd_opt(year, month, day).unwrap()
    }

    /// Task payload for a daily-health sync of the given date.
    fn daily_health(date: NaiveDate) -> SyncTaskType {
        SyncTaskType::DailyHealth { date }
    }

    /// Asking for the same profile name twice yields one id; a new name yields another.
    #[test]
    fn test_profile_management() {
        let db = SyncDb::open_in_memory().unwrap();

        let first = db.get_or_create_profile("test_user").unwrap();
        let repeat = db.get_or_create_profile("test_user").unwrap();
        assert_eq!(first, repeat);

        let other = db.get_or_create_profile("another_user").unwrap();
        assert_ne!(first, other);
    }

    /// A stored sync state can be read back with the same last-sync date.
    #[test]
    fn test_sync_state() {
        let db = SyncDb::open_in_memory().unwrap();

        let written = SyncState {
            profile_id: 1,
            data_type: "health".to_string(),
            last_sync_date: Some(ymd(2024, 12, 15)),
            last_activity_id: None,
        };
        db.update_sync_state(&written).unwrap();

        let read_back = db.get_sync_state(1, "health").unwrap().unwrap();
        assert_eq!(read_back.last_sync_date, written.last_sync_date);
    }

    /// Push, pop, start and complete a task; afterwards nothing is left to pop.
    #[test]
    fn test_task_queue() {
        let db = SyncDb::open_in_memory().unwrap();

        let queued = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let task_id = db.push_task(&queued).unwrap();
        assert!(task_id > 0);

        let first_pop = db.pop_task(1, None).unwrap().unwrap();
        assert_eq!(first_pop.id, Some(task_id));

        db.mark_task_in_progress(task_id).unwrap();
        db.mark_task_completed(task_id).unwrap();

        // Should be no more pending tasks
        assert!(db.pop_task(1, None).unwrap().is_none());
    }

    /// A task left in progress is recovered and becomes poppable again.
    #[test]
    fn test_recover_in_progress() {
        let db = SyncDb::open_in_memory().unwrap();

        let queued = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let task_id = db.push_task(&queued).unwrap();
        db.mark_task_in_progress(task_id).unwrap();

        // Simulate crash recovery
        let recovered_count = db.recover_in_progress_tasks().unwrap();
        assert_eq!(recovered_count, 1);

        // Task should be poppable again
        assert!(db.pop_task(1, None).unwrap().is_some());
    }

    /// Popping for one profile never hands out another profile's task.
    #[test]
    fn test_pop_task_scoped_by_profile() {
        let db = SyncDb::open_in_memory().unwrap();

        let for_profile_1 = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let for_profile_2 = SyncTask::new(
            2,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 16)),
        );

        let id_1 = db.push_task(&for_profile_1).unwrap();
        let id_2 = db.push_task(&for_profile_2).unwrap();

        // Profile 2 is asked first even though its task was queued second.
        let got_2 = db.pop_task(2, None).unwrap().unwrap();
        assert_eq!(got_2.id, Some(id_2));

        let got_1 = db.pop_task(1, None).unwrap().unwrap();
        assert_eq!(got_1.id, Some(id_1));
    }

    /// Popping by task-type label returns the task of that type only.
    #[test]
    fn test_pop_task_by_type() {
        let db = SyncDb::open_in_memory().unwrap();

        let health_task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let perf_task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            SyncTaskType::Performance {
                date: ymd(2024, 12, 22),
            },
        );

        let health_id = db.push_task(&health_task).unwrap();
        let perf_id = db.push_task(&perf_task).unwrap();

        // Ask for the later-queued type first.
        let got_perf = db.pop_task_by_type(1, "performance", None).unwrap().unwrap();
        assert_eq!(got_perf.id, Some(perf_id));

        let got_health = db.pop_task_by_type(1, "daily_health", None).unwrap().unwrap();
        assert_eq!(got_health.id, Some(health_id));
    }

    /// Popping with a pipeline filter returns the task queued for that pipeline.
    #[test]
    fn test_pop_task_by_pipeline() {
        let db = SyncDb::open_in_memory().unwrap();

        let frontier_task = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let backfill_task = SyncTask::new(
            1,
            SyncPipeline::Backfill,
            daily_health(ymd(2024, 12, 16)),
        );

        let frontier_id = db.push_task(&frontier_task).unwrap();
        let backfill_id = db.push_task(&backfill_task).unwrap();

        // Ask for the later-queued pipeline first.
        let got_backfill = db.pop_task(1, Some(SyncPipeline::Backfill)).unwrap().unwrap();
        assert_eq!(got_backfill.id, Some(backfill_id));

        let got_frontier = db.pop_task(1, Some(SyncPipeline::Frontier)).unwrap().unwrap();
        assert_eq!(got_frontier.id, Some(frontier_id));
    }

    /// Updating the backfill frontier changes only the frontier date.
    #[test]
    fn test_update_backfill_frontier() {
        let db = SyncDb::open_in_memory().unwrap();

        let initial_frontier = ymd(2025, 1, 31);
        let target = ymd(2025, 1, 1);
        db.set_backfill_state(1, "activities", initial_frontier, target, false)
            .unwrap();

        let moved_frontier = ymd(2025, 1, 15);
        db.update_backfill_frontier(1, "activities", moved_frontier)
            .unwrap();

        let backfill = db.get_backfill_state(1, "activities").unwrap().unwrap();
        assert_eq!(backfill.0, moved_frontier);
        assert_eq!(backfill.1, target);
        assert!(!backfill.2);
    }

    /// A task that has been marked failed is still counted under its type.
    #[test]
    fn test_count_tasks_by_type_includes_failed() {
        let db = SyncDb::open_in_memory().unwrap();

        let queued = SyncTask::new(
            1,
            SyncPipeline::Frontier,
            daily_health(ymd(2024, 12, 15)),
        );
        let task_id = db.push_task(&queued).unwrap();

        db.mark_task_in_progress(task_id).unwrap();
        db.mark_task_failed(task_id, "boom", 60).unwrap();

        let (_, _, health_count, _) = db.count_tasks_by_type(1, None).unwrap();
        assert_eq!(health_count, 1);
    }
}