garmin_cli/sync/
mod.rs

1//! Sync module for Garmin data synchronization
2//!
3//! Provides:
4//! - Rate-limited API access with parallel streams
5//! - Persistent task queue for crash recovery
6//! - Incremental sync with gap detection
7//! - GPX parsing for track points
8//! - Fancy TUI or simple progress output
9
10pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use crate::{Database, GarminError, Result};
22use std::io::{self, Write};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
/// Sync engine for orchestrating data synchronization
///
/// Bundles the API client and its OAuth2 token with the database handle, a
/// rate limiter and the task queue that together drive a sync run for one
/// profile.
pub struct SyncEngine {
    /// Database handle; used directly for existence checks and row storage.
    db: Database,
    /// Client through which every Garmin API request is issued.
    client: GarminClient,
    /// OAuth2 token passed along with each request.
    token: OAuth2Token,
    /// Paces requests and tracks rate-limit backoff.
    rate_limiter: RateLimiter,
    /// Queue of sync tasks, constructed over a clone of `db`.
    queue: TaskQueue,
    /// Profile that queued tasks and stored rows are attributed to.
    profile_id: i32,
    /// Display name, cached after the first successful profile lookup.
    display_name: Option<String>,
}
38
39impl SyncEngine {
40    /// Create a new sync engine
41    pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42        let queue = TaskQueue::new(db.clone());
43        Self {
44            db,
45            client,
46            token,
47            rate_limiter: RateLimiter::new(),
48            queue,
49            profile_id,
50            display_name: None,
51        }
52    }
53
54    /// Fetch and cache the user's display name
55    async fn get_display_name(&mut self) -> Result<String> {
56        if let Some(ref name) = self.display_name {
57            return Ok(name.clone());
58        }
59
60        let profile: serde_json::Value = self
61            .client
62            .get_json(&self.token, "/userprofile-service/socialProfile")
63            .await?;
64
65        let name = profile
66            .get("displayName")
67            .and_then(|v| v.as_str())
68            .map(|s| s.to_string())
69            .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71        self.display_name = Some(name.clone());
72        Ok(name)
73    }
74
75    /// Find the oldest activity date by querying the activities API
76    async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77        print!("Finding oldest activity date...");
78        let _ = io::stdout().flush();
79
80        // The API returns activities sorted by date descending (newest first)
81        // Use exponential search to find the end quickly, then fetch the last page
82
83        self.rate_limiter.wait().await;
84
85        // Step 1: Find approximate total count using exponential jumps
86        let limit: u32 = 100;
87        let mut jump: u32 = 100;
88        let mut last_non_empty: u32 = 0;
89
90        // Exponential search: 100, 200, 400, 800, 1600, 3200...
91        while jump < 10000 {
92            let path = format!(
93                "/activitylist-service/activities/search/activities?limit=1&start={}",
94                jump
95            );
96
97            let activities: Vec<serde_json::Value> =
98                self.client.get_json(&self.token, &path).await?;
99
100            if activities.is_empty() {
101                break;
102            }
103
104            last_non_empty = jump;
105            jump *= 2;
106            self.rate_limiter.wait().await;
107        }
108
109        // Step 2: Binary search to find exact end
110        let mut low = last_non_empty;
111        let mut high = jump;
112
113        while high - low > limit {
114            let mid = (low + high) / 2;
115            let path = format!(
116                "/activitylist-service/activities/search/activities?limit=1&start={}",
117                mid
118            );
119
120            self.rate_limiter.wait().await;
121            let activities: Vec<serde_json::Value> =
122                self.client.get_json(&self.token, &path).await?;
123
124            if activities.is_empty() {
125                high = mid;
126            } else {
127                low = mid;
128            }
129        }
130
131        // Step 3: Fetch the last page to get the oldest activity
132        let path = format!(
133            "/activitylist-service/activities/search/activities?limit={}&start={}",
134            limit, low
135        );
136
137        self.rate_limiter.wait().await;
138        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
139
140        let oldest_date = activities
141            .last()
142            .and_then(|activity| activity.get("startTimeLocal"))
143            .and_then(|v| v.as_str())
144            .and_then(|date_str| date_str.split(' ').next())
145            .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
146
147        let result = oldest_date.unwrap_or_else(|| {
148            // Default to 1 year ago if no activities found
149            Utc::now().date_naive() - Duration::days(365)
150        });
151
152        println!(" {}", result);
153        Ok(result)
154    }
155
156    /// Run the sync process
157    pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
158        // For now, always use sequential mode as parallel requires more refactoring
159        // The TUI will be integrated in a future update
160        if opts.fancy_ui {
161            self.run_with_progress(&opts).await
162        } else {
163            self.run_sequential(&opts).await
164        }
165    }
166
167    /// Run sync with fancy progress tracking (TUI or simple)
168    async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
169        let progress = Arc::new(SyncProgress::new());
170
171        // Fetch display name early
172        let display_name = self.get_display_name().await?;
173        progress.set_profile(&display_name);
174
175        // Recover any crashed tasks
176        let _recovered = self.queue.recover_in_progress()?;
177
178        // Plan phase
179        if self.queue.pending_count()? == 0 {
180            self.plan_sync(opts).await?;
181        }
182
183        // Count tasks by type for progress tracking
184        self.count_tasks_for_progress(&progress)?;
185
186        // Determine date range for display
187        let from_date = opts
188            .from_date
189            .unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
190        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
191        progress.set_date_range(&from_date.to_string(), &to_date.to_string());
192
193        // Spawn TUI in background
194        let ui_progress = progress.clone();
195        let ui_handle = tokio::spawn(async move {
196            if let Err(e) = ui::run_tui(ui_progress).await {
197                eprintln!("TUI error: {}", e);
198            }
199        });
200
201        // Run sync with progress updates
202        let stats = self
203            .run_with_progress_tracking(opts, progress.clone())
204            .await?;
205
206        // Wait for TUI to finish
207        ui_handle.abort();
208
209        // Print final stats
210        println!("\nSync complete: {}", stats);
211
212        Ok(stats)
213    }
214
215    /// Run sync with progress tracking (no TUI)
216    async fn run_with_progress_tracking(
217        &mut self,
218        opts: &SyncOptions,
219        progress: SharedProgress,
220    ) -> Result<SyncStats> {
221        let mut stats = SyncStats::default();
222
223        // Execute phase: process tasks
224        while let Some(task) = self.queue.pop()? {
225            if self.rate_limiter.should_pause() {
226                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
227            }
228
229            let task_id = task.id.unwrap();
230            self.queue.mark_in_progress(task_id)?;
231
232            // Update progress for current task
233            update_progress_for_task(&task, &progress);
234
235            self.rate_limiter.wait().await;
236            progress.record_request();
237
238            match self.execute_task(&task).await {
239                Ok(()) => {
240                    self.queue.mark_completed(task_id)?;
241                    self.rate_limiter.on_success();
242                    stats.completed += 1;
243                    complete_progress_for_task(&task, &progress);
244                }
245                Err(GarminError::RateLimited) => {
246                    self.rate_limiter.on_rate_limit();
247                    let backoff = self.rate_limiter.current_backoff();
248                    self.queue.mark_failed(
249                        task_id,
250                        "Rate limited",
251                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
252                    )?;
253                    stats.rate_limited += 1;
254                }
255                Err(e) => {
256                    let backoff = Duration::seconds(60);
257                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
258                    stats.failed += 1;
259                    fail_progress_for_task(&task, &progress);
260                }
261            }
262
263            if opts.dry_run {
264                break;
265            }
266        }
267
268        // Cleanup old completed tasks
269        self.queue.cleanup(7)?;
270
271        Ok(stats)
272    }
273
274    /// Run sync sequentially (original behavior, simple output)
275    async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
276        let mut stats = SyncStats::default();
277
278        // Fetch display name early (needed for health endpoints)
279        print!("Fetching user profile...");
280        let _ = io::stdout().flush();
281        let display_name = self.get_display_name().await?;
282        println!(" {}", display_name);
283
284        // Recover any crashed tasks
285        let recovered = self.queue.recover_in_progress()?;
286        if recovered > 0 {
287            println!("  Recovered {} tasks from previous run", recovered);
288            stats.recovered = recovered;
289        }
290
291        // Plan phase: generate tasks if queue is empty
292        if self.queue.pending_count()? == 0 {
293            println!("Planning sync tasks...");
294            self.plan_sync(opts).await?;
295        }
296
297        let total_tasks = self.queue.pending_count()?;
298        println!("  {} tasks queued\n", total_tasks);
299
300        // Execute phase: process tasks
301        while let Some(task) = self.queue.pop()? {
302            if self.rate_limiter.should_pause() {
303                println!(
304                    "  Rate limited, pausing for {} seconds...",
305                    self.rate_limiter.pause_duration().as_secs()
306                );
307                tokio::time::sleep(self.rate_limiter.pause_duration()).await;
308            }
309
310            let task_id = task.id.unwrap();
311            self.queue.mark_in_progress(task_id)?;
312
313            // Print task description
314            print_task_status(&task, &stats);
315
316            self.rate_limiter.wait().await;
317
318            match self.execute_task(&task).await {
319                Ok(()) => {
320                    self.queue.mark_completed(task_id)?;
321                    self.rate_limiter.on_success();
322                    stats.completed += 1;
323                    println!(" done");
324                }
325                Err(GarminError::RateLimited) => {
326                    self.rate_limiter.on_rate_limit();
327                    let backoff = self.rate_limiter.current_backoff();
328                    self.queue.mark_failed(
329                        task_id,
330                        "Rate limited",
331                        Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
332                    )?;
333                    stats.rate_limited += 1;
334                    println!(" rate limited (retry in {}s)", backoff.as_secs());
335                }
336                Err(e) => {
337                    let backoff = Duration::seconds(60);
338                    self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
339                    stats.failed += 1;
340                    println!(" failed: {}", e);
341                }
342            }
343
344            if opts.dry_run {
345                break;
346            }
347        }
348
349        // Cleanup old completed tasks
350        self.queue.cleanup(7)?;
351
352        Ok(stats)
353    }
354
355    /// Count pending tasks by type and update progress
356    fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
357        let conn = self.db.connection();
358        let conn = conn.lock().unwrap();
359
360        // Count activities tasks
361        let act_count: i64 = conn
362            .query_row(
363                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
364                [],
365                |row| row.get(0),
366            )
367            .unwrap_or(0);
368        progress.activities.set_total(act_count as u32);
369
370        // Count GPX tasks
371        let gpx_count: i64 = conn
372            .query_row(
373                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
374                [],
375                |row| row.get(0),
376            )
377            .unwrap_or(0);
378        progress.gpx.set_total(gpx_count as u32);
379
380        // Count health tasks
381        let health_count: i64 = conn
382            .query_row(
383                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
384                [],
385                |row| row.get(0),
386            )
387            .unwrap_or(0);
388        progress.health.set_total(health_count as u32);
389
390        // Count performance tasks
391        let perf_count: i64 = conn
392            .query_row(
393                "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
394                [],
395                |row| row.get(0),
396            )
397            .unwrap_or(0);
398        progress.performance.set_total(perf_count as u32);
399
400        Ok(())
401    }
402
403    /// Plan sync tasks based on current state
404    async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
405        // Determine date range - auto-detect from oldest activity if not specified
406        let from_date = match opts.from_date {
407            Some(date) => date,
408            None => self.find_oldest_activity_date().await?,
409        };
410        let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
411
412        let total_days = (to_date - from_date).num_days();
413        println!(
414            "  Date range: {} to {} ({} days)",
415            from_date, to_date, total_days
416        );
417
418        // Plan activity sync
419        if opts.sync_activities {
420            println!("  Planning activity sync...");
421            self.plan_activities_sync()?;
422        }
423
424        // Plan health sync
425        if opts.sync_health {
426            let health_tasks = self.plan_health_sync(from_date, to_date)?;
427            println!("  Planning health sync: {} days to fetch", health_tasks);
428        }
429
430        // Plan performance sync
431        if opts.sync_performance {
432            let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
433            println!("  Planning performance sync: {} weeks to fetch", perf_tasks);
434        }
435
436        Ok(())
437    }
438
439    /// Plan activity sync tasks
440    fn plan_activities_sync(&self) -> Result<()> {
441        // Start with first page, we'll add more as we discover them
442        let task = SyncTask::new(
443            self.profile_id,
444            SyncTaskType::Activities {
445                start: 0,
446                limit: 50,
447            },
448        );
449        self.queue.push(task)?;
450        Ok(())
451    }
452
453    /// Plan health sync tasks for date range, returns count of tasks added
454    fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
455        let mut count = 0;
456        let mut date = from;
457        while date <= to {
458            // Check if we already have data for this date
459            if !self.has_health_data(date)? {
460                let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
461                self.queue.push(task)?;
462                count += 1;
463            }
464            date += Duration::days(1);
465        }
466        Ok(count)
467    }
468
469    /// Plan performance sync tasks, returns count of tasks added
470    fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
471        // Performance metrics don't change daily, sync weekly
472        let mut count = 0;
473        let mut date = from;
474        while date <= to {
475            if !self.has_performance_data(date)? {
476                let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
477                self.queue.push(task)?;
478                count += 1;
479            }
480            date += Duration::days(7);
481        }
482        Ok(count)
483    }
484
485    /// Check if health data exists for date
486    fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
487        let conn = self.db.connection();
488        let conn = conn.lock().unwrap();
489
490        let count: i64 = conn
491            .query_row(
492                "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
493                duckdb::params![self.profile_id, date.to_string()],
494                |row| row.get(0),
495            )
496            .map_err(|e| GarminError::Database(e.to_string()))?;
497
498        Ok(count > 0)
499    }
500
501    /// Check if performance data exists for date
502    fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
503        let conn = self.db.connection();
504        let conn = conn.lock().unwrap();
505
506        let count: i64 = conn
507            .query_row(
508                "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
509                duckdb::params![self.profile_id, date.to_string()],
510                |row| row.get(0),
511            )
512            .map_err(|e| GarminError::Database(e.to_string()))?;
513
514        Ok(count > 0)
515    }
516
517    /// Execute a single sync task
518    async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
519        match &task.task_type {
520            SyncTaskType::Activities { start, limit } => self.sync_activities(*start, *limit).await,
521            SyncTaskType::ActivityDetail { activity_id } => {
522                self.sync_activity_detail(*activity_id).await
523            }
524            SyncTaskType::DownloadGpx { activity_id, .. } => self.download_gpx(*activity_id).await,
525            SyncTaskType::DailyHealth { date } => self.sync_daily_health(*date).await,
526            SyncTaskType::Performance { date } => self.sync_performance(*date).await,
527            SyncTaskType::Weight { from, to } => self.sync_weight(*from, *to).await,
528            SyncTaskType::GenerateEmbeddings { activity_ids } => {
529                self.generate_embeddings(activity_ids).await
530            }
531        }
532    }
533
534    /// Sync activities list
535    async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
536        let path = format!(
537            "/activitylist-service/activities/search/activities?limit={}&start={}",
538            limit, start
539        );
540        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
541
542        for activity in &activities {
543            // Store activity summary
544            self.store_activity(activity)?;
545
546            // Queue GPX download for activities with GPS
547            if activity
548                .get("hasPolyline")
549                .and_then(|v| v.as_bool())
550                .unwrap_or(false)
551            {
552                if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
553                    let activity_name = activity
554                        .get("activityName")
555                        .and_then(|v| v.as_str())
556                        .map(|s| s.to_string());
557                    let activity_date = activity
558                        .get("startTimeLocal")
559                        .and_then(|v| v.as_str())
560                        .and_then(|s| s.split(' ').next())
561                        .map(|s| s.to_string());
562
563                    let task = SyncTask::new(
564                        self.profile_id,
565                        SyncTaskType::DownloadGpx {
566                            activity_id: id,
567                            activity_name,
568                            activity_date,
569                        },
570                    );
571                    self.queue.push(task)?;
572                }
573            }
574        }
575
576        // If we got a full page, there might be more
577        if activities.len() == limit as usize {
578            let task = SyncTask::new(
579                self.profile_id,
580                SyncTaskType::Activities {
581                    start: start + limit,
582                    limit,
583                },
584            );
585            self.queue.push(task)?;
586        }
587
588        Ok(())
589    }
590
591    /// Store an activity in the database
592    fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
593        let conn = self.db.connection();
594        let conn = conn.lock().unwrap();
595
596        let activity_id = activity
597            .get("activityId")
598            .and_then(|v| v.as_i64())
599            .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
600
601        conn.execute(
602            "INSERT INTO activities (
603                activity_id, profile_id, activity_name, activity_type,
604                start_time_local, duration_sec, distance_m, calories,
605                avg_hr, max_hr, avg_speed, max_speed,
606                elevation_gain, elevation_loss, raw_json
607            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
608            ON CONFLICT (activity_id) DO UPDATE SET
609                activity_name = EXCLUDED.activity_name,
610                activity_type = EXCLUDED.activity_type,
611                raw_json = EXCLUDED.raw_json",
612            duckdb::params![
613                activity_id,
614                self.profile_id,
615                activity.get("activityName").and_then(|v| v.as_str()),
616                activity
617                    .get("activityType")
618                    .and_then(|v| v.get("typeKey"))
619                    .and_then(|v| v.as_str()),
620                activity.get("startTimeLocal").and_then(|v| v.as_str()),
621                activity.get("duration").and_then(|v| v.as_f64()),
622                activity.get("distance").and_then(|v| v.as_f64()),
623                activity
624                    .get("calories")
625                    .and_then(|v| v.as_i64())
626                    .map(|v| v as i32),
627                activity
628                    .get("averageHR")
629                    .and_then(|v| v.as_i64())
630                    .map(|v| v as i32),
631                activity
632                    .get("maxHR")
633                    .and_then(|v| v.as_i64())
634                    .map(|v| v as i32),
635                activity.get("averageSpeed").and_then(|v| v.as_f64()),
636                activity.get("maxSpeed").and_then(|v| v.as_f64()),
637                activity.get("elevationGain").and_then(|v| v.as_f64()),
638                activity.get("elevationLoss").and_then(|v| v.as_f64()),
639                activity.to_string(),
640            ],
641        )
642        .map_err(|e| GarminError::Database(e.to_string()))?;
643
644        Ok(())
645    }
646
    /// Sync detailed data for a single activity.
    ///
    /// Not implemented yet: the activity id is ignored and the call succeeds
    /// without issuing a request or touching the database.
    async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
        // TODO: Fetch detailed activity data
        Ok(())
    }
652
653    /// Download and parse GPX
654    async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
655        let path = format!("/download-service/export/gpx/activity/{}", activity_id);
656        let gpx_bytes = self.client.download(&self.token, &path).await?;
657        let gpx_data = String::from_utf8_lossy(&gpx_bytes);
658        self.parse_and_store_gpx(activity_id, &gpx_data)?;
659        Ok(())
660    }
661
662    /// Parse GPX and store track points
663    fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
664        use gpx::read;
665        use std::io::BufReader;
666
667        let reader = BufReader::new(gpx_data.as_bytes());
668        let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
669
670        let conn = self.db.connection();
671        let conn = conn.lock().unwrap();
672
673        for track in gpx.tracks {
674            for segment in track.segments {
675                for point in segment.points {
676                    let timestamp = point
677                        .time
678                        .map(|t| t.format().unwrap_or_default())
679                        .unwrap_or_default();
680
681                    conn.execute(
682                        "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
683                         VALUES (?, ?, ?, ?, ?)",
684                        duckdb::params![
685                            activity_id,
686                            timestamp,
687                            point.point().y(),
688                            point.point().x(),
689                            point.elevation,
690                        ],
691                    )
692                    .map_err(|e| GarminError::Database(e.to_string()))?;
693                }
694            }
695        }
696
697        Ok(())
698    }
699
700    /// Sync daily health data
701    async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
702        // Get user's display name for the endpoint
703        let display_name = self.get_display_name().await?;
704
705        let path = format!(
706            "/usersummary-service/usersummary/daily/{}?calendarDate={}",
707            display_name, date
708        );
709
710        // Try to fetch health data - may return 404/error for dates without data
711        let health_result: std::result::Result<serde_json::Value, _> =
712            self.client.get_json(&self.token, &path).await;
713
714        let health = match health_result {
715            Ok(data) => data,
716            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
717                // No data for this date - store empty record to mark as synced
718                // This prevents re-fetching dates that never had data (before device ownership, gaps, etc.)
719                serde_json::json!({})
720            }
721            Err(e) => return Err(e),
722        };
723
724        let conn = self.db.connection();
725        let conn = conn.lock().unwrap();
726
727        conn.execute(
728            "INSERT INTO daily_health (
729                profile_id, date, steps, step_goal, total_calories, active_calories,
730                resting_hr, sleep_seconds, avg_stress, max_stress,
731                body_battery_start, body_battery_end, raw_json
732            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
733            ON CONFLICT (profile_id, date) DO UPDATE SET
734                steps = EXCLUDED.steps,
735                step_goal = EXCLUDED.step_goal,
736                total_calories = EXCLUDED.total_calories,
737                active_calories = EXCLUDED.active_calories,
738                resting_hr = EXCLUDED.resting_hr,
739                sleep_seconds = EXCLUDED.sleep_seconds,
740                avg_stress = EXCLUDED.avg_stress,
741                max_stress = EXCLUDED.max_stress,
742                body_battery_start = EXCLUDED.body_battery_start,
743                body_battery_end = EXCLUDED.body_battery_end,
744                raw_json = EXCLUDED.raw_json",
745            duckdb::params![
746                self.profile_id,
747                date.to_string(),
748                health
749                    .get("totalSteps")
750                    .and_then(|v| v.as_i64())
751                    .map(|v| v as i32),
752                health
753                    .get("dailyStepGoal")
754                    .and_then(|v| v.as_i64())
755                    .map(|v| v as i32),
756                health
757                    .get("totalKilocalories")
758                    .and_then(|v| v.as_i64())
759                    .map(|v| v as i32),
760                health
761                    .get("activeKilocalories")
762                    .and_then(|v| v.as_i64())
763                    .map(|v| v as i32),
764                health
765                    .get("restingHeartRate")
766                    .and_then(|v| v.as_i64())
767                    .map(|v| v as i32),
768                health
769                    .get("sleepingSeconds")
770                    .and_then(|v| v.as_i64())
771                    .map(|v| v as i32),
772                health
773                    .get("averageStressLevel")
774                    .and_then(|v| v.as_i64())
775                    .map(|v| v as i32),
776                health
777                    .get("maxStressLevel")
778                    .and_then(|v| v.as_i64())
779                    .map(|v| v as i32),
780                health
781                    .get("bodyBatteryChargedValue")
782                    .and_then(|v| v.as_i64())
783                    .map(|v| v as i32),
784                health
785                    .get("bodyBatteryDrainedValue")
786                    .and_then(|v| v.as_i64())
787                    .map(|v| v as i32),
788                health.to_string(),
789            ],
790        )
791        .map_err(|e| GarminError::Database(e.to_string()))?;
792
793        Ok(())
794    }
795
796    /// Sync performance metrics
797    async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
798        // Fetch VO2 max
799        let vo2max: Option<serde_json::Value> = self
800            .client
801            .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
802            .await
803            .ok();
804
805        // Fetch race predictions
806        let race_predictions: Option<serde_json::Value> = self
807            .client
808            .get_json(
809                &self.token,
810                "/metrics-service/metrics/racepredictions/latest",
811            )
812            .await
813            .ok();
814
815        // Fetch training readiness
816        let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
817        let training_readiness: Option<serde_json::Value> = self
818            .client
819            .get_json(&self.token, &readiness_path)
820            .await
821            .ok();
822
823        // Fetch training status (includes load data)
824        let status_path = format!(
825            "/metrics-service/metrics/trainingstatus/aggregated/{}",
826            date
827        );
828        let training_status: Option<serde_json::Value> =
829            self.client.get_json(&self.token, &status_path).await.ok();
830
831        let conn = self.db.connection();
832        let conn = conn.lock().unwrap();
833
834        let vo2max_value = vo2max
835            .as_ref()
836            .and_then(|v| v.get("generic"))
837            .and_then(|v| v.get("vo2MaxValue"))
838            .and_then(|v| v.as_f64());
839
840        let fitness_age = vo2max
841            .as_ref()
842            .and_then(|v| v.get("generic"))
843            .and_then(|v| v.get("fitnessAge"))
844            .and_then(|v| v.as_i64())
845            .map(|v| v as i32);
846
847        // Extract training readiness
848        let readiness_entry = training_readiness
849            .as_ref()
850            .and_then(|v| v.as_array())
851            .and_then(|arr| arr.first());
852
853        let readiness_score = readiness_entry
854            .and_then(|e| e.get("score"))
855            .and_then(|v| v.as_i64())
856            .map(|v| v as i32);
857
858        let readiness_level = readiness_entry
859            .and_then(|e| e.get("level"))
860            .and_then(|v| v.as_str())
861            .map(|s| s.to_string());
862
863        // Extract training status from nested structure
864        let status_data = training_status
865            .as_ref()
866            .and_then(|v| v.get("mostRecentTrainingStatus"))
867            .and_then(|s| s.get("latestTrainingStatusData"))
868            .and_then(|d| d.as_object())
869            .and_then(|m| m.values().next());
870
871        let status_phrase = status_data
872            .and_then(|e| e.get("trainingStatusFeedbackPhrase"))
873            .and_then(|v| v.as_str())
874            .map(|s| s.to_string());
875
876        let load_dto = status_data.and_then(|e| e.get("acuteTrainingLoadDTO"));
877
878        let acute_load = load_dto
879            .and_then(|l| l.get("dailyTrainingLoadAcute"))
880            .and_then(|v| v.as_f64());
881
882        let chronic_load = load_dto
883            .and_then(|l| l.get("dailyTrainingLoadChronic"))
884            .and_then(|v| v.as_f64());
885
886        let load_ratio = load_dto
887            .and_then(|l| l.get("dailyAcuteChronicWorkloadRatio"))
888            .and_then(|v| v.as_f64());
889
890        let load_ratio_status = load_dto
891            .and_then(|l| l.get("acwrStatus"))
892            .and_then(|v| v.as_str())
893            .map(|s| s.to_string());
894
895        // Extract load focus
896        let load_focus = training_status
897            .as_ref()
898            .and_then(|v| v.get("mostRecentTrainingLoadBalance"))
899            .and_then(|b| b.get("metricsTrainingLoadBalanceDTOMap"))
900            .and_then(|m| m.as_object())
901            .and_then(|m| m.values().next())
902            .and_then(|e| e.get("trainingBalanceFeedbackPhrase"))
903            .and_then(|v| v.as_str())
904            .map(|s| s.to_string());
905
906        let race_5k = race_predictions
907            .as_ref()
908            .and_then(|v| v.get("time5K"))
909            .and_then(|v| v.as_f64())
910            .map(|v| v as i32);
911
912        let race_10k = race_predictions
913            .as_ref()
914            .and_then(|v| v.get("time10K"))
915            .and_then(|v| v.as_f64())
916            .map(|v| v as i32);
917
918        let race_half = race_predictions
919            .as_ref()
920            .and_then(|v| v.get("timeHalfMarathon"))
921            .and_then(|v| v.as_f64())
922            .map(|v| v as i32);
923
924        let race_marathon = race_predictions
925            .as_ref()
926            .and_then(|v| v.get("timeMarathon"))
927            .and_then(|v| v.as_f64())
928            .map(|v| v as i32);
929
930        conn.execute(
931            "INSERT INTO performance_metrics (
932                profile_id, date, vo2max, fitness_age,
933                training_readiness, training_readiness_level,
934                training_status, acute_load, chronic_load,
935                load_ratio, load_ratio_status, load_focus,
936                race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
937            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
938            ON CONFLICT (profile_id, date) DO UPDATE SET
939                vo2max = EXCLUDED.vo2max,
940                fitness_age = EXCLUDED.fitness_age,
941                training_readiness = EXCLUDED.training_readiness,
942                training_readiness_level = EXCLUDED.training_readiness_level,
943                training_status = EXCLUDED.training_status,
944                acute_load = EXCLUDED.acute_load,
945                chronic_load = EXCLUDED.chronic_load,
946                load_ratio = EXCLUDED.load_ratio,
947                load_ratio_status = EXCLUDED.load_ratio_status,
948                load_focus = EXCLUDED.load_focus,
949                race_5k_sec = EXCLUDED.race_5k_sec,
950                race_10k_sec = EXCLUDED.race_10k_sec,
951                race_half_sec = EXCLUDED.race_half_sec,
952                race_marathon_sec = EXCLUDED.race_marathon_sec",
953            duckdb::params![
954                self.profile_id,
955                date.to_string(),
956                vo2max_value,
957                fitness_age,
958                readiness_score,
959                readiness_level,
960                status_phrase,
961                acute_load,
962                chronic_load,
963                load_ratio,
964                load_ratio_status,
965                load_focus,
966                race_5k,
967                race_10k,
968                race_half,
969                race_marathon,
970            ],
971        )
972        .map_err(|e| GarminError::Database(e.to_string()))?;
973
974        Ok(())
975    }
976
977    /// Sync weight data
978    async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
979        // TODO: Implement weight sync
980        Ok(())
981    }
982
983    /// Generate embeddings for activities
984    async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
985        // TODO: Implement embedding generation using fastembed
986        Ok(())
987    }
988}
989
990/// Options for sync operation
991#[derive(Default)]
992pub struct SyncOptions {
993    /// Sync activities
994    pub sync_activities: bool,
995    /// Sync daily health
996    pub sync_health: bool,
997    /// Sync performance metrics
998    pub sync_performance: bool,
999    /// Start date for sync
1000    pub from_date: Option<NaiveDate>,
1001    /// End date for sync
1002    pub to_date: Option<NaiveDate>,
1003    /// Dry run (plan only, don't execute)
1004    pub dry_run: bool,
1005    /// Force re-sync (ignore existing data)
1006    pub force: bool,
1007    /// Use fancy TUI (default: true, set false for --simple)
1008    pub fancy_ui: bool,
1009    /// Number of concurrent API requests (default: 3)
1010    pub concurrency: usize,
1011}
1012
1013impl SyncOptions {
1014    /// Create options for full sync
1015    pub fn full() -> Self {
1016        Self {
1017            sync_activities: true,
1018            sync_health: true,
1019            sync_performance: true,
1020            fancy_ui: true,
1021            concurrency: 3,
1022            ..Default::default()
1023        }
1024    }
1025
1026    /// Create options for simple (non-TUI) mode
1027    pub fn simple() -> Self {
1028        Self {
1029            sync_activities: true,
1030            sync_health: true,
1031            sync_performance: true,
1032            fancy_ui: false,
1033            concurrency: 3,
1034            ..Default::default()
1035        }
1036    }
1037}
1038
/// Counters describing the outcome of a sync operation.
#[derive(Default)]
pub struct SyncStats {
    /// Number of tasks recovered from a previous run
    pub recovered: u32,
    /// Number of tasks that completed successfully
    pub completed: u32,
    /// Number of tasks that hit rate limits
    pub rate_limited: u32,
    /// Number of tasks that failed
    pub failed: u32,
}

impl std::fmt::Display for SyncStats {
    /// Writes `Completed: N, Failed: N, Rate limited: N`, followed by
    /// `, Recovered: N` only when at least one task was recovered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            recovered,
            completed,
            rate_limited,
            failed,
        } = self;

        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            completed, failed, rate_limited
        )?;

        match *recovered {
            0 => Ok(()),
            count => write!(f, ", Recovered: {}", count),
        }
    }
}
1065
1066/// Print task status with nice formatting
1067fn print_task_status(task: &SyncTask, stats: &SyncStats) {
1068    let desc = match &task.task_type {
1069        SyncTaskType::Activities { start, limit } => {
1070            format!("Fetching activities {}-{}", start, start + limit)
1071        }
1072        SyncTaskType::ActivityDetail { activity_id } => {
1073            format!("Activity {} details", activity_id)
1074        }
1075        SyncTaskType::DownloadGpx {
1076            activity_id,
1077            activity_name,
1078            activity_date,
1079        } => {
1080            let name = activity_name.as_deref().unwrap_or("Unknown");
1081            let date = activity_date.as_deref().unwrap_or("");
1082            if date.is_empty() {
1083                format!("GPX: {} ({})", name, activity_id)
1084            } else {
1085                format!("GPX: {} {} ({})", date, name, activity_id)
1086            }
1087        }
1088        SyncTaskType::DailyHealth { date } => {
1089            format!("Health data for {}", date)
1090        }
1091        SyncTaskType::Performance { date } => {
1092            format!("Performance metrics for {}", date)
1093        }
1094        SyncTaskType::Weight { from, to } => {
1095            format!("Weight {} to {}", from, to)
1096        }
1097        SyncTaskType::GenerateEmbeddings { activity_ids } => {
1098            format!(
1099                "Generating embeddings for {} activities",
1100                activity_ids.len()
1101            )
1102        }
1103    };
1104
1105    print!("[{:>4}] {}...", stats.completed + 1, desc);
1106    let _ = io::stdout().flush();
1107}
1108
1109/// Update progress when starting a task
1110fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1111    let desc = match &task.task_type {
1112        SyncTaskType::Activities { start, limit } => {
1113            format!("Activities {}-{}", start, start + limit)
1114        }
1115        SyncTaskType::DownloadGpx {
1116            activity_name,
1117            activity_date,
1118            ..
1119        } => {
1120            let name = activity_name.as_deref().unwrap_or("Unknown");
1121            let date = activity_date.as_deref().unwrap_or("");
1122            if date.is_empty() {
1123                name.to_string()
1124            } else {
1125                format!("{} {}", date, name)
1126            }
1127        }
1128        SyncTaskType::DailyHealth { date } => date.to_string(),
1129        SyncTaskType::Performance { date } => date.to_string(),
1130        _ => String::new(),
1131    };
1132
1133    match &task.task_type {
1134        SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
1135        SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
1136        SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
1137        SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
1138        _ => {}
1139    }
1140}
1141
1142/// Mark a task as completed in progress
1143fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1144    match &task.task_type {
1145        SyncTaskType::Activities { .. } => {
1146            progress.activities.complete_one();
1147            // Activities can spawn GPX downloads, so update GPX total
1148            // This is handled dynamically when GPX tasks are added
1149        }
1150        SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1151        SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1152        SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1153        _ => {}
1154    }
1155}
1156
1157/// Mark a task as failed in progress
1158fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1159    match &task.task_type {
1160        SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1161        SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1162        SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1163        SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1164        _ => {}
1165    }
1166}