garmin_cli/sync/
mod.rs

1//! Sync module for Garmin data synchronization
2//!
3//! Provides:
4//! - Rate-limited API access with parallel streams
5//! - Persistent task queue for crash recovery
6//! - Incremental sync with gap detection
7//! - GPX parsing for track points
8//! - Fancy TUI or simple progress output
9
10pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use crate::{Database, GarminError, Result};
22use std::io::{self, Write};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
28/// Sync engine for orchestrating data synchronization
29pub struct SyncEngine {
30    db: Database,
31    client: GarminClient,
32    token: OAuth2Token,
33    rate_limiter: RateLimiter,
34    queue: TaskQueue,
35    profile_id: i32,
36    display_name: Option<String>,
37}
38
39impl SyncEngine {
40    /// Create a new sync engine
41    pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42        let queue = TaskQueue::new(db.clone());
43        Self {
44            db,
45            client,
46            token,
47            rate_limiter: RateLimiter::new(),
48            queue,
49            profile_id,
50            display_name: None,
51        }
52    }
53
54    /// Fetch and cache the user's display name
55    async fn get_display_name(&mut self) -> Result<String> {
56        if let Some(ref name) = self.display_name {
57            return Ok(name.clone());
58        }
59
60        let profile: serde_json::Value = self
61            .client
62            .get_json(&self.token, "/userprofile-service/socialProfile")
63            .await?;
64
65        let name = profile
66            .get("displayName")
67            .and_then(|v| v.as_str())
68            .map(|s| s.to_string())
69            .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71        self.display_name = Some(name.clone());
72        Ok(name)
73    }
74
75    /// Find the oldest activity date by querying the activities API
76    async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77        print!("Finding oldest activity date...");
78        let _ = io::stdout().flush();
79
80        // The API returns activities sorted by date descending (newest first)
81        // Use exponential search to find the end quickly, then fetch the last page
82
83        self.rate_limiter.wait().await;
84
85        // Step 1: Find approximate total count using exponential jumps
86        let limit: u32 = 100;
87        let mut jump: u32 = 100;
88        let mut last_non_empty: u32 = 0;
89
90        // Exponential search: 100, 200, 400, 800, 1600, 3200...
91        while jump < 10000 {
92            let path = format!(
93                "/activitylist-service/activities/search/activities?limit=1&start={}",
94                jump
95            );
96
97            let activities: Vec<serde_json::Value> =
98                self.client.get_json(&self.token, &path).await?;
99
100            if activities.is_empty() {
101                break;
102            }
103
104            last_non_empty = jump;
105            jump *= 2;
106            self.rate_limiter.wait().await;
107        }
108
109        // Step 2: Binary search to find exact end
110        let mut low = last_non_empty;
111        let mut high = jump;
112
113        while high - low > limit {
114            let mid = (low + high) / 2;
115            let path = format!(
116                "/activitylist-service/activities/search/activities?limit=1&start={}",
117                mid
118            );
119
120            self.rate_limiter.wait().await;
121            let activities: Vec<serde_json::Value> =
122                self.client.get_json(&self.token, &path).await?;
123
124            if activities.is_empty() {
125                high = mid;
126            } else {
127                low = mid;
128            }
129        }
130
131        // Step 3: Fetch the last page to get the oldest activity
132        let path = format!(
133            "/activitylist-service/activities/search/activities?limit={}&start={}",
134            limit, low
135        );
136
137        self.rate_limiter.wait().await;
138        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
139
140        let oldest_date = activities
141            .last()
142            .and_then(|activity| activity.get("startTimeLocal"))
143            .and_then(|v| v.as_str())
144            .and_then(|date_str| date_str.split(' ').next())
145            .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
146
147        let result = oldest_date.unwrap_or_else(|| {
148            // Default to 1 year ago if no activities found
149            Utc::now().date_naive() - Duration::days(365)
150        });
151
152        println!(" {}", result);
153        Ok(result)
154    }
155
156    /// Run the sync process
157    pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
158        // For now, always use sequential mode as parallel requires more refactoring
159        // The TUI will be integrated in a future update
160        if opts.fancy_ui {
161            self.run_with_progress(&opts).await
162        } else {
163            self.run_sequential(&opts).await
164        }
165    }
166
167    /// Run sync with fancy progress tracking (TUI or simple)
168    async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
169        let progress = Arc::new(SyncProgress::new());
170
171        // Fetch display name early
172        let display_name = self.get_display_name().await?;
173        progress.set_profile(&display_name);
174
175        // Recover any crashed tasks
176        let _recovered = self.queue.recover_in_progress()?;
177
178        // Plan phase
179        if self.queue.pending_count()? == 0 {
180            self.plan_sync(opts).await?;
181        }
182
183        // Count tasks by type for progress tracking
184        self.count_tasks_for_progress(&progress)?;
185
186        // Determine date range for display
187        let from_date = opts
188            .from_date
189            .unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
190        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
191        progress.set_date_range(&from_date.to_string(), &to_date.to_string());
192
193        // Spawn TUI in background
194        let ui_progress = progress.clone();
195        let ui_handle = tokio::spawn(async move {
196            if let Err(e) = ui::run_tui(ui_progress).await {
197                eprintln!("TUI error: {}", e);
198            }
199        });
200
201        // Run sync with progress updates
202        let stats = self
203            .run_with_progress_tracking(opts, progress.clone())
204            .await?;
205
206        // Wait for TUI to finish
207        ui_handle.abort();
208
209        // Print final stats
210        println!("\nSync complete: {}", stats);
211
212        Ok(stats)
213    }
214
215    /// Run sync with progress tracking (no TUI)
216    async fn run_with_progress_tracking(
217        &mut self,
218        opts: &SyncOptions,
219        progress: SharedProgress,
220    ) -> Result<SyncStats> {
221        let mut stats = SyncStats::default();
222
223        // Execute phase: process tasks
224        while let Some(task) = self.queue.pop()? {
225            if self.rate_limiter.should_pause() {
226                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
227            }
228
229            let task_id = task.id.unwrap();
230            self.queue.mark_in_progress(task_id)?;
231
232            // Update progress for current task
233            update_progress_for_task(&task, &progress);
234
235            self.rate_limiter.wait().await;
236            progress.record_request();
237
238            match self.execute_task(&task).await {
239                Ok(()) => {
240                    self.queue.mark_completed(task_id)?;
241                    self.rate_limiter.on_success();
242                    stats.completed += 1;
243                    complete_progress_for_task(&task, &progress);
244                }
245                Err(GarminError::RateLimited) => {
246                    self.rate_limiter.on_rate_limit();
247                    let backoff = self.rate_limiter.current_backoff();
248                    self.queue.mark_failed(
249                        task_id,
250                        "Rate limited",
251                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
252                    )?;
253                    stats.rate_limited += 1;
254                }
255                Err(e) => {
256                    let backoff = Duration::seconds(60);
257                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
258                    stats.failed += 1;
259                    fail_progress_for_task(&task, &progress);
260                }
261            }
262
263            if opts.dry_run {
264                break;
265            }
266        }
267
268        // Cleanup old completed tasks
269        self.queue.cleanup(7)?;
270
271        Ok(stats)
272    }
273
274    /// Run sync sequentially (original behavior, simple output)
275    async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
276        let mut stats = SyncStats::default();
277
278        // Fetch display name early (needed for health endpoints)
279        print!("Fetching user profile...");
280        let _ = io::stdout().flush();
281        let display_name = self.get_display_name().await?;
282        println!(" {}", display_name);
283
284        // Recover any crashed tasks
285        let recovered = self.queue.recover_in_progress()?;
286        if recovered > 0 {
287            println!("  Recovered {} tasks from previous run", recovered);
288            stats.recovered = recovered;
289        }
290
291        // Plan phase: generate tasks if queue is empty
292        if self.queue.pending_count()? == 0 {
293            println!("Planning sync tasks...");
294            self.plan_sync(opts).await?;
295        }
296
297        let total_tasks = self.queue.pending_count()?;
298        println!("  {} tasks queued\n", total_tasks);
299
300        // Execute phase: process tasks
301        while let Some(task) = self.queue.pop()? {
302            if self.rate_limiter.should_pause() {
303                println!(
304                    "  Rate limited, pausing for {} seconds...",
305                    self.rate_limiter.pause_duration().as_secs()
306                );
307                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
308            }
309
310            let task_id = task.id.unwrap();
311            self.queue.mark_in_progress(task_id)?;
312
313            // Print task description
314            print_task_status(&task, &stats);
315
316            self.rate_limiter.wait().await;
317
318            match self.execute_task(&task).await {
319                Ok(()) => {
320                    self.queue.mark_completed(task_id)?;
321                    self.rate_limiter.on_success();
322                    stats.completed += 1;
323                    println!(" done");
324                }
325                Err(GarminError::RateLimited) => {
326                    self.rate_limiter.on_rate_limit();
327                    let backoff = self.rate_limiter.current_backoff();
328                    self.queue.mark_failed(
329                        task_id,
330                        "Rate limited",
331                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
332                    )?;
333                    stats.rate_limited += 1;
334                    println!(" rate limited (retry in {}s)", backoff.as_secs());
335                }
336                Err(e) => {
337                    let backoff = Duration::seconds(60);
338                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
339                    stats.failed += 1;
340                    println!(" failed: {}", e);
341                }
342            }
343
344            if opts.dry_run {
345                break;
346            }
347        }
348
349        // Cleanup old completed tasks
350        self.queue.cleanup(7)?;
351
352        Ok(stats)
353    }
354
355    /// Count pending tasks by type and update progress
356    fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
357        let conn = self.db.connection();
358        let conn = conn.lock().unwrap();
359
360        // Count activities tasks
361        let act_count: i64 = conn
362            .query_row(
363                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
364                [],
365                |row| row.get(0),
366            )
367            .unwrap_or(0);
368        progress.activities.set_total(act_count as u32);
369
370        // Count GPX tasks
371        let gpx_count: i64 = conn
372            .query_row(
373                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
374                [],
375                |row| row.get(0),
376            )
377            .unwrap_or(0);
378        progress.gpx.set_total(gpx_count as u32);
379
380        // Count health tasks
381        let health_count: i64 = conn
382            .query_row(
383                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
384                [],
385                |row| row.get(0),
386            )
387            .unwrap_or(0);
388        progress.health.set_total(health_count as u32);
389
390        // Count performance tasks
391        let perf_count: i64 = conn
392            .query_row(
393                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
394                [],
395                |row| row.get(0),
396            )
397            .unwrap_or(0);
398        progress.performance.set_total(perf_count as u32);
399
400        Ok(())
401    }
402
403    /// Plan sync tasks based on current state
404    async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
405        // Determine date range - auto-detect from oldest activity if not specified
406        let from_date = match opts.from_date {
407            Some(date) => date,
408            None => self.find_oldest_activity_date().await?,
409        };
410        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
411
412        let total_days = (to_date - from_date).num_days();
413        println!(
414            "  Date range: {} to {} ({} days)",
415            from_date, to_date, total_days
416        );
417
418        // Plan activity sync
419        if opts.sync_activities {
420            println!("  Planning activity sync...");
421            self.plan_activities_sync()?;
422        }
423
424        // Plan health sync
425        if opts.sync_health {
426            let health_tasks = self.plan_health_sync(from_date, to_date)?;
427            println!("  Planning health sync: {} days to fetch", health_tasks);
428        }
429
430        // Plan performance sync
431        if opts.sync_performance {
432            let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
433            println!("  Planning performance sync: {} weeks to fetch", perf_tasks);
434        }
435
436        Ok(())
437    }
438
439    /// Plan activity sync tasks
440    fn plan_activities_sync(&self) -> Result<()> {
441        // Start with first page, we'll add more as we discover them
442        let task = SyncTask::new(
443            self.profile_id,
444            SyncTaskType::Activities {
445                start: 0,
446                limit: 50,
447            },
448        );
449        self.queue.push(task)?;
450        Ok(())
451    }
452
453    /// Plan health sync tasks for date range, returns count of tasks added
454    fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
455        let mut count = 0;
456        let mut date = from;
457        while date <= to {
458            // Check if we already have data for this date
459            if !self.has_health_data(date)? {
460                let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
461                self.queue.push(task)?;
462                count += 1;
463            }
464            date += Duration::days(1);
465        }
466        Ok(count)
467    }
468
469    /// Plan performance sync tasks, returns count of tasks added
470    fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
471        // Performance metrics don't change daily, sync weekly
472        let mut count = 0;
473        let mut date = from;
474        while date <= to {
475            if !self.has_performance_data(date)? {
476                let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
477                self.queue.push(task)?;
478                count += 1;
479            }
480            date += Duration::days(7);
481        }
482        Ok(count)
483    }
484
485    /// Check if health data exists for date
486    fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
487        let conn = self.db.connection();
488        let conn = conn.lock().unwrap();
489
490        let count: i64 = conn
491            .query_row(
492                "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
493                duckdb::params![self.profile_id, date.to_string()],
494                |row| row.get(0),
495            )
496            .map_err(|e| GarminError::Database(e.to_string()))?;
497
498        Ok(count > 0)
499    }
500
501    /// Check if performance data exists for date
502    fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
503        let conn = self.db.connection();
504        let conn = conn.lock().unwrap();
505
506        let count: i64 = conn
507            .query_row(
508                "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
509                duckdb::params![self.profile_id, date.to_string()],
510                |row| row.get(0),
511            )
512            .map_err(|e| GarminError::Database(e.to_string()))?;
513
514        Ok(count > 0)
515    }
516
517    /// Execute a single sync task
518    async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
519        match &task.task_type {
520            SyncTaskType::Activities { start, limit } => self.sync_activities(*start, *limit).await,
521            SyncTaskType::ActivityDetail { activity_id } => {
522                self.sync_activity_detail(*activity_id).await
523            }
524            SyncTaskType::DownloadGpx { activity_id, .. } => self.download_gpx(*activity_id).await,
525            SyncTaskType::DailyHealth { date } => self.sync_daily_health(*date).await,
526            SyncTaskType::Performance { date } => self.sync_performance(*date).await,
527            SyncTaskType::Weight { from, to } => self.sync_weight(*from, *to).await,
528            SyncTaskType::GenerateEmbeddings { activity_ids } => {
529                self.generate_embeddings(activity_ids).await
530            }
531        }
532    }
533
534    /// Sync activities list
535    async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
536        let path = format!(
537            "/activitylist-service/activities/search/activities?limit={}&start={}",
538            limit, start
539        );
540        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
541
542        for activity in &activities {
543            // Store activity summary
544            self.store_activity(activity)?;
545
546            // Queue GPX download for activities with GPS
547            if activity
548                .get("hasPolyline")
549                .and_then(|v| v.as_bool())
550                .unwrap_or(false)
551            {
552                if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
553                    let activity_name = activity
554                        .get("activityName")
555                        .and_then(|v| v.as_str())
556                        .map(|s| s.to_string());
557                    let activity_date = activity
558                        .get("startTimeLocal")
559                        .and_then(|v| v.as_str())
560                        .and_then(|s| s.split(' ').next())
561                        .map(|s| s.to_string());
562
563                    let task = SyncTask::new(
564                        self.profile_id,
565                        SyncTaskType::DownloadGpx {
566                            activity_id: id,
567                            activity_name,
568                            activity_date,
569                        },
570                    );
571                    self.queue.push(task)?;
572                }
573            }
574        }
575
576        // If we got a full page, there might be more
577        if activities.len() == limit as usize {
578            let task = SyncTask::new(
579                self.profile_id,
580                SyncTaskType::Activities {
581                    start: start + limit,
582                    limit,
583                },
584            );
585            self.queue.push(task)?;
586        }
587
588        Ok(())
589    }
590
591    /// Store an activity in the database
592    fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
593        let conn = self.db.connection();
594        let conn = conn.lock().unwrap();
595
596        let activity_id = activity
597            .get("activityId")
598            .and_then(|v| v.as_i64())
599            .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
600
601        conn.execute(
602            "INSERT INTO activities (
603                activity_id, profile_id, activity_name, activity_type,
604                start_time_local, duration_sec, distance_m, calories,
605                avg_hr, max_hr, avg_speed, max_speed,
606                elevation_gain, elevation_loss, raw_json
607            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
608            ON CONFLICT (activity_id) DO UPDATE SET
609                activity_name = EXCLUDED.activity_name,
610                activity_type = EXCLUDED.activity_type,
611                raw_json = EXCLUDED.raw_json",
612            duckdb::params![
613                activity_id,
614                self.profile_id,
615                activity.get("activityName").and_then(|v| v.as_str()),
616                activity
617                    .get("activityType")
618                    .and_then(|v| v.get("typeKey"))
619                    .and_then(|v| v.as_str()),
620                activity.get("startTimeLocal").and_then(|v| v.as_str()),
621                activity.get("duration").and_then(|v| v.as_f64()),
622                activity.get("distance").and_then(|v| v.as_f64()),
623                activity
624                    .get("calories")
625                    .and_then(|v| v.as_i64())
626                    .map(|v| v as i32),
627                activity
628                    .get("averageHR")
629                    .and_then(|v| v.as_i64())
630                    .map(|v| v as i32),
631                activity
632                    .get("maxHR")
633                    .and_then(|v| v.as_i64())
634                    .map(|v| v as i32),
635                activity.get("averageSpeed").and_then(|v| v.as_f64()),
636                activity.get("maxSpeed").and_then(|v| v.as_f64()),
637                activity.get("elevationGain").and_then(|v| v.as_f64()),
638                activity.get("elevationLoss").and_then(|v| v.as_f64()),
639                activity.to_string(),
640            ],
641        )
642        .map_err(|e| GarminError::Database(e.to_string()))?;
643
644        Ok(())
645    }
646
647    /// Sync activity detail (not implemented yet)
648    async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
649        // TODO: Fetch detailed activity data
650        Ok(())
651    }
652
653    /// Download and parse GPX
654    async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
655        let path = format!("/download-service/export/gpx/activity/{}", activity_id);
656        let gpx_bytes = self.client.download(&self.token, &path).await?;
657        let gpx_data = String::from_utf8_lossy(&gpx_bytes);
658        self.parse_and_store_gpx(activity_id, &gpx_data)?;
659        Ok(())
660    }
661
662    /// Parse GPX and store track points
663    fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
664        use gpx::read;
665        use std::io::BufReader;
666
667        let reader = BufReader::new(gpx_data.as_bytes());
668        let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
669
670        let conn = self.db.connection();
671        let conn = conn.lock().unwrap();
672
673        for track in gpx.tracks {
674            for segment in track.segments {
675                for point in segment.points {
676                    let timestamp = point
677                        .time
678                        .map(|t| t.format().unwrap_or_default())
679                        .unwrap_or_default();
680
681                    conn.execute(
682                        "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
683                         VALUES (?, ?, ?, ?, ?)",
684                        duckdb::params![
685                            activity_id,
686                            timestamp,
687                            point.point().y(),
688                            point.point().x(),
689                            point.elevation,
690                        ],
691                    )
692                    .map_err(|e| GarminError::Database(e.to_string()))?;
693                }
694            }
695        }
696
697        Ok(())
698    }
699
700    /// Sync daily health data
701    async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
702        // Get user's display name for the endpoint
703        let display_name = self.get_display_name().await?;
704
705        let path = format!(
706            "/usersummary-service/usersummary/daily/{}?calendarDate={}",
707            display_name, date
708        );
709
710        // Try to fetch health data - may return 404/error for dates without data
711        let health_result: std::result::Result<serde_json::Value, _> =
712            self.client.get_json(&self.token, &path).await;
713
714        let health = match health_result {
715            Ok(data) => data,
716            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
717                // No data for this date - store empty record to mark as synced
718                // This prevents re-fetching dates that never had data (before device ownership, gaps, etc.)
719                serde_json::json!({})
720            }
721            Err(e) => return Err(e),
722        };
723
724        let conn = self.db.connection();
725        let conn = conn.lock().unwrap();
726
727        conn.execute(
728            "INSERT INTO daily_health (
729                profile_id, date, steps, step_goal, total_calories, active_calories,
730                resting_hr, sleep_seconds, avg_stress, max_stress,
731                body_battery_start, body_battery_end, raw_json
732            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
733            ON CONFLICT (profile_id, date) DO UPDATE SET
734                steps = EXCLUDED.steps,
735                step_goal = EXCLUDED.step_goal,
736                total_calories = EXCLUDED.total_calories,
737                active_calories = EXCLUDED.active_calories,
738                resting_hr = EXCLUDED.resting_hr,
739                sleep_seconds = EXCLUDED.sleep_seconds,
740                avg_stress = EXCLUDED.avg_stress,
741                max_stress = EXCLUDED.max_stress,
742                body_battery_start = EXCLUDED.body_battery_start,
743                body_battery_end = EXCLUDED.body_battery_end,
744                raw_json = EXCLUDED.raw_json",
745            duckdb::params![
746                self.profile_id,
747                date.to_string(),
748                health
749                    .get("totalSteps")
750                    .and_then(|v| v.as_i64())
751                    .map(|v| v as i32),
752                health
753                    .get("dailyStepGoal")
754                    .and_then(|v| v.as_i64())
755                    .map(|v| v as i32),
756                health
757                    .get("totalKilocalories")
758                    .and_then(|v| v.as_i64())
759                    .map(|v| v as i32),
760                health
761                    .get("activeKilocalories")
762                    .and_then(|v| v.as_i64())
763                    .map(|v| v as i32),
764                health
765                    .get("restingHeartRate")
766                    .and_then(|v| v.as_i64())
767                    .map(|v| v as i32),
768                health
769                    .get("sleepingSeconds")
770                    .and_then(|v| v.as_i64())
771                    .map(|v| v as i32),
772                health
773                    .get("averageStressLevel")
774                    .and_then(|v| v.as_i64())
775                    .map(|v| v as i32),
776                health
777                    .get("maxStressLevel")
778                    .and_then(|v| v.as_i64())
779                    .map(|v| v as i32),
780                health
781                    .get("bodyBatteryChargedValue")
782                    .and_then(|v| v.as_i64())
783                    .map(|v| v as i32),
784                health
785                    .get("bodyBatteryDrainedValue")
786                    .and_then(|v| v.as_i64())
787                    .map(|v| v as i32),
788                health.to_string(),
789            ],
790        )
791        .map_err(|e| GarminError::Database(e.to_string()))?;
792
793        Ok(())
794    }
795
796    /// Sync performance metrics
797    async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
798        // Fetch VO2 max
799        let vo2max: Option<serde_json::Value> = self
800            .client
801            .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
802            .await
803            .ok();
804
805        // Fetch race predictions
806        let race_predictions: Option<serde_json::Value> = self
807            .client
808            .get_json(
809                &self.token,
810                "/metrics-service/metrics/racepredictions/latest",
811            )
812            .await
813            .ok();
814
815        let conn = self.db.connection();
816        let conn = conn.lock().unwrap();
817
818        let vo2max_value = vo2max
819            .as_ref()
820            .and_then(|v| v.get("generic"))
821            .and_then(|v| v.get("vo2MaxValue"))
822            .and_then(|v| v.as_f64());
823
824        let fitness_age = vo2max
825            .as_ref()
826            .and_then(|v| v.get("generic"))
827            .and_then(|v| v.get("fitnessAge"))
828            .and_then(|v| v.as_i64())
829            .map(|v| v as i32);
830
831        let race_5k = race_predictions
832            .as_ref()
833            .and_then(|v| v.get("time5K"))
834            .and_then(|v| v.as_f64())
835            .map(|v| v as i32);
836
837        let race_10k = race_predictions
838            .as_ref()
839            .and_then(|v| v.get("time10K"))
840            .and_then(|v| v.as_f64())
841            .map(|v| v as i32);
842
843        let race_half = race_predictions
844            .as_ref()
845            .and_then(|v| v.get("timeHalfMarathon"))
846            .and_then(|v| v.as_f64())
847            .map(|v| v as i32);
848
849        let race_marathon = race_predictions
850            .as_ref()
851            .and_then(|v| v.get("timeMarathon"))
852            .and_then(|v| v.as_f64())
853            .map(|v| v as i32);
854
855        conn.execute(
856            "INSERT INTO performance_metrics (
857                profile_id, date, vo2max, fitness_age,
858                race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
859            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
860            ON CONFLICT (profile_id, date) DO UPDATE SET
861                vo2max = EXCLUDED.vo2max,
862                fitness_age = EXCLUDED.fitness_age,
863                race_5k_sec = EXCLUDED.race_5k_sec,
864                race_10k_sec = EXCLUDED.race_10k_sec,
865                race_half_sec = EXCLUDED.race_half_sec,
866                race_marathon_sec = EXCLUDED.race_marathon_sec",
867            duckdb::params![
868                self.profile_id,
869                date.to_string(),
870                vo2max_value,
871                fitness_age,
872                race_5k,
873                race_10k,
874                race_half,
875                race_marathon,
876            ],
877        )
878        .map_err(|e| GarminError::Database(e.to_string()))?;
879
880        Ok(())
881    }
882
883    /// Sync weight data
884    async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
885        // TODO: Implement weight sync
886        Ok(())
887    }
888
889    /// Generate embeddings for activities
890    async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
891        // TODO: Implement embedding generation using fastembed
892        Ok(())
893    }
894}
895
896/// Options for sync operation
897#[derive(Default)]
898pub struct SyncOptions {
899    /// Sync activities
900    pub sync_activities: bool,
901    /// Sync daily health
902    pub sync_health: bool,
903    /// Sync performance metrics
904    pub sync_performance: bool,
905    /// Start date for sync
906    pub from_date: Option<NaiveDate>,
907    /// End date for sync
908    pub to_date: Option<NaiveDate>,
909    /// Dry run (plan only, don't execute)
910    pub dry_run: bool,
911    /// Force re-sync (ignore existing data)
912    pub force: bool,
913    /// Use fancy TUI (default: true, set false for --simple)
914    pub fancy_ui: bool,
915    /// Number of concurrent API requests (default: 3)
916    pub concurrency: usize,
917}
918
919impl SyncOptions {
920    /// Create options for full sync
921    pub fn full() -> Self {
922        Self {
923            sync_activities: true,
924            sync_health: true,
925            sync_performance: true,
926            fancy_ui: true,
927            concurrency: 3,
928            ..Default::default()
929        }
930    }
931
932    /// Create options for simple (non-TUI) mode
933    pub fn simple() -> Self {
934        Self {
935            sync_activities: true,
936            sync_health: true,
937            sync_performance: true,
938            fancy_ui: false,
939            concurrency: 3,
940            ..Default::default()
941        }
942    }
943}
944
945/// Statistics from sync operation
946#[derive(Default)]
947pub struct SyncStats {
948    /// Tasks recovered from previous run
949    pub recovered: u32,
950    /// Tasks completed successfully
951    pub completed: u32,
952    /// Tasks that hit rate limits
953    pub rate_limited: u32,
954    /// Tasks that failed
955    pub failed: u32,
956}
957
958impl std::fmt::Display for SyncStats {
959    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
960        write!(
961            f,
962            "Completed: {}, Failed: {}, Rate limited: {}",
963            self.completed, self.failed, self.rate_limited
964        )?;
965        if self.recovered > 0 {
966            write!(f, ", Recovered: {}", self.recovered)?;
967        }
968        Ok(())
969    }
970}
971
972/// Print task status with nice formatting
973fn print_task_status(task: &SyncTask, stats: &SyncStats) {
974    let desc = match &task.task_type {
975        SyncTaskType::Activities { start, limit } => {
976            format!("Fetching activities {}-{}", start, start + limit)
977        }
978        SyncTaskType::ActivityDetail { activity_id } => {
979            format!("Activity {} details", activity_id)
980        }
981        SyncTaskType::DownloadGpx {
982            activity_id,
983            activity_name,
984            activity_date,
985        } => {
986            let name = activity_name.as_deref().unwrap_or("Unknown");
987            let date = activity_date.as_deref().unwrap_or("");
988            if date.is_empty() {
989                format!("GPX: {} ({})", name, activity_id)
990            } else {
991                format!("GPX: {} {} ({})", date, name, activity_id)
992            }
993        }
994        SyncTaskType::DailyHealth { date } => {
995            format!("Health data for {}", date)
996        }
997        SyncTaskType::Performance { date } => {
998            format!("Performance metrics for {}", date)
999        }
1000        SyncTaskType::Weight { from, to } => {
1001            format!("Weight {} to {}", from, to)
1002        }
1003        SyncTaskType::GenerateEmbeddings { activity_ids } => {
1004            format!(
1005                "Generating embeddings for {} activities",
1006                activity_ids.len()
1007            )
1008        }
1009    };
1010
1011    print!("[{:>4}] {}...", stats.completed + 1, desc);
1012    let _ = io::stdout().flush();
1013}
1014
1015/// Update progress when starting a task
1016fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1017    let desc = match &task.task_type {
1018        SyncTaskType::Activities { start, limit } => {
1019            format!("Activities {}-{}", start, start + limit)
1020        }
1021        SyncTaskType::DownloadGpx {
1022            activity_name,
1023            activity_date,
1024            ..
1025        } => {
1026            let name = activity_name.as_deref().unwrap_or("Unknown");
1027            let date = activity_date.as_deref().unwrap_or("");
1028            if date.is_empty() {
1029                name.to_string()
1030            } else {
1031                format!("{} {}", date, name)
1032            }
1033        }
1034        SyncTaskType::DailyHealth { date } => date.to_string(),
1035        SyncTaskType::Performance { date } => date.to_string(),
1036        _ => String::new(),
1037    };
1038
1039    match &task.task_type {
1040        SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
1041        SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
1042        SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
1043        SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
1044        _ => {}
1045    }
1046}
1047
1048/// Mark a task as completed in progress
1049fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1050    match &task.task_type {
1051        SyncTaskType::Activities { .. } => {
1052            progress.activities.complete_one();
1053            // Activities can spawn GPX downloads, so update GPX total
1054            // This is handled dynamically when GPX tasks are added
1055        }
1056        SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1057        SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1058        SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1059        _ => {}
1060    }
1061}
1062
1063/// Mark a task as failed in progress
1064fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1065    match &task.task_type {
1066        SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1067        SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1068        SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1069        SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1070        _ => {}
1071    }
1072}