garmin_cli/sync/
mod.rs

1//! Sync module for Garmin data synchronization
2//!
3//! Provides:
4//! - Rate-limited API access with parallel streams
5//! - Persistent task queue for crash recovery
6//! - Incremental sync with gap detection
7//! - GPX parsing for track points
8//! - Fancy TUI or simple progress output
9
10pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use std::io::{self, Write};
22use crate::{Database, GarminError, Result};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
28/// Sync engine for orchestrating data synchronization
29pub struct SyncEngine {
30    db: Database,
31    client: GarminClient,
32    token: OAuth2Token,
33    rate_limiter: RateLimiter,
34    queue: TaskQueue,
35    profile_id: i32,
36    display_name: Option<String>,
37}
38
39impl SyncEngine {
40    /// Create a new sync engine
41    pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42        let queue = TaskQueue::new(db.clone());
43        Self {
44            db,
45            client,
46            token,
47            rate_limiter: RateLimiter::new(),
48            queue,
49            profile_id,
50            display_name: None,
51        }
52    }
53
54    /// Fetch and cache the user's display name
55    async fn get_display_name(&mut self) -> Result<String> {
56        if let Some(ref name) = self.display_name {
57            return Ok(name.clone());
58        }
59
60        let profile: serde_json::Value = self
61            .client
62            .get_json(&self.token, "/userprofile-service/socialProfile")
63            .await?;
64
65        let name = profile
66            .get("displayName")
67            .and_then(|v| v.as_str())
68            .map(|s| s.to_string())
69            .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71        self.display_name = Some(name.clone());
72        Ok(name)
73    }
74
75    /// Find the oldest activity date by querying the activities API
76    async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77        print!("Finding oldest activity date...");
78        let _ = io::stdout().flush();
79
80        // The API returns activities sorted by date descending (newest first)
81        // Use exponential search to find the end quickly, then fetch the last page
82
83        self.rate_limiter.wait().await;
84
85        // Step 1: Find approximate total count using exponential jumps
86        let limit: u32 = 100;
87        let mut jump: u32 = 100;
88        let mut last_non_empty: u32 = 0;
89
90        // Exponential search: 100, 200, 400, 800, 1600, 3200...
91        while jump < 10000 {
92            let path = format!(
93                "/activitylist-service/activities/search/activities?limit=1&start={}",
94                jump
95            );
96
97            let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
98
99            if activities.is_empty() {
100                break;
101            }
102
103            last_non_empty = jump;
104            jump *= 2;
105            self.rate_limiter.wait().await;
106        }
107
108        // Step 2: Binary search to find exact end
109        let mut low = last_non_empty;
110        let mut high = jump;
111
112        while high - low > limit {
113            let mid = (low + high) / 2;
114            let path = format!(
115                "/activitylist-service/activities/search/activities?limit=1&start={}",
116                mid
117            );
118
119            self.rate_limiter.wait().await;
120            let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
121
122            if activities.is_empty() {
123                high = mid;
124            } else {
125                low = mid;
126            }
127        }
128
129        // Step 3: Fetch the last page to get the oldest activity
130        let path = format!(
131            "/activitylist-service/activities/search/activities?limit={}&start={}",
132            limit, low
133        );
134
135        self.rate_limiter.wait().await;
136        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
137
138        let oldest_date = activities
139            .last()
140            .and_then(|activity| activity.get("startTimeLocal"))
141            .and_then(|v| v.as_str())
142            .and_then(|date_str| date_str.split(' ').next())
143            .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
144
145        let result = oldest_date.unwrap_or_else(|| {
146            // Default to 1 year ago if no activities found
147            Utc::now().date_naive() - Duration::days(365)
148        });
149
150        println!(" {}", result);
151        Ok(result)
152    }
153
154    /// Run the sync process
155    pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
156        // For now, always use sequential mode as parallel requires more refactoring
157        // The TUI will be integrated in a future update
158        if opts.fancy_ui {
159            self.run_with_progress(&opts).await
160        } else {
161            self.run_sequential(&opts).await
162        }
163    }
164
165    /// Run sync with fancy progress tracking (TUI or simple)
166    async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
167        let progress = Arc::new(SyncProgress::new());
168
169        // Fetch display name early
170        let display_name = self.get_display_name().await?;
171        progress.set_profile(&display_name);
172
173        // Recover any crashed tasks
174        let _recovered = self.queue.recover_in_progress()?;
175
176        // Plan phase
177        if self.queue.pending_count()? == 0 {
178            self.plan_sync(opts).await?;
179        }
180
181        // Count tasks by type for progress tracking
182        self.count_tasks_for_progress(&progress)?;
183
184        // Determine date range for display
185        let from_date = opts.from_date.unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
186        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
187        progress.set_date_range(&from_date.to_string(), &to_date.to_string());
188
189        // Spawn TUI in background
190        let ui_progress = progress.clone();
191        let ui_handle = tokio::spawn(async move {
192            if let Err(e) = ui::run_tui(ui_progress).await {
193                eprintln!("TUI error: {}", e);
194            }
195        });
196
197        // Run sync with progress updates
198        let stats = self.run_with_progress_tracking(opts, progress.clone()).await?;
199
200        // Wait for TUI to finish
201        ui_handle.abort();
202
203        // Print final stats
204        println!("\nSync complete: {}", stats);
205
206        Ok(stats)
207    }
208
209    /// Run sync with progress tracking (no TUI)
210    async fn run_with_progress_tracking(
211        &mut self,
212        opts: &SyncOptions,
213        progress: SharedProgress,
214    ) -> Result<SyncStats> {
215        let mut stats = SyncStats::default();
216
217        // Execute phase: process tasks
218        while let Some(task) = self.queue.pop()? {
219            if self.rate_limiter.should_pause() {
220                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
221            }
222
223            let task_id = task.id.unwrap();
224            self.queue.mark_in_progress(task_id)?;
225
226            // Update progress for current task
227            update_progress_for_task(&task, &progress);
228
229            self.rate_limiter.wait().await;
230            progress.record_request();
231
232            match self.execute_task(&task).await {
233                Ok(()) => {
234                    self.queue.mark_completed(task_id)?;
235                    self.rate_limiter.on_success();
236                    stats.completed += 1;
237                    complete_progress_for_task(&task, &progress);
238                }
239                Err(GarminError::RateLimited) => {
240                    self.rate_limiter.on_rate_limit();
241                    let backoff = self.rate_limiter.current_backoff();
242                    self.queue.mark_failed(
243                        task_id,
244                        "Rate limited",
245                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
246                    )?;
247                    stats.rate_limited += 1;
248                }
249                Err(e) => {
250                    let backoff = Duration::seconds(60);
251                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
252                    stats.failed += 1;
253                    fail_progress_for_task(&task, &progress);
254                }
255            }
256
257            if opts.dry_run {
258                break;
259            }
260        }
261
262        // Cleanup old completed tasks
263        self.queue.cleanup(7)?;
264
265        Ok(stats)
266    }
267
268    /// Run sync sequentially (original behavior, simple output)
269    async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
270        let mut stats = SyncStats::default();
271
272        // Fetch display name early (needed for health endpoints)
273        print!("Fetching user profile...");
274        let _ = io::stdout().flush();
275        let display_name = self.get_display_name().await?;
276        println!(" {}", display_name);
277
278        // Recover any crashed tasks
279        let recovered = self.queue.recover_in_progress()?;
280        if recovered > 0 {
281            println!("  Recovered {} tasks from previous run", recovered);
282            stats.recovered = recovered;
283        }
284
285        // Plan phase: generate tasks if queue is empty
286        if self.queue.pending_count()? == 0 {
287            println!("Planning sync tasks...");
288            self.plan_sync(opts).await?;
289        }
290
291        let total_tasks = self.queue.pending_count()?;
292        println!("  {} tasks queued\n", total_tasks);
293
294        // Execute phase: process tasks
295        while let Some(task) = self.queue.pop()? {
296            if self.rate_limiter.should_pause() {
297                println!(
298                    "  Rate limited, pausing for {} seconds...",
299                    self.rate_limiter.pause_duration().as_secs()
300                );
301                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
302            }
303
304            let task_id = task.id.unwrap();
305            self.queue.mark_in_progress(task_id)?;
306
307            // Print task description
308            print_task_status(&task, &stats);
309
310            self.rate_limiter.wait().await;
311
312            match self.execute_task(&task).await {
313                Ok(()) => {
314                    self.queue.mark_completed(task_id)?;
315                    self.rate_limiter.on_success();
316                    stats.completed += 1;
317                    println!(" done");
318                }
319                Err(GarminError::RateLimited) => {
320                    self.rate_limiter.on_rate_limit();
321                    let backoff = self.rate_limiter.current_backoff();
322                    self.queue.mark_failed(
323                        task_id,
324                        "Rate limited",
325                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
326                    )?;
327                    stats.rate_limited += 1;
328                    println!(" rate limited (retry in {}s)", backoff.as_secs());
329                }
330                Err(e) => {
331                    let backoff = Duration::seconds(60);
332                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
333                    stats.failed += 1;
334                    println!(" failed: {}", e);
335                }
336            }
337
338            if opts.dry_run {
339                break;
340            }
341        }
342
343        // Cleanup old completed tasks
344        self.queue.cleanup(7)?;
345
346        Ok(stats)
347    }
348
349    /// Count pending tasks by type and update progress
350    fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
351        let conn = self.db.connection();
352        let conn = conn.lock().unwrap();
353
354        // Count activities tasks
355        let act_count: i64 = conn
356            .query_row(
357                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
358                [],
359                |row| row.get(0),
360            )
361            .unwrap_or(0);
362        progress.activities.set_total(act_count as u32);
363
364        // Count GPX tasks
365        let gpx_count: i64 = conn
366            .query_row(
367                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
368                [],
369                |row| row.get(0),
370            )
371            .unwrap_or(0);
372        progress.gpx.set_total(gpx_count as u32);
373
374        // Count health tasks
375        let health_count: i64 = conn
376            .query_row(
377                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
378                [],
379                |row| row.get(0),
380            )
381            .unwrap_or(0);
382        progress.health.set_total(health_count as u32);
383
384        // Count performance tasks
385        let perf_count: i64 = conn
386            .query_row(
387                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
388                [],
389                |row| row.get(0),
390            )
391            .unwrap_or(0);
392        progress.performance.set_total(perf_count as u32);
393
394        Ok(())
395    }
396
397    /// Plan sync tasks based on current state
398    async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
399        // Determine date range - auto-detect from oldest activity if not specified
400        let from_date = match opts.from_date {
401            Some(date) => date,
402            None => self.find_oldest_activity_date().await?,
403        };
404        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
405
406        let total_days = (to_date - from_date).num_days();
407        println!("  Date range: {} to {} ({} days)", from_date, to_date, total_days);
408
409        // Plan activity sync
410        if opts.sync_activities {
411            println!("  Planning activity sync...");
412            self.plan_activities_sync()?;
413        }
414
415        // Plan health sync
416        if opts.sync_health {
417            let health_tasks = self.plan_health_sync(from_date, to_date)?;
418            println!("  Planning health sync: {} days to fetch", health_tasks);
419        }
420
421        // Plan performance sync
422        if opts.sync_performance {
423            let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
424            println!("  Planning performance sync: {} weeks to fetch", perf_tasks);
425        }
426
427        Ok(())
428    }
429
430    /// Plan activity sync tasks
431    fn plan_activities_sync(&self) -> Result<()> {
432        // Start with first page, we'll add more as we discover them
433        let task = SyncTask::new(
434            self.profile_id,
435            SyncTaskType::Activities { start: 0, limit: 50 },
436        );
437        self.queue.push(task)?;
438        Ok(())
439    }
440
441    /// Plan health sync tasks for date range, returns count of tasks added
442    fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
443        let mut count = 0;
444        let mut date = from;
445        while date <= to {
446            // Check if we already have data for this date
447            if !self.has_health_data(date)? {
448                let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
449                self.queue.push(task)?;
450                count += 1;
451            }
452            date += Duration::days(1);
453        }
454        Ok(count)
455    }
456
457    /// Plan performance sync tasks, returns count of tasks added
458    fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
459        // Performance metrics don't change daily, sync weekly
460        let mut count = 0;
461        let mut date = from;
462        while date <= to {
463            if !self.has_performance_data(date)? {
464                let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
465                self.queue.push(task)?;
466                count += 1;
467            }
468            date += Duration::days(7);
469        }
470        Ok(count)
471    }
472
473    /// Check if health data exists for date
474    fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
475        let conn = self.db.connection();
476        let conn = conn.lock().unwrap();
477
478        let count: i64 = conn
479            .query_row(
480                "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
481                duckdb::params![self.profile_id, date.to_string()],
482                |row| row.get(0),
483            )
484            .map_err(|e| GarminError::Database(e.to_string()))?;
485
486        Ok(count > 0)
487    }
488
489    /// Check if performance data exists for date
490    fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
491        let conn = self.db.connection();
492        let conn = conn.lock().unwrap();
493
494        let count: i64 = conn
495            .query_row(
496                "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
497                duckdb::params![self.profile_id, date.to_string()],
498                |row| row.get(0),
499            )
500            .map_err(|e| GarminError::Database(e.to_string()))?;
501
502        Ok(count > 0)
503    }
504
505    /// Execute a single sync task
506    async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
507        match &task.task_type {
508            SyncTaskType::Activities { start, limit } => {
509                self.sync_activities(*start, *limit).await
510            }
511            SyncTaskType::ActivityDetail { activity_id } => {
512                self.sync_activity_detail(*activity_id).await
513            }
514            SyncTaskType::DownloadGpx { activity_id, .. } => {
515                self.download_gpx(*activity_id).await
516            }
517            SyncTaskType::DailyHealth { date } => {
518                self.sync_daily_health(*date).await
519            }
520            SyncTaskType::Performance { date } => {
521                self.sync_performance(*date).await
522            }
523            SyncTaskType::Weight { from, to } => {
524                self.sync_weight(*from, *to).await
525            }
526            SyncTaskType::GenerateEmbeddings { activity_ids } => {
527                self.generate_embeddings(activity_ids).await
528            }
529        }
530    }
531
532    /// Sync activities list
533    async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
534        let path = format!(
535            "/activitylist-service/activities/search/activities?limit={}&start={}",
536            limit, start
537        );
538        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
539
540        for activity in &activities {
541            // Store activity summary
542            self.store_activity(activity)?;
543
544            // Queue GPX download for activities with GPS
545            if activity.get("hasPolyline").and_then(|v| v.as_bool()).unwrap_or(false) {
546                if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
547                    let activity_name = activity
548                        .get("activityName")
549                        .and_then(|v| v.as_str())
550                        .map(|s| s.to_string());
551                    let activity_date = activity
552                        .get("startTimeLocal")
553                        .and_then(|v| v.as_str())
554                        .and_then(|s| s.split(' ').next())
555                        .map(|s| s.to_string());
556
557                    let task = SyncTask::new(
558                        self.profile_id,
559                        SyncTaskType::DownloadGpx {
560                            activity_id: id,
561                            activity_name,
562                            activity_date,
563                        },
564                    );
565                    self.queue.push(task)?;
566                }
567            }
568        }
569
570        // If we got a full page, there might be more
571        if activities.len() == limit as usize {
572            let task = SyncTask::new(
573                self.profile_id,
574                SyncTaskType::Activities {
575                    start: start + limit,
576                    limit,
577                },
578            );
579            self.queue.push(task)?;
580        }
581
582        Ok(())
583    }
584
585    /// Store an activity in the database
586    fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
587        let conn = self.db.connection();
588        let conn = conn.lock().unwrap();
589
590        let activity_id = activity
591            .get("activityId")
592            .and_then(|v| v.as_i64())
593            .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
594
595        conn.execute(
596            "INSERT INTO activities (
597                activity_id, profile_id, activity_name, activity_type,
598                start_time_local, duration_sec, distance_m, calories,
599                avg_hr, max_hr, avg_speed, max_speed,
600                elevation_gain, elevation_loss, raw_json
601            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
602            ON CONFLICT (activity_id) DO UPDATE SET
603                activity_name = EXCLUDED.activity_name,
604                activity_type = EXCLUDED.activity_type,
605                raw_json = EXCLUDED.raw_json",
606            duckdb::params![
607                activity_id,
608                self.profile_id,
609                activity.get("activityName").and_then(|v| v.as_str()),
610                activity.get("activityType").and_then(|v| v.get("typeKey")).and_then(|v| v.as_str()),
611                activity.get("startTimeLocal").and_then(|v| v.as_str()),
612                activity.get("duration").and_then(|v| v.as_f64()),
613                activity.get("distance").and_then(|v| v.as_f64()),
614                activity.get("calories").and_then(|v| v.as_i64()).map(|v| v as i32),
615                activity.get("averageHR").and_then(|v| v.as_i64()).map(|v| v as i32),
616                activity.get("maxHR").and_then(|v| v.as_i64()).map(|v| v as i32),
617                activity.get("averageSpeed").and_then(|v| v.as_f64()),
618                activity.get("maxSpeed").and_then(|v| v.as_f64()),
619                activity.get("elevationGain").and_then(|v| v.as_f64()),
620                activity.get("elevationLoss").and_then(|v| v.as_f64()),
621                activity.to_string(),
622            ],
623        )
624        .map_err(|e| GarminError::Database(e.to_string()))?;
625
626        Ok(())
627    }
628
629    /// Sync activity detail (not implemented yet)
630    async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
631        // TODO: Fetch detailed activity data
632        Ok(())
633    }
634
635    /// Download and parse GPX
636    async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
637        let path = format!("/download-service/export/gpx/activity/{}", activity_id);
638        let gpx_bytes = self.client.download(&self.token, &path).await?;
639        let gpx_data = String::from_utf8_lossy(&gpx_bytes);
640        self.parse_and_store_gpx(activity_id, &gpx_data)?;
641        Ok(())
642    }
643
644    /// Parse GPX and store track points
645    fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
646        use gpx::read;
647        use std::io::BufReader;
648
649        let reader = BufReader::new(gpx_data.as_bytes());
650        let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
651
652        let conn = self.db.connection();
653        let conn = conn.lock().unwrap();
654
655        for track in gpx.tracks {
656            for segment in track.segments {
657                for point in segment.points {
658                    let timestamp = point
659                        .time
660                        .map(|t| t.format().unwrap_or_default())
661                        .unwrap_or_default();
662
663                    conn.execute(
664                        "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
665                         VALUES (?, ?, ?, ?, ?)",
666                        duckdb::params![
667                            activity_id,
668                            timestamp,
669                            point.point().y(),
670                            point.point().x(),
671                            point.elevation,
672                        ],
673                    )
674                    .map_err(|e| GarminError::Database(e.to_string()))?;
675                }
676            }
677        }
678
679        Ok(())
680    }
681
682    /// Sync daily health data
683    async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
684        // Get user's display name for the endpoint
685        let display_name = self.get_display_name().await?;
686
687        let path = format!(
688            "/usersummary-service/usersummary/daily/{}?calendarDate={}",
689            display_name, date
690        );
691
692        // Try to fetch health data - may return 404/error for dates without data
693        let health_result: std::result::Result<serde_json::Value, _> =
694            self.client.get_json(&self.token, &path).await;
695
696        let health = match health_result {
697            Ok(data) => data,
698            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
699                // No data for this date - store empty record to mark as synced
700                // This prevents re-fetching dates that never had data (before device ownership, gaps, etc.)
701                serde_json::json!({})
702            }
703            Err(e) => return Err(e),
704        };
705
706        let conn = self.db.connection();
707        let conn = conn.lock().unwrap();
708
709        conn.execute(
710            "INSERT INTO daily_health (
711                profile_id, date, steps, step_goal, total_calories, active_calories,
712                resting_hr, sleep_seconds, avg_stress, max_stress,
713                body_battery_start, body_battery_end, raw_json
714            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
715            ON CONFLICT (profile_id, date) DO UPDATE SET
716                steps = EXCLUDED.steps,
717                step_goal = EXCLUDED.step_goal,
718                total_calories = EXCLUDED.total_calories,
719                active_calories = EXCLUDED.active_calories,
720                resting_hr = EXCLUDED.resting_hr,
721                sleep_seconds = EXCLUDED.sleep_seconds,
722                avg_stress = EXCLUDED.avg_stress,
723                max_stress = EXCLUDED.max_stress,
724                body_battery_start = EXCLUDED.body_battery_start,
725                body_battery_end = EXCLUDED.body_battery_end,
726                raw_json = EXCLUDED.raw_json",
727            duckdb::params![
728                self.profile_id,
729                date.to_string(),
730                health.get("totalSteps").and_then(|v| v.as_i64()).map(|v| v as i32),
731                health.get("dailyStepGoal").and_then(|v| v.as_i64()).map(|v| v as i32),
732                health.get("totalKilocalories").and_then(|v| v.as_i64()).map(|v| v as i32),
733                health.get("activeKilocalories").and_then(|v| v.as_i64()).map(|v| v as i32),
734                health.get("restingHeartRate").and_then(|v| v.as_i64()).map(|v| v as i32),
735                health.get("sleepingSeconds").and_then(|v| v.as_i64()).map(|v| v as i32),
736                health.get("averageStressLevel").and_then(|v| v.as_i64()).map(|v| v as i32),
737                health.get("maxStressLevel").and_then(|v| v.as_i64()).map(|v| v as i32),
738                health.get("bodyBatteryChargedValue").and_then(|v| v.as_i64()).map(|v| v as i32),
739                health.get("bodyBatteryDrainedValue").and_then(|v| v.as_i64()).map(|v| v as i32),
740                health.to_string(),
741            ],
742        )
743        .map_err(|e| GarminError::Database(e.to_string()))?;
744
745        Ok(())
746    }
747
748    /// Sync performance metrics
749    async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
750        // Fetch VO2 max
751        let vo2max: Option<serde_json::Value> = self
752            .client
753            .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
754            .await
755            .ok();
756
757        // Fetch race predictions
758        let race_predictions: Option<serde_json::Value> = self
759            .client
760            .get_json(&self.token, "/metrics-service/metrics/racepredictions/latest")
761            .await
762            .ok();
763
764        let conn = self.db.connection();
765        let conn = conn.lock().unwrap();
766
767        let vo2max_value = vo2max
768            .as_ref()
769            .and_then(|v| v.get("generic"))
770            .and_then(|v| v.get("vo2MaxValue"))
771            .and_then(|v| v.as_f64());
772
773        let fitness_age = vo2max
774            .as_ref()
775            .and_then(|v| v.get("generic"))
776            .and_then(|v| v.get("fitnessAge"))
777            .and_then(|v| v.as_i64())
778            .map(|v| v as i32);
779
780        let race_5k = race_predictions
781            .as_ref()
782            .and_then(|v| v.get("time5K"))
783            .and_then(|v| v.as_f64())
784            .map(|v| v as i32);
785
786        let race_10k = race_predictions
787            .as_ref()
788            .and_then(|v| v.get("time10K"))
789            .and_then(|v| v.as_f64())
790            .map(|v| v as i32);
791
792        let race_half = race_predictions
793            .as_ref()
794            .and_then(|v| v.get("timeHalfMarathon"))
795            .and_then(|v| v.as_f64())
796            .map(|v| v as i32);
797
798        let race_marathon = race_predictions
799            .as_ref()
800            .and_then(|v| v.get("timeMarathon"))
801            .and_then(|v| v.as_f64())
802            .map(|v| v as i32);
803
804        conn.execute(
805            "INSERT INTO performance_metrics (
806                profile_id, date, vo2max, fitness_age,
807                race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
808            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
809            ON CONFLICT (profile_id, date) DO UPDATE SET
810                vo2max = EXCLUDED.vo2max,
811                fitness_age = EXCLUDED.fitness_age,
812                race_5k_sec = EXCLUDED.race_5k_sec,
813                race_10k_sec = EXCLUDED.race_10k_sec,
814                race_half_sec = EXCLUDED.race_half_sec,
815                race_marathon_sec = EXCLUDED.race_marathon_sec",
816            duckdb::params![
817                self.profile_id,
818                date.to_string(),
819                vo2max_value,
820                fitness_age,
821                race_5k,
822                race_10k,
823                race_half,
824                race_marathon,
825            ],
826        )
827        .map_err(|e| GarminError::Database(e.to_string()))?;
828
829        Ok(())
830    }
831
832    /// Sync weight data
833    async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
834        // TODO: Implement weight sync
835        Ok(())
836    }
837
838    /// Generate embeddings for activities
839    async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
840        // TODO: Implement embedding generation using fastembed
841        Ok(())
842    }
843}
844
845/// Options for sync operation
846#[derive(Default)]
847pub struct SyncOptions {
848    /// Sync activities
849    pub sync_activities: bool,
850    /// Sync daily health
851    pub sync_health: bool,
852    /// Sync performance metrics
853    pub sync_performance: bool,
854    /// Start date for sync
855    pub from_date: Option<NaiveDate>,
856    /// End date for sync
857    pub to_date: Option<NaiveDate>,
858    /// Dry run (plan only, don't execute)
859    pub dry_run: bool,
860    /// Force re-sync (ignore existing data)
861    pub force: bool,
862    /// Use fancy TUI (default: true, set false for --simple)
863    pub fancy_ui: bool,
864    /// Number of concurrent API requests (default: 3)
865    pub concurrency: usize,
866}
867
868impl SyncOptions {
869    /// Create options for full sync
870    pub fn full() -> Self {
871        Self {
872            sync_activities: true,
873            sync_health: true,
874            sync_performance: true,
875            fancy_ui: true,
876            concurrency: 3,
877            ..Default::default()
878        }
879    }
880
881    /// Create options for simple (non-TUI) mode
882    pub fn simple() -> Self {
883        Self {
884            sync_activities: true,
885            sync_health: true,
886            sync_performance: true,
887            fancy_ui: false,
888            concurrency: 3,
889            ..Default::default()
890        }
891    }
892}
893
894/// Statistics from sync operation
895#[derive(Default)]
896pub struct SyncStats {
897    /// Tasks recovered from previous run
898    pub recovered: u32,
899    /// Tasks completed successfully
900    pub completed: u32,
901    /// Tasks that hit rate limits
902    pub rate_limited: u32,
903    /// Tasks that failed
904    pub failed: u32,
905}
906
907impl std::fmt::Display for SyncStats {
908    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
909        write!(
910            f,
911            "Completed: {}, Failed: {}, Rate limited: {}",
912            self.completed, self.failed, self.rate_limited
913        )?;
914        if self.recovered > 0 {
915            write!(f, ", Recovered: {}", self.recovered)?;
916        }
917        Ok(())
918    }
919}
920
921/// Print task status with nice formatting
922fn print_task_status(task: &SyncTask, stats: &SyncStats) {
923    let desc = match &task.task_type {
924        SyncTaskType::Activities { start, limit } => {
925            format!("Fetching activities {}-{}", start, start + limit)
926        }
927        SyncTaskType::ActivityDetail { activity_id } => {
928            format!("Activity {} details", activity_id)
929        }
930        SyncTaskType::DownloadGpx {
931            activity_id,
932            activity_name,
933            activity_date,
934        } => {
935            let name = activity_name.as_deref().unwrap_or("Unknown");
936            let date = activity_date.as_deref().unwrap_or("");
937            if date.is_empty() {
938                format!("GPX: {} ({})", name, activity_id)
939            } else {
940                format!("GPX: {} {} ({})", date, name, activity_id)
941            }
942        }
943        SyncTaskType::DailyHealth { date } => {
944            format!("Health data for {}", date)
945        }
946        SyncTaskType::Performance { date } => {
947            format!("Performance metrics for {}", date)
948        }
949        SyncTaskType::Weight { from, to } => {
950            format!("Weight {} to {}", from, to)
951        }
952        SyncTaskType::GenerateEmbeddings { activity_ids } => {
953            format!("Generating embeddings for {} activities", activity_ids.len())
954        }
955    };
956
957    print!("[{:>4}] {}...", stats.completed + 1, desc);
958    let _ = io::stdout().flush();
959}
960
961/// Update progress when starting a task
962fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
963    let desc = match &task.task_type {
964        SyncTaskType::Activities { start, limit } => {
965            format!("Activities {}-{}", start, start + limit)
966        }
967        SyncTaskType::DownloadGpx {
968            activity_name,
969            activity_date,
970            ..
971        } => {
972            let name = activity_name.as_deref().unwrap_or("Unknown");
973            let date = activity_date.as_deref().unwrap_or("");
974            if date.is_empty() {
975                name.to_string()
976            } else {
977                format!("{} {}", date, name)
978            }
979        }
980        SyncTaskType::DailyHealth { date } => date.to_string(),
981        SyncTaskType::Performance { date } => date.to_string(),
982        _ => String::new(),
983    };
984
985    match &task.task_type {
986        SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
987        SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
988        SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
989        SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
990        _ => {}
991    }
992}
993
994/// Mark a task as completed in progress
995fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
996    match &task.task_type {
997        SyncTaskType::Activities { .. } => {
998            progress.activities.complete_one();
999            // Activities can spawn GPX downloads, so update GPX total
1000            // This is handled dynamically when GPX tasks are added
1001        }
1002        SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1003        SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1004        SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1005        _ => {}
1006    }
1007}
1008
1009/// Mark a task as failed in progress
1010fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1011    match &task.task_type {
1012        SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1013        SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1014        SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1015        SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1016        _ => {}
1017    }
1018}