Skip to main content

garmin_cli/sync/
mod.rs

1//! Sync module for Garmin data synchronization
2//!
3//! Provides:
4//! - Rate-limited API access with parallel streams
5//! - Persistent task queue for crash recovery (SQLite)
6//! - Incremental sync with gap detection
7//! - GPX parsing for track points
8//! - Parquet storage for concurrent read access
9//! - Producer/consumer pipeline for concurrent fetching and writing
10//! - Plain terminal progress output
11
12pub mod progress;
13pub mod rate_limiter;
14pub mod task_queue;
15
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
20use tokio::sync::{mpsc, Mutex as TokioMutex};
21
22use crate::client::{GarminClient, OAuth2Token};
23use crate::db::models::{
24    Activity, DailyHealth, PerformanceMetrics, SyncPipeline, SyncTask, SyncTaskType, TrackPoint,
25};
26use crate::storage::{ParquetStore, Storage, SyncDb};
27use crate::{GarminError, Result};
28use std::io::{self, Write};
29
30pub use progress::{PlanningStep, SharedProgress, SyncProgress};
31pub use rate_limiter::{RateLimiter, SharedRateLimiter};
32pub use task_queue::{SharedTaskQueue, TaskQueue};
33
34fn pipeline_filter(mode: progress::SyncMode) -> Option<SyncPipeline> {
35    match mode {
36        progress::SyncMode::Latest => Some(SyncPipeline::Frontier),
37        progress::SyncMode::Backfill => Some(SyncPipeline::Backfill),
38    }
39}
40
41/// Data produced by API fetchers, consumed by Parquet writers
42#[derive(Debug)]
43enum SyncData {
44    /// Activity list with parsed activities and potential follow-up tasks
45    Activities {
46        records: Vec<Activity>,
47        gpx_tasks: Vec<SyncTask>,
48        next_page: Option<SyncTask>,
49        task_id: i64,
50    },
51    /// Daily health record
52    Health { record: DailyHealth, task_id: i64 },
53    /// Performance metrics record
54    Performance {
55        record: PerformanceMetrics,
56        task_id: i64,
57    },
58    /// Track points from GPX
59    TrackPoints {
60        #[allow(dead_code)]
61        activity_id: i64,
62        date: NaiveDate,
63        points: Vec<TrackPoint>,
64        task_id: i64,
65    },
66}
67
68/// Sync engine for orchestrating data synchronization
69pub struct SyncEngine {
70    storage: Storage,
71    client: GarminClient,
72    token: OAuth2Token,
73    rate_limiter: RateLimiter,
74    queue: TaskQueue,
75    profile_id: i32,
76    display_name: Option<String>,
77}
78
79#[derive(Clone)]
80struct ProducerContext {
81    rate_limiter: SharedRateLimiter,
82    client: GarminClient,
83    token: OAuth2Token,
84    progress: SharedProgress,
85    display_name: Arc<String>,
86    profile_id: i32,
87    stats: Arc<TokioMutex<SyncStats>>,
88    in_flight: Arc<AtomicUsize>,
89    parquet: Arc<ParquetStore>,
90    force: bool,
91    pipeline_filter: Option<SyncPipeline>,
92}
93
94type SleepMetrics = (
95    Option<i32>,
96    Option<i32>,
97    Option<i32>,
98    Option<i32>,
99    Option<i32>,
100);
101
102impl SyncEngine {
103    /// Create a new sync engine with default storage location
104    pub fn new(client: GarminClient, token: OAuth2Token) -> Result<Self> {
105        let storage = Storage::open_default()?;
106        Self::with_storage(storage, client, token)
107    }
108
109    /// Create a new sync engine with custom storage
110    pub fn with_storage(
111        storage: Storage,
112        client: GarminClient,
113        token: OAuth2Token,
114    ) -> Result<Self> {
115        // Get or create profile (will be updated with display name after API call)
116        let profile_id = storage.sync_db.get_or_create_profile("default")?;
117
118        // Create task queue using the sync database
119        let sync_db = SyncDb::open(storage.base_path().join("sync.db"))?;
120        let queue = TaskQueue::new(sync_db, profile_id, None);
121
122        Ok(Self {
123            storage,
124            client,
125            token,
126            rate_limiter: RateLimiter::new(),
127            queue,
128            profile_id,
129            display_name: None,
130        })
131    }
132
133    /// Fetch and cache the user's display name
134    async fn get_display_name(&mut self) -> Result<String> {
135        if let Some(ref name) = self.display_name {
136            return Ok(name.clone());
137        }
138
139        let profile: serde_json::Value = self
140            .client
141            .get_json(&self.token, "/userprofile-service/socialProfile")
142            .await?;
143
144        let name = profile
145            .get("displayName")
146            .and_then(|v| v.as_str())
147            .map(|s| s.to_string())
148            .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
149
150        // Update profile in database
151        self.profile_id = self.storage.sync_db.get_or_create_profile(&name)?;
152        self.queue.set_profile_id(self.profile_id);
153
154        self.display_name = Some(name.clone());
155        Ok(name)
156    }
157
158    /// Find the oldest activity date and total count by querying the activities API
159    /// Returns (oldest_date, total_activities, activities_with_gps)
160    pub(crate) async fn find_oldest_activity_date(
161        &mut self,
162        progress: Option<&SyncProgress>,
163    ) -> Result<(NaiveDate, u32, u32)> {
164        if let Some(p) = progress {
165            p.set_planning_step(PlanningStep::FindingOldestActivity);
166        } else {
167            print!("Finding oldest activity date...");
168            let _ = io::stdout().flush();
169        }
170
171        // The API returns activities sorted by date descending (newest first)
172        // Use exponential search to find the end quickly, then fetch the last page
173
174        self.rate_limiter.wait().await;
175
176        // Step 1: Find approximate total count using exponential jumps
177        let limit: u32 = 100;
178        let mut jump: u32 = 100;
179        let mut last_non_empty: u32 = 0;
180        let max_jump: u32 = 1_000_000;
181
182        // Exponential search: 100, 200, 400, 800, 1600, 3200...
183        while jump < max_jump {
184            let path = format!(
185                "/activitylist-service/activities/search/activities?limit=1&start={}",
186                jump
187            );
188
189            let activities: Vec<serde_json::Value> =
190                self.client.get_json(&self.token, &path).await?;
191
192            if activities.is_empty() {
193                break;
194            }
195
196            last_non_empty = jump;
197            jump = jump.saturating_mul(2);
198            self.rate_limiter.wait().await;
199        }
200
201        // Step 2: Binary search to find exact end
202        let mut low = last_non_empty;
203        let mut high = jump;
204
205        while high - low > limit {
206            let mid = (low + high) / 2;
207            let path = format!(
208                "/activitylist-service/activities/search/activities?limit=1&start={}",
209                mid
210            );
211
212            self.rate_limiter.wait().await;
213            let activities: Vec<serde_json::Value> =
214                self.client.get_json(&self.token, &path).await?;
215
216            if activities.is_empty() {
217                high = mid;
218            } else {
219                low = mid;
220            }
221        }
222
223        // Step 3: Fetch the last page to get the oldest activity
224        let path = format!(
225            "/activitylist-service/activities/search/activities?limit={}&start={}",
226            limit, low
227        );
228
229        self.rate_limiter.wait().await;
230        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
231
232        // Calculate total activities
233        let total_activities = low + activities.len() as u32;
234
235        let oldest_date = activities
236            .last()
237            .and_then(|activity| activity.get("startTimeLocal"))
238            .and_then(|v| v.as_str())
239            .and_then(|date_str| date_str.split(' ').next())
240            .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
241
242        let result = oldest_date.unwrap_or_else(|| {
243            // Default to 1 year ago if no activities found
244            Utc::now().date_naive() - Duration::days(365)
245        });
246
247        // Estimate activities with GPS (typically ~80% of activities have GPS)
248        // This is a rough estimate - actual count will be refined during sync
249        let estimated_gps = (total_activities as f32 * 0.8) as u32;
250
251        if let Some(p) = progress {
252            p.set_oldest_activity_date(&result.to_string());
253        } else {
254            println!(" {} ({} activities)", result, total_activities);
255        }
256        Ok((result, total_activities, estimated_gps))
257    }
258
259    async fn has_health_data(&mut self, display_name: &str, date: NaiveDate) -> Result<bool> {
260        let path = format!(
261            "/usersummary-service/usersummary/daily/{}?calendarDate={}",
262            display_name, date
263        );
264
265        self.rate_limiter.wait().await;
266        let result: std::result::Result<serde_json::Value, _> =
267            self.client.get_json(&self.token, &path).await;
268
269        match result {
270            Ok(data) => Ok(!data.as_object().map(|o| o.is_empty()).unwrap_or(true)),
271            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
272            Err(e) => Err(e),
273        }
274    }
275
276    async fn find_first_health_date(
277        &mut self,
278        progress: Option<&SyncProgress>,
279        from: NaiveDate,
280        to: NaiveDate,
281    ) -> Result<Option<NaiveDate>> {
282        if let Some(p) = progress {
283            p.set_planning_step(PlanningStep::FindingFirstHealth);
284        }
285
286        if from > to {
287            return Ok(None);
288        }
289
290        let display_name = self.get_display_name().await?;
291        let mut low = from;
292        let mut high = to;
293        let mut found = None;
294
295        while low <= high {
296            let span = (high - low).num_days();
297            let mid = low + Duration::days(span / 2);
298
299            if self.has_health_data(&display_name, mid).await? {
300                found = Some(mid);
301                if mid == low {
302                    break;
303                }
304                high = mid - Duration::days(1);
305            } else {
306                low = mid + Duration::days(1);
307            }
308        }
309
310        Ok(found)
311    }
312
313    async fn has_performance_data(&mut self, date: NaiveDate) -> Result<bool> {
314        let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
315        self.rate_limiter.wait().await;
316        let readiness: std::result::Result<serde_json::Value, _> =
317            self.client.get_json(&self.token, &readiness_path).await;
318
319        if let Ok(data) = readiness {
320            if data.as_array().and_then(|arr| arr.first()).is_some() {
321                return Ok(true);
322            }
323        }
324
325        let status_path = format!(
326            "/metrics-service/metrics/trainingstatus/aggregated/{}",
327            date
328        );
329        self.rate_limiter.wait().await;
330        let status: std::result::Result<serde_json::Value, _> =
331            self.client.get_json(&self.token, &status_path).await;
332
333        match status {
334            Ok(data) => Ok(data.get("mostRecentTrainingStatus").is_some()),
335            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
336            Err(e) => Err(e),
337        }
338    }
339
340    async fn find_first_performance_date(
341        &mut self,
342        progress: Option<&SyncProgress>,
343        from: NaiveDate,
344        to: NaiveDate,
345    ) -> Result<Option<NaiveDate>> {
346        if let Some(p) = progress {
347            p.set_planning_step(PlanningStep::FindingFirstPerformance);
348        }
349
350        if from > to {
351            return Ok(None);
352        }
353
354        let mut low = from;
355        let mut high = to;
356        let mut found = None;
357
358        while low <= high {
359            let span = (high - low).num_days();
360            let mid = low + Duration::days(span / 2);
361
362            if self.has_performance_data(mid).await? {
363                found = Some(mid);
364                if mid == low {
365                    break;
366                }
367                high = mid - Duration::days(1);
368            } else {
369                low = mid + Duration::days(1);
370            }
371        }
372
373        Ok(found)
374    }
375
376    /// Run the sync process
377    pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
378        self.run_with_progress(&opts).await
379    }
380
381    /// Run sync with plain terminal progress reporting.
382    async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
383        let progress = Arc::new(SyncProgress::new());
384
385        // Set storage path and sync mode for display
386        progress.set_storage_path(&self.storage.base_path().display().to_string());
387        progress.set_sync_mode(opts.mode);
388        println!("Using storage: {}", progress.get_storage_path());
389        println!("Planning sync...");
390
391        // Planning phase - updates progress instead of printing
392        progress.set_planning_step(PlanningStep::FetchingProfile);
393        let display_name = self.get_display_name().await?;
394        progress.set_profile(&display_name);
395        self.queue.set_pipeline(pipeline_filter(opts.mode));
396
397        // Recover any crashed tasks
398        let _recovered = self.queue.recover_in_progress()?;
399
400        // Plan phase
401        if self.queue.pending_count()? == 0 {
402            self.plan_sync_with_progress(opts, &progress).await?;
403        }
404
405        // Mark planning complete
406        progress.finish_planning();
407
408        // Count tasks by type for progress tracking
409        self.count_tasks_for_progress(&progress)?;
410
411        // Set date ranges based on sync mode
412        let today = Utc::now().date_naive();
413        let to_date = opts.to_date.unwrap_or(today);
414
415        match opts.mode {
416            progress::SyncMode::Latest => {
417                // Latest mode: sync from last sync date (or 7 days ago) to today
418                let from_date = opts.from_date.unwrap_or_else(|| today - Duration::days(7));
419                progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
420                progress.set_date_range(&from_date.to_string(), &to_date.to_string());
421            }
422            progress::SyncMode::Backfill => {
423                // Backfill mode: sync from oldest activity backwards
424                let oldest = progress
425                    .get_oldest_activity_date()
426                    .unwrap_or_else(|| (today - Duration::days(365)).to_string());
427                let from_date = opts
428                    .from_date
429                    .map(|d| d.to_string())
430                    .unwrap_or(oldest.clone());
431                progress.set_backfill_range(&from_date, &oldest);
432                progress.set_date_range(&oldest, &from_date);
433            }
434        }
435
436        print_sync_overview(&progress);
437        println!("Planning complete.");
438
439        let stats_result = if opts.dry_run {
440            println!("Dry run mode - no changes will be made");
441            Ok(SyncStats::default())
442        } else {
443            let reporter_progress = progress.clone();
444            let reporter_handle = tokio::spawn(async move {
445                run_progress_reporter(reporter_progress).await;
446            });
447
448            let result = self
449                .run_with_progress_tracking(opts, progress.clone())
450                .await;
451            progress.request_shutdown();
452            let _ = reporter_handle.await;
453            result
454        };
455
456        let stats = stats_result?;
457        print_sync_errors(&progress);
458
459        // Update sync state after successful completion
460        if !opts.dry_run && stats.completed > 0 {
461            self.update_sync_state_after_completion(opts, today).await?;
462        }
463
464        if !opts.dry_run {
465            println!("\nSync complete: {}", stats);
466        }
467        Ok(stats)
468    }
469
470    /// Update sync state after successful sync completion
471    async fn update_sync_state_after_completion(
472        &self,
473        opts: &SyncOptions,
474        today: NaiveDate,
475    ) -> Result<()> {
476        use crate::db::models::SyncState;
477
478        match opts.mode {
479            progress::SyncMode::Latest => {
480                // Update last_sync_date to today
481                if opts.sync_activities {
482                    let state = SyncState {
483                        profile_id: self.profile_id,
484                        data_type: "activities".to_string(),
485                        last_sync_date: Some(today),
486                        last_activity_id: None,
487                    };
488                    self.storage.sync_db.update_sync_state(&state)?;
489                }
490                if opts.sync_health {
491                    let state = SyncState {
492                        profile_id: self.profile_id,
493                        data_type: "health".to_string(),
494                        last_sync_date: Some(today),
495                        last_activity_id: None,
496                    };
497                    self.storage.sync_db.update_sync_state(&state)?;
498                }
499            }
500            progress::SyncMode::Backfill => {
501                // Check if backfill is complete (all tasks done)
502                let pending = self.queue.pending_count()?;
503                if pending == 0 {
504                    // Mark backfill as complete
505                    self.storage
506                        .sync_db
507                        .mark_backfill_complete(self.profile_id, "activities")?;
508                    self.storage
509                        .sync_db
510                        .mark_backfill_complete(self.profile_id, "health")?;
511                    self.storage
512                        .sync_db
513                        .mark_backfill_complete(self.profile_id, "performance")?;
514                }
515            }
516        }
517
518        Ok(())
519    }
520
521    /// Run sync with progress tracking using parallel producer/consumer pipeline
522    async fn run_with_progress_tracking(
523        &mut self,
524        opts: &SyncOptions,
525        progress: SharedProgress,
526    ) -> Result<SyncStats> {
527        // Use parallel execution with producer/consumer pipeline
528        self.run_parallel(opts, progress).await
529    }
530
531    /// Run parallel sync with producer/consumer pipeline
532    ///
533    /// Producers: Fetch data from Garmin API (rate-limited)
534    /// Consumers: Write data to Parquet (partition-locked)
535    async fn run_parallel(
536        &mut self,
537        opts: &SyncOptions,
538        progress: SharedProgress,
539    ) -> Result<SyncStats> {
540        let rate_limiter = SharedRateLimiter::new(opts.concurrency);
541        let queue = SharedTaskQueue::new(TaskQueue::new(
542            SyncDb::open(self.storage.base_path().join("sync.db"))?,
543            self.profile_id,
544            pipeline_filter(opts.mode),
545        ));
546
547        self.run_parallel_with_resources(
548            opts,
549            progress,
550            queue,
551            rate_limiter,
552            pipeline_filter(opts.mode),
553        )
554        .await
555    }
556
557    pub(crate) async fn run_parallel_with_resources(
558        &mut self,
559        opts: &SyncOptions,
560        progress: SharedProgress,
561        queue: SharedTaskQueue,
562        rate_limiter: SharedRateLimiter,
563        pipeline_filter: Option<SyncPipeline>,
564    ) -> Result<SyncStats> {
565        // Bounded channel for backpressure (100 items)
566        let (tx, rx) = mpsc::channel::<SyncData>(100);
567
568        // Shared resources
569        let parquet = Arc::new(self.storage.parquet.clone());
570        let client = self.client.clone();
571        let token = self.token.clone();
572        let stats = Arc::new(TokioMutex::new(SyncStats::default()));
573        let in_flight = Arc::new(AtomicUsize::new(0));
574        let display_name = Arc::new(self.get_display_name().await?);
575        let profile_id = self.profile_id;
576
577        // Spawn producers (API fetchers)
578        let mut producer_handles = Vec::new();
579        for id in 0..opts.concurrency {
580            let tx = tx.clone();
581            let queue = queue.clone();
582            let context = ProducerContext {
583                rate_limiter: rate_limiter.clone(),
584                client: client.clone(),
585                token: token.clone(),
586                progress: progress.clone(),
587                display_name: Arc::clone(&display_name),
588                profile_id,
589                stats: Arc::clone(&stats),
590                in_flight: Arc::clone(&in_flight),
591                parquet: Arc::clone(&parquet),
592                force: opts.force,
593                pipeline_filter,
594            };
595
596            producer_handles.push(tokio::spawn(async move {
597                producer_loop(id, queue, tx, context).await
598            }));
599        }
600        drop(tx); // Close sender so consumers know when done
601
602        // Spawn consumers (Parquet writers)
603        let rx = Arc::new(TokioMutex::new(rx));
604        let mut consumer_handles = Vec::new();
605        for id in 0..opts.concurrency {
606            let rx = Arc::clone(&rx);
607            let parquet = Arc::clone(&parquet);
608            let queue = queue.clone();
609            let stats = Arc::clone(&stats);
610            let progress = progress.clone();
611            let in_flight = Arc::clone(&in_flight);
612
613            consumer_handles.push(tokio::spawn(async move {
614                consumer_loop(id, rx, parquet, queue, stats, progress, in_flight).await
615            }));
616        }
617
618        // Wait for all producers to finish
619        for h in producer_handles {
620            if let Err(e) = h.await {
621                eprintln!("Producer error: {}", e);
622            }
623        }
624
625        // Wait for all consumers to finish
626        for h in consumer_handles {
627            if let Err(e) = h.await {
628                eprintln!("Consumer error: {}", e);
629            }
630        }
631
632        // Cleanup old completed tasks
633        queue.cleanup(7).await?;
634
635        // Update profile sync time
636        self.storage
637            .sync_db
638            .update_profile_sync_time(self.profile_id)?;
639
640        // Extract final stats
641        let final_stats = stats.lock().await;
642        Ok(SyncStats {
643            recovered: final_stats.recovered,
644            completed: final_stats.completed,
645            rate_limited: final_stats.rate_limited,
646            failed: final_stats.failed,
647        })
648    }
649
650    /// Count pending tasks by type and update progress
651    ///
652    /// Sets initial totals based on actual tasks in queue.
653    /// GPX totals are updated dynamically as activities are discovered.
654    fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
655        // Count actual tasks by type from the queue
656        let (_activities, _gpx, health, performance) = self.queue.count_by_type()?;
657
658        // Activities and GPX totals are set during planning from API discovery
659        // Only set them if planning didn't provide accurate counts
660        if progress.activities.get_total() == 0 {
661            progress.activities.set_total(1); // At least 1 for pagination
662            progress.activities.set_dynamic(true);
663        }
664        if progress.gpx.get_total() == 0 {
665            progress.gpx.set_dynamic(true); // Will be discovered during sync
666        }
667
668        // Health and performance totals come from date range calculation
669        progress.health.set_total(health);
670        progress.performance.set_total(performance);
671
672        Ok(())
673    }
674
675    /// Plan sync tasks with progress tracking.
676    async fn plan_sync_with_progress(
677        &mut self,
678        opts: &SyncOptions,
679        progress: &SyncProgress,
680    ) -> Result<()> {
681        let today = Utc::now().date_naive();
682
683        match opts.mode {
684            progress::SyncMode::Latest => self.plan_latest_sync(opts, progress, today).await,
685            progress::SyncMode::Backfill => self.plan_backfill_sync(opts, progress, today).await,
686        }
687    }
688
689    /// Plan Latest sync: from last_sync_date to today
690    pub(crate) async fn plan_latest_sync(
691        &mut self,
692        opts: &SyncOptions,
693        progress: &SyncProgress,
694        today: NaiveDate,
695    ) -> Result<()> {
696        // Get last sync date from DB, default to 7 days ago
697        let last_sync = self
698            .storage
699            .sync_db
700            .get_sync_state(self.profile_id, "activities")?;
701        let from_date = opts.from_date.unwrap_or_else(|| {
702            last_sync
703                .and_then(|s| s.last_sync_date)
704                .unwrap_or_else(|| today - Duration::days(7))
705        });
706        let to_date = opts.to_date.unwrap_or(today);
707
708        progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
709        progress.set_oldest_activity_date(&from_date.to_string());
710
711        let total_days = (to_date - from_date).num_days().max(0) as u32 + 1;
712
713        // Plan activity sync (paginated from newest)
714        if opts.sync_activities {
715            progress.set_planning_step(PlanningStep::PlanningActivities);
716            self.plan_activities_sync(SyncPipeline::Frontier, Some(from_date), Some(to_date))?;
717        }
718
719        // Plan health sync for date range
720        if opts.sync_health {
721            progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
722            self.plan_health_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
723        }
724
725        // Plan performance sync
726        if opts.sync_performance {
727            let total_weeks = (total_days / 7).max(1);
728            progress.set_planning_step(PlanningStep::PlanningPerformance { weeks: total_weeks });
729            self.plan_performance_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
730        }
731
732        Ok(())
733    }
734
735    /// Plan Backfill sync: from oldest_activity_date to backfill_frontier
736    pub(crate) async fn plan_backfill_sync(
737        &mut self,
738        opts: &SyncOptions,
739        progress: &SyncProgress,
740        today: NaiveDate,
741    ) -> Result<()> {
742        // Find oldest activity date (this also gives us total count)
743        let (oldest_date, total_activities, estimated_gps) =
744            self.find_oldest_activity_date(Some(progress)).await?;
745
746        // Get or initialize backfill frontier
747        let backfill_state = self
748            .storage
749            .sync_db
750            .get_backfill_state(self.profile_id, "activities")?;
751
752        let (frontier_date, activities_complete) = match backfill_state {
753            Some((frontier, _target, complete)) => (frontier, complete),
754            None => {
755                // Initialize frontier from last_sync_date or today
756                let last_sync = self
757                    .storage
758                    .sync_db
759                    .get_sync_state(self.profile_id, "activities")?;
760                let frontier = last_sync.and_then(|s| s.last_sync_date).unwrap_or(today);
761
762                // Initialize backfill state
763                self.storage.sync_db.set_backfill_state(
764                    self.profile_id,
765                    "activities",
766                    frontier,
767                    oldest_date,
768                    false,
769                )?;
770                (frontier, false)
771            }
772        };
773
774        // Backfill from oldest_date to frontier_date
775        let activity_from = opts.from_date.unwrap_or(oldest_date);
776        let activity_to = opts.to_date.unwrap_or(frontier_date);
777
778        progress.set_backfill_range(&frontier_date.to_string(), &oldest_date.to_string());
779        progress.set_oldest_activity_date(&oldest_date.to_string());
780
781        // Plan activity sync with known totals
782        if opts.sync_activities && !activities_complete {
783            progress.set_planning_step(PlanningStep::PlanningActivities);
784            self.plan_activities_sync(
785                SyncPipeline::Backfill,
786                Some(activity_from),
787                Some(activity_to),
788            )?;
789            if total_activities > 0 {
790                progress.activities.set_total(total_activities);
791                progress.gpx.set_total(estimated_gps);
792            }
793        }
794
795        if opts.sync_health {
796            let health_state = self
797                .storage
798                .sync_db
799                .get_backfill_state(self.profile_id, "health")?;
800
801            let health_target = match health_state {
802                Some((_frontier, target, complete)) => (!complete).then_some(target),
803                None => {
804                    let search_from = activity_from;
805                    let search_to = activity_to;
806                    let first_health = self
807                        .find_first_health_date(Some(progress), search_from, search_to)
808                        .await?;
809                    match first_health {
810                        Some(first) => {
811                            self.storage.sync_db.set_backfill_state(
812                                self.profile_id,
813                                "health",
814                                frontier_date,
815                                first,
816                                false,
817                            )?;
818                            Some(first)
819                        }
820                        None => {
821                            self.storage.sync_db.set_backfill_state(
822                                self.profile_id,
823                                "health",
824                                frontier_date,
825                                frontier_date,
826                                true,
827                            )?;
828                            None
829                        }
830                    }
831                }
832            };
833
834            if let Some(target) = health_target {
835                let health_from = std::cmp::max(activity_from, target);
836                let health_to = activity_to;
837                if health_from <= health_to {
838                    let total_days = (health_to - health_from).num_days().max(0) as u32 + 1;
839                    progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
840                    self.plan_health_sync(
841                        health_from,
842                        health_to,
843                        opts.force,
844                        SyncPipeline::Backfill,
845                    )?;
846                }
847            }
848        }
849
850        if opts.sync_performance {
851            let perf_state = self
852                .storage
853                .sync_db
854                .get_backfill_state(self.profile_id, "performance")?;
855
856            let perf_target = match perf_state {
857                Some((_frontier, target, complete)) => (!complete).then_some(target),
858                None => {
859                    let search_from = activity_from;
860                    let search_to = activity_to;
861                    let first_perf = self
862                        .find_first_performance_date(Some(progress), search_from, search_to)
863                        .await?;
864                    match first_perf {
865                        Some(first) => {
866                            self.storage.sync_db.set_backfill_state(
867                                self.profile_id,
868                                "performance",
869                                frontier_date,
870                                first,
871                                false,
872                            )?;
873                            Some(first)
874                        }
875                        None => {
876                            self.storage.sync_db.set_backfill_state(
877                                self.profile_id,
878                                "performance",
879                                frontier_date,
880                                frontier_date,
881                                true,
882                            )?;
883                            None
884                        }
885                    }
886                }
887            };
888
889            if let Some(target) = perf_target {
890                let perf_from = std::cmp::max(activity_from, target);
891                let perf_to = activity_to;
892                if perf_from <= perf_to {
893                    let total_weeks = ((perf_to - perf_from).num_days().max(0) as u32 / 7).max(1);
894                    progress.set_planning_step(PlanningStep::PlanningPerformance {
895                        weeks: total_weeks,
896                    });
897                    self.plan_performance_sync(
898                        perf_from,
899                        perf_to,
900                        opts.force,
901                        SyncPipeline::Backfill,
902                    )?;
903                }
904            }
905        }
906
907        Ok(())
908    }
909
910    /// Plan activity sync tasks
911    fn plan_activities_sync(
912        &self,
913        pipeline: SyncPipeline,
914        min_date: Option<NaiveDate>,
915        max_date: Option<NaiveDate>,
916    ) -> Result<()> {
917        // Start with first page, we'll add more as we discover them
918        let task = SyncTask::new(
919            self.profile_id,
920            pipeline,
921            SyncTaskType::Activities {
922                start: 0,
923                limit: 50,
924                min_date,
925                max_date,
926            },
927        );
928        self.queue.push(task)?;
929        Ok(())
930    }
931
932    /// Plan health sync tasks for date range, returns count of tasks added
933    fn plan_health_sync(
934        &self,
935        from: NaiveDate,
936        to: NaiveDate,
937        force: bool,
938        pipeline: SyncPipeline,
939    ) -> Result<u32> {
940        let mut count = 0;
941        let mut date = from;
942        while date <= to {
943            if force
944                || !self
945                    .storage
946                    .parquet
947                    .has_daily_health(self.profile_id, date)?
948            {
949                let task = SyncTask::new(
950                    self.profile_id,
951                    pipeline,
952                    SyncTaskType::DailyHealth { date },
953                );
954                self.queue.push(task)?;
955                count += 1;
956            }
957            date += Duration::days(1);
958        }
959        Ok(count)
960    }
961
962    /// Plan performance sync tasks, returns count of tasks added
963    fn plan_performance_sync(
964        &self,
965        from: NaiveDate,
966        to: NaiveDate,
967        force: bool,
968        pipeline: SyncPipeline,
969    ) -> Result<u32> {
970        // Performance metrics don't change daily, sync weekly
971        let mut count = 0;
972        let mut date = from;
973        while date <= to {
974            if force
975                || !self
976                    .storage
977                    .parquet
978                    .has_performance_metrics(self.profile_id, date)?
979            {
980                let task = SyncTask::new(
981                    self.profile_id,
982                    pipeline,
983                    SyncTaskType::Performance { date },
984                );
985                self.queue.push(task)?;
986                count += 1;
987            }
988            date += Duration::days(7);
989        }
990        Ok(count)
991    }
992}
993
994fn print_sync_overview(progress: &SyncProgress) {
995    println!("Profile: {}", progress.get_profile());
996    println!("Mode: {}", progress.get_sync_mode());
997    println!("Range: {}", progress.get_date_range());
998    println!(
999        "Queued tasks: activities {}, gpx {}, health {}, performance {}",
1000        progress.activities.get_total(),
1001        progress.gpx.get_total(),
1002        progress.health.get_total(),
1003        progress.performance.get_total()
1004    );
1005}
1006
1007fn print_sync_errors(progress: &SyncProgress) {
1008    let errors = progress.get_errors();
1009    if errors.is_empty() {
1010        return;
1011    }
1012
1013    println!("\nRecent errors:");
1014    for error in errors.iter().take(5) {
1015        println!("  [{}] {}: {}", error.stream, error.item, error.error);
1016    }
1017
1018    if errors.len() > 5 {
1019        println!("  ... and {} more", errors.len() - 5);
1020    }
1021}
1022
1023async fn run_progress_reporter(progress: SharedProgress) {
1024    loop {
1025        progress.print_simple_status();
1026
1027        if progress.should_shutdown() || progress.is_complete() {
1028            println!();
1029            break;
1030        }
1031
1032        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1033    }
1034}
1035
1036/// Options for sync operation
1037#[derive(Clone)]
1038pub struct SyncOptions {
1039    /// Sync activities
1040    pub sync_activities: bool,
1041    /// Sync daily health
1042    pub sync_health: bool,
1043    /// Sync performance metrics
1044    pub sync_performance: bool,
1045    /// Start date for sync
1046    pub from_date: Option<NaiveDate>,
1047    /// End date for sync
1048    pub to_date: Option<NaiveDate>,
1049    /// Dry run (plan only, don't execute)
1050    pub dry_run: bool,
1051    /// Force re-sync (ignore existing data)
1052    pub force: bool,
1053    /// Number of concurrent API requests (default: 4)
1054    pub concurrency: usize,
1055    /// Sync mode (Latest or Backfill)
1056    pub mode: progress::SyncMode,
1057}
1058
1059impl Default for SyncOptions {
1060    fn default() -> Self {
1061        Self {
1062            sync_activities: false,
1063            sync_health: false,
1064            sync_performance: false,
1065            from_date: None,
1066            to_date: None,
1067            dry_run: false,
1068            force: false,
1069            concurrency: 4,
1070            mode: progress::SyncMode::Latest,
1071        }
1072    }
1073}
1074
1075/// Statistics from sync operation
1076#[derive(Default)]
1077pub struct SyncStats {
1078    /// Tasks recovered from previous run
1079    pub recovered: u32,
1080    /// Tasks completed successfully
1081    pub completed: u32,
1082    /// Tasks that hit rate limits
1083    pub rate_limited: u32,
1084    /// Tasks that failed
1085    pub failed: u32,
1086}
1087
1088impl std::fmt::Display for SyncStats {
1089    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1090        write!(
1091            f,
1092            "Completed: {}, Failed: {}, Rate limited: {}",
1093            self.completed, self.failed, self.rate_limited
1094        )?;
1095        if self.recovered > 0 {
1096            write!(f, ", Recovered: {}", self.recovered)?;
1097        }
1098        Ok(())
1099    }
1100}
1101
1102/// Update progress when starting a task
1103fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1104    let desc = match &task.task_type {
1105        SyncTaskType::Activities { start, limit, .. } => {
1106            format!("Activities {}-{}", start, start + limit)
1107        }
1108        SyncTaskType::DownloadGpx {
1109            activity_name,
1110            activity_date,
1111            ..
1112        } => {
1113            let name = activity_name.as_deref().unwrap_or("Unknown");
1114            let date = activity_date.as_deref().unwrap_or("");
1115            if date.is_empty() {
1116                name.to_string()
1117            } else {
1118                format!("{} {}", date, name)
1119            }
1120        }
1121        SyncTaskType::DailyHealth { date } => date.to_string(),
1122        SyncTaskType::Performance { date } => date.to_string(),
1123        _ => String::new(),
1124    };
1125
1126    match &task.task_type {
1127        SyncTaskType::Activities { .. } => {
1128            progress.activities.set_current_item(desc.clone());
1129            progress.activities.set_last_item(desc);
1130        }
1131        SyncTaskType::DownloadGpx { .. } => {
1132            progress.gpx.set_current_item(desc.clone());
1133            progress.gpx.set_last_item(desc);
1134        }
1135        SyncTaskType::DailyHealth { .. } => {
1136            progress.health.set_current_item(desc.clone());
1137            progress.health.set_last_item(desc);
1138        }
1139        SyncTaskType::Performance { .. } => {
1140            progress.performance.set_current_item(desc.clone());
1141            progress.performance.set_last_item(desc);
1142        }
1143        _ => {}
1144    }
1145}
1146
1147/// Mark a task as completed in progress
1148#[allow(dead_code)]
1149fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1150    match &task.task_type {
1151        SyncTaskType::Activities { .. } => {
1152            progress.activities.complete_one();
1153            progress.activities.clear_current_item();
1154        }
1155        SyncTaskType::DownloadGpx { .. } => {
1156            progress.gpx.complete_one();
1157            progress.gpx.clear_current_item();
1158        }
1159        SyncTaskType::DailyHealth { .. } => {
1160            progress.health.complete_one();
1161            progress.health.clear_current_item();
1162        }
1163        SyncTaskType::Performance { .. } => {
1164            progress.performance.complete_one();
1165            progress.performance.clear_current_item();
1166        }
1167        _ => {}
1168    }
1169}
1170
1171/// Mark a task as failed in progress and record error details
1172fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress, error: &str) {
1173    let (stream_name, item_desc) = match &task.task_type {
1174        SyncTaskType::Activities { start, limit, .. } => {
1175            progress.activities.fail_one();
1176            progress.activities.clear_current_item();
1177            ("Activities", format!("{}-{}", start, start + limit))
1178        }
1179        SyncTaskType::DownloadGpx {
1180            activity_id,
1181            activity_name,
1182            ..
1183        } => {
1184            progress.gpx.fail_one();
1185            progress.gpx.clear_current_item();
1186            (
1187                "GPX",
1188                activity_name
1189                    .clone()
1190                    .unwrap_or_else(|| activity_id.to_string()),
1191            )
1192        }
1193        SyncTaskType::DailyHealth { date } => {
1194            progress.health.fail_one();
1195            progress.health.clear_current_item();
1196            ("Health", date.to_string())
1197        }
1198        SyncTaskType::Performance { date } => {
1199            progress.performance.fail_one();
1200            progress.performance.clear_current_item();
1201            ("Performance", date.to_string())
1202        }
1203        _ => return,
1204    };
1205
1206    progress.add_error(stream_name, item_desc, error.to_string());
1207}
1208
1209const MAX_IDLE_RETRIES: u32 = 10;
1210
1211fn should_exit_when_idle(idle_loops: u32, in_flight: usize) -> bool {
1212    idle_loops >= MAX_IDLE_RETRIES && in_flight == 0
1213}
1214
1215fn record_write_failure(data: &SyncData, progress: &SyncProgress, error: &str) {
1216    match data {
1217        SyncData::Activities { records, .. } => {
1218            progress.activities.fail_one();
1219            progress.activities.clear_current_item();
1220            let item = records
1221                .first()
1222                .map(|r| r.activity_id.to_string())
1223                .unwrap_or_else(|| "batch".to_string());
1224            progress.add_error("Activities", item, error.to_string());
1225        }
1226        SyncData::Health { record, .. } => {
1227            progress.health.fail_one();
1228            progress.health.clear_current_item();
1229            progress.add_error("Health", record.date.to_string(), error.to_string());
1230        }
1231        SyncData::Performance { record, .. } => {
1232            progress.performance.fail_one();
1233            progress.performance.clear_current_item();
1234            progress.add_error("Performance", record.date.to_string(), error.to_string());
1235        }
1236        SyncData::TrackPoints {
1237            activity_id, date, ..
1238        } => {
1239            progress.gpx.fail_one();
1240            progress.gpx.clear_current_item();
1241            progress.add_error(
1242                "GPX",
1243                format!("{} ({})", date, activity_id),
1244                error.to_string(),
1245            );
1246        }
1247    }
1248}
1249
1250// =============================================================================
1251// Producer/Consumer Pipeline
1252// =============================================================================
1253
1254/// Producer loop: fetches data from Garmin API and sends to channel
1255async fn producer_loop(
1256    _id: usize,
1257    queue: SharedTaskQueue,
1258    tx: mpsc::Sender<SyncData>,
1259    context: ProducerContext,
1260) {
1261    let mut empty_count = 0;
1262
1263    loop {
1264        // Pop next task
1265        let next_task = match context.pipeline_filter {
1266            Some(pipeline) => queue.pop_round_robin_with_pipeline(Some(pipeline)).await,
1267            None => queue.pop_round_robin().await,
1268        };
1269
1270        let task = match next_task {
1271            Ok(Some(task)) => {
1272                empty_count = 0; // Reset counter on successful pop
1273                task
1274            }
1275            Ok(None) => {
1276                // Queue is empty, but consumers might be adding new tasks
1277                // Wait a bit and retry before giving up
1278                empty_count += 1;
1279                if should_exit_when_idle(empty_count, context.in_flight.load(Ordering::Relaxed)) {
1280                    break; // No more tasks after multiple retries
1281                }
1282                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1283                continue;
1284            }
1285            Err(e) => {
1286                eprintln!("Queue error: {}", e);
1287                break;
1288            }
1289        };
1290
1291        let task_id = task.id.unwrap();
1292        context.in_flight.fetch_add(1, Ordering::Relaxed);
1293
1294        // Mark in progress
1295        if let Err(e) = queue.mark_in_progress(task_id).await {
1296            eprintln!("Failed to mark task in progress: {}", e);
1297            continue;
1298        }
1299
1300        // Update progress display
1301        update_progress_for_task(&task, &context.progress);
1302
1303        // Skip tasks that already exist unless forcing
1304        if !context.force {
1305            let should_skip = match &task.task_type {
1306                SyncTaskType::DailyHealth { date } => context
1307                    .parquet
1308                    .has_daily_health(context.profile_id, *date)
1309                    .unwrap_or(false),
1310                SyncTaskType::Performance { date } => context
1311                    .parquet
1312                    .has_performance_metrics(context.profile_id, *date)
1313                    .unwrap_or(false),
1314                SyncTaskType::DownloadGpx {
1315                    activity_id,
1316                    activity_date,
1317                    ..
1318                } => activity_date
1319                    .as_ref()
1320                    .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1321                    .and_then(|date| context.parquet.has_track_points(*activity_id, date).ok())
1322                    .unwrap_or(false),
1323                _ => false,
1324            };
1325
1326            if should_skip {
1327                if let Err(e) = queue.mark_completed(task_id).await {
1328                    eprintln!("Failed to mark task completed: {}", e);
1329                }
1330                complete_progress_for_task(&task, &context.progress);
1331                {
1332                    let mut s = context.stats.lock().await;
1333                    s.completed += 1;
1334                }
1335                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1336                continue;
1337            }
1338        }
1339
1340        // Acquire rate limiter permit
1341        let _permit = context.rate_limiter.acquire().await;
1342        context.progress.record_request();
1343
1344        // Fetch data based on task type
1345        let result = fetch_task_data(
1346            &task,
1347            &context.client,
1348            &context.token,
1349            &context.display_name,
1350            context.profile_id,
1351        )
1352        .await;
1353
1354        match result {
1355            Ok(data) => {
1356                context.rate_limiter.on_success();
1357                // Send to consumer
1358                if tx.send(data).await.is_err() {
1359                    // Channel closed, consumer is done
1360                    context.in_flight.fetch_sub(1, Ordering::Relaxed);
1361                    let backoff = Duration::seconds(60);
1362                    let _ = queue
1363                        .mark_failed(task_id, "Consumer channel closed", backoff)
1364                        .await;
1365                    break;
1366                }
1367            }
1368            Err(GarminError::RateLimited) => {
1369                context.rate_limiter.on_rate_limit();
1370                let backoff = Duration::seconds(60);
1371                if let Err(e) = queue.mark_failed(task_id, "Rate limited", backoff).await {
1372                    eprintln!("Failed to mark task as rate limited: {}", e);
1373                }
1374                fail_progress_for_task(&task, &context.progress, "Rate limited");
1375                {
1376                    let mut s = context.stats.lock().await;
1377                    s.rate_limited += 1;
1378                }
1379                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1380            }
1381            Err(e) => {
1382                let backoff = Duration::seconds(60);
1383                let error_msg = e.to_string();
1384                if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
1385                    eprintln!("Failed to mark task as failed: {}", e);
1386                }
1387                fail_progress_for_task(&task, &context.progress, &error_msg);
1388                {
1389                    let mut s = context.stats.lock().await;
1390                    s.failed += 1;
1391                }
1392                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1393            }
1394        }
1395    }
1396}
1397
1398fn value_to_i32(value: &serde_json::Value) -> Option<i32> {
1399    if let Some(int) = value.as_i64() {
1400        return Some(int as i32);
1401    }
1402    value.as_f64().map(|float| float.round() as i32)
1403}
1404
1405fn first_entry(value: &serde_json::Value) -> Option<&serde_json::Value> {
1406    if let Some(array) = value.as_array() {
1407        array.first()
1408    } else {
1409        Some(value)
1410    }
1411}
1412
1413fn parse_sleep_metrics(value: Option<&serde_json::Value>) -> SleepMetrics {
1414    let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value);
1415
1416    let dto = match dto {
1417        Some(dto) => dto,
1418        None => return (None, None, None, None, None),
1419    };
1420
1421    let deep = dto.get("deepSleepSeconds").and_then(value_to_i32);
1422    let light = dto.get("lightSleepSeconds").and_then(value_to_i32);
1423    let rem = dto.get("remSleepSeconds").and_then(value_to_i32);
1424
1425    let total = dto
1426        .get("sleepTimeSeconds")
1427        .and_then(value_to_i32)
1428        .or_else(|| match (deep, light, rem) {
1429            (Some(d), Some(l), Some(r)) => Some(d + l + r),
1430            _ => None,
1431        });
1432
1433    let score = dto
1434        .get("sleepScores")
1435        .and_then(|v| v.get("overall"))
1436        .and_then(|v| v.get("value"))
1437        .and_then(value_to_i32);
1438
1439    (total, deep, light, rem, score)
1440}
1441
1442fn parse_hrv_metrics(
1443    value: Option<&serde_json::Value>,
1444) -> (Option<i32>, Option<i32>, Option<String>) {
1445    let summary = value.and_then(|v| v.get("hrvSummary")).or(value);
1446
1447    let summary = match summary {
1448        Some(summary) => summary,
1449        None => return (None, None, None),
1450    };
1451
1452    let weekly_avg = summary.get("weeklyAvg").and_then(value_to_i32);
1453    let last_night = summary
1454        .get("lastNight")
1455        .or_else(|| summary.get("lastNightAvg"))
1456        .and_then(value_to_i32);
1457    let status = summary
1458        .get("status")
1459        .and_then(|v| v.as_str())
1460        .map(|s| s.to_string());
1461
1462    (weekly_avg, last_night, status)
1463}
1464
1465fn parse_vo2max_value(value: Option<&serde_json::Value>) -> Option<f64> {
1466    let entry = value.and_then(first_entry)?;
1467    entry
1468        .get("generic")
1469        .and_then(|v| v.get("vo2MaxValue"))
1470        .and_then(|v| v.as_f64())
1471}
1472
1473fn parse_vo2max_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1474    let entry = value.and_then(first_entry)?;
1475    entry
1476        .get("generic")
1477        .and_then(|v| v.get("fitnessAge"))
1478        .and_then(value_to_i32)
1479}
1480
1481fn parse_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1482    value
1483        .and_then(|v| v.get("fitnessAge"))
1484        .and_then(value_to_i32)
1485}
1486
1487fn parse_race_predictions(
1488    value: Option<&serde_json::Value>,
1489) -> (Option<i32>, Option<i32>, Option<i32>, Option<i32>) {
1490    let entry = match value.and_then(first_entry) {
1491        Some(entry) => entry,
1492        None => return (None, None, None, None),
1493    };
1494
1495    let race_5k = entry.get("time5K").and_then(value_to_i32);
1496    let race_10k = entry.get("time10K").and_then(value_to_i32);
1497    let race_half = entry.get("timeHalfMarathon").and_then(value_to_i32);
1498    let race_marathon = entry.get("timeMarathon").and_then(value_to_i32);
1499
1500    (race_5k, race_10k, race_half, race_marathon)
1501}
1502
1503fn parse_overall_score(value: Option<&serde_json::Value>) -> Option<i32> {
1504    let entry = value.and_then(first_entry)?;
1505    entry.get("overallScore").and_then(value_to_i32)
1506}
1507
1508fn parse_training_status(value: Option<&serde_json::Value>, date: NaiveDate) -> Option<String> {
1509    let root = value?;
1510    let date_str = date.to_string();
1511
1512    if let Some(latest) = root
1513        .get("mostRecentTrainingStatus")
1514        .and_then(|v| v.get("latestTrainingStatusData"))
1515        .and_then(|v| v.as_object())
1516    {
1517        for entry in latest.values() {
1518            if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1519                return entry
1520                    .get("trainingStatusFeedbackPhrase")
1521                    .and_then(|v| v.as_str())
1522                    .map(|s| s.to_string());
1523            }
1524        }
1525    }
1526
1527    if let Some(history) = root.get("trainingStatusHistory").and_then(|v| v.as_array()) {
1528        for entry in history {
1529            if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1530                return entry
1531                    .get("trainingStatusFeedbackPhrase")
1532                    .and_then(|v| v.as_str())
1533                    .map(|s| s.to_string());
1534            }
1535        }
1536    }
1537
1538    None
1539}
1540
1541/// Fetch data for a task from the Garmin API
1542async fn fetch_task_data(
1543    task: &SyncTask,
1544    client: &GarminClient,
1545    token: &OAuth2Token,
1546    display_name: &str,
1547    profile_id: i32,
1548) -> Result<SyncData> {
1549    let task_id = task.id.unwrap();
1550
1551    match &task.task_type {
1552        SyncTaskType::Activities {
1553            start,
1554            limit,
1555            min_date,
1556            max_date,
1557        } => {
1558            let path = format!(
1559                "/activitylist-service/activities/search/activities?limit={}&start={}",
1560                limit, start
1561            );
1562            let activities: Vec<serde_json::Value> = client.get_json(token, &path).await?;
1563
1564            let mut records = Vec::new();
1565            let mut gpx_tasks = Vec::new();
1566            let mut reached_min = false;
1567
1568            for activity in &activities {
1569                let activity_date = activity
1570                    .get("startTimeLocal")
1571                    .and_then(|v| v.as_str())
1572                    .and_then(|s| s.split(' ').next())
1573                    .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
1574
1575                if let Some(date) = activity_date {
1576                    if let Some(min) = *min_date {
1577                        if date < min {
1578                            reached_min = true;
1579                            break;
1580                        }
1581                    }
1582
1583                    if let Some(max) = *max_date {
1584                        if date > max {
1585                            continue;
1586                        }
1587                    }
1588                }
1589
1590                // Parse activity
1591                let parsed = parse_activity(activity, profile_id)?;
1592                records.push(parsed);
1593
1594                // Queue GPX download for activities with GPS
1595                if activity
1596                    .get("hasPolyline")
1597                    .and_then(|v| v.as_bool())
1598                    .unwrap_or(false)
1599                {
1600                    if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
1601                        let activity_name = activity
1602                            .get("activityName")
1603                            .and_then(|v| v.as_str())
1604                            .map(|s| s.to_string());
1605                        let activity_date = activity
1606                            .get("startTimeLocal")
1607                            .and_then(|v| v.as_str())
1608                            .and_then(|s| s.split(' ').next())
1609                            .map(|s| s.to_string());
1610
1611                        gpx_tasks.push(SyncTask::new(
1612                            profile_id,
1613                            task.pipeline,
1614                            SyncTaskType::DownloadGpx {
1615                                activity_id: id,
1616                                activity_name,
1617                                activity_date,
1618                            },
1619                        ));
1620                    }
1621                }
1622            }
1623
1624            // Check if there's a next page
1625            let next_page = if activities.len() == *limit as usize && !reached_min {
1626                Some(SyncTask::new(
1627                    profile_id,
1628                    task.pipeline,
1629                    SyncTaskType::Activities {
1630                        start: start + limit,
1631                        limit: *limit,
1632                        min_date: *min_date,
1633                        max_date: *max_date,
1634                    },
1635                ))
1636            } else {
1637                None
1638            };
1639
1640            Ok(SyncData::Activities {
1641                records,
1642                gpx_tasks,
1643                next_page,
1644                task_id,
1645            })
1646        }
1647
1648        SyncTaskType::DownloadGpx {
1649            activity_id,
1650            activity_date,
1651            ..
1652        } => {
1653            let path = format!("/download-service/export/gpx/activity/{}", activity_id);
1654            let gpx_bytes = client.download(token, &path).await?;
1655            let gpx_data = String::from_utf8_lossy(&gpx_bytes);
1656
1657            // Parse activity date for partitioning
1658            let date = activity_date
1659                .as_ref()
1660                .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1661                .unwrap_or_else(|| Utc::now().date_naive());
1662
1663            let points = parse_gpx(*activity_id, &gpx_data)?;
1664
1665            Ok(SyncData::TrackPoints {
1666                activity_id: *activity_id,
1667                date,
1668                points,
1669                task_id,
1670            })
1671        }
1672
1673        SyncTaskType::DailyHealth { date } => {
1674            let path = format!(
1675                "/usersummary-service/usersummary/daily/{}?calendarDate={}",
1676                display_name, date
1677            );
1678
1679            // Try to fetch health data - may return 404/error for dates without data
1680            let health_result: std::result::Result<serde_json::Value, _> =
1681                client.get_json(token, &path).await;
1682
1683            let health = match health_result {
1684                Ok(data) => data,
1685                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
1686                    // No data for this date - store empty record to mark as synced
1687                    serde_json::json!({})
1688                }
1689                Err(e) => return Err(e),
1690            };
1691
1692            let sleep_path = format!(
1693                "/wellness-service/wellness/dailySleepData/{}?date={}",
1694                display_name, date
1695            );
1696            let sleep_data: Option<serde_json::Value> =
1697                match client.get_json(token, &sleep_path).await {
1698                    Ok(data) => Some(data),
1699                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1700                    Err(e) => return Err(e),
1701                };
1702
1703            let hrv_path = format!("/hrv-service/hrv/{}", date);
1704            let hrv_data: Option<serde_json::Value> = match client.get_json(token, &hrv_path).await
1705            {
1706                Ok(data) => Some(data),
1707                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1708                Err(e) => return Err(e),
1709            };
1710
1711            let (sleep_total, deep_sleep, light_sleep, rem_sleep, sleep_score) =
1712                parse_sleep_metrics(sleep_data.as_ref());
1713            let (hrv_weekly_avg, hrv_last_night, hrv_status) = parse_hrv_metrics(hrv_data.as_ref());
1714
1715            let record = DailyHealth {
1716                id: None,
1717                profile_id,
1718                date: *date,
1719                steps: health
1720                    .get("totalSteps")
1721                    .and_then(|v| v.as_i64())
1722                    .map(|v| v as i32),
1723                step_goal: health
1724                    .get("dailyStepGoal")
1725                    .and_then(|v| v.as_i64())
1726                    .map(|v| v as i32),
1727                total_calories: health
1728                    .get("totalKilocalories")
1729                    .and_then(|v| v.as_i64())
1730                    .map(|v| v as i32),
1731                active_calories: health
1732                    .get("activeKilocalories")
1733                    .and_then(|v| v.as_i64())
1734                    .map(|v| v as i32),
1735                bmr_calories: health
1736                    .get("bmrKilocalories")
1737                    .and_then(|v| v.as_i64())
1738                    .map(|v| v as i32),
1739                resting_hr: health
1740                    .get("restingHeartRate")
1741                    .and_then(|v| v.as_i64())
1742                    .map(|v| v as i32),
1743                sleep_seconds: sleep_total.or_else(|| {
1744                    health
1745                        .get("sleepingSeconds")
1746                        .and_then(|v| v.as_i64())
1747                        .map(|v| v as i32)
1748                }),
1749                deep_sleep_seconds: deep_sleep,
1750                light_sleep_seconds: light_sleep,
1751                rem_sleep_seconds: rem_sleep,
1752                sleep_score,
1753                avg_stress: health
1754                    .get("averageStressLevel")
1755                    .and_then(|v| v.as_i64())
1756                    .map(|v| v as i32),
1757                max_stress: health
1758                    .get("maxStressLevel")
1759                    .and_then(|v| v.as_i64())
1760                    .map(|v| v as i32),
1761                body_battery_start: health
1762                    .get("bodyBatteryChargedValue")
1763                    .and_then(|v| v.as_i64())
1764                    .map(|v| v as i32),
1765                body_battery_end: health
1766                    .get("bodyBatteryDrainedValue")
1767                    .and_then(|v| v.as_i64())
1768                    .map(|v| v as i32),
1769                hrv_weekly_avg,
1770                hrv_last_night,
1771                hrv_status,
1772                avg_respiration: health
1773                    .get("averageRespirationValue")
1774                    .and_then(|v| v.as_f64()),
1775                avg_spo2: health
1776                    .get("averageSpo2Value")
1777                    .and_then(|v| v.as_i64())
1778                    .map(|v| v as i32),
1779                lowest_spo2: health
1780                    .get("lowestSpo2Value")
1781                    .and_then(|v| v.as_i64())
1782                    .map(|v| v as i32),
1783                hydration_ml: health
1784                    .get("hydrationIntakeGoal")
1785                    .and_then(|v| v.as_i64())
1786                    .map(|v| v as i32),
1787                moderate_intensity_min: health
1788                    .get("moderateIntensityMinutes")
1789                    .and_then(|v| v.as_i64())
1790                    .map(|v| v as i32),
1791                vigorous_intensity_min: health
1792                    .get("vigorousIntensityMinutes")
1793                    .and_then(|v| v.as_i64())
1794                    .map(|v| v as i32),
1795                raw_json: Some(health),
1796            };
1797
1798            Ok(SyncData::Health { record, task_id })
1799        }
1800
1801        SyncTaskType::Performance { date } => {
1802            let vo2_path = format!("/metrics-service/metrics/maxmet/daily/{}/{}", date, date);
1803            let vo2max: Option<serde_json::Value> = match client.get_json(token, &vo2_path).await {
1804                Ok(data) => Some(data),
1805                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1806                Err(e) => return Err(e),
1807            };
1808
1809            let race_path = format!(
1810                "/metrics-service/metrics/racepredictions/daily/{}?fromCalendarDate={}&toCalendarDate={}",
1811                display_name, date, date
1812            );
1813            let race_predictions: Option<serde_json::Value> =
1814                match client.get_json(token, &race_path).await {
1815                    Ok(data) => Some(data),
1816                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1817                    Err(e) => return Err(e),
1818                };
1819
1820            // Fetch training readiness
1821            let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
1822            let training_readiness: Option<serde_json::Value> =
1823                client.get_json(token, &readiness_path).await.ok();
1824
1825            // Fetch training status
1826            let status_path = format!(
1827                "/metrics-service/metrics/trainingstatus/aggregated/{}",
1828                date
1829            );
1830            let training_status: Option<serde_json::Value> =
1831                client.get_json(token, &status_path).await.ok();
1832
1833            let endurance_path = format!(
1834                "/metrics-service/metrics/endurancescore?calendarDate={}",
1835                date
1836            );
1837            let endurance_score_data: Option<serde_json::Value> =
1838                match client.get_json(token, &endurance_path).await {
1839                    Ok(data) => Some(data),
1840                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1841                    Err(e) => return Err(e),
1842                };
1843
1844            let hill_path = format!("/metrics-service/metrics/hillscore?calendarDate={}", date);
1845            let hill_score_data: Option<serde_json::Value> =
1846                match client.get_json(token, &hill_path).await {
1847                    Ok(data) => Some(data),
1848                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1849                    Err(e) => return Err(e),
1850                };
1851
1852            let fitness_age_path = format!("/fitnessage-service/fitnessage/{}", date);
1853            let fitness_age_data: Option<serde_json::Value> =
1854                match client.get_json(token, &fitness_age_path).await {
1855                    Ok(data) => Some(data),
1856                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1857                    Err(e) => return Err(e),
1858                };
1859
1860            let vo2max_value = parse_vo2max_value(vo2max.as_ref());
1861            let fitness_age = parse_fitness_age(fitness_age_data.as_ref())
1862                .or_else(|| parse_vo2max_fitness_age(vo2max.as_ref()));
1863
1864            let readiness_entry = training_readiness
1865                .as_ref()
1866                .and_then(|v| v.as_array())
1867                .and_then(|arr| arr.first());
1868
1869            let readiness_score = readiness_entry
1870                .and_then(|e| e.get("score"))
1871                .and_then(|v| v.as_i64())
1872                .map(|v| v as i32);
1873
1874            let training_status_str = parse_training_status(training_status.as_ref(), *date);
1875            let (race_5k, race_10k, race_half, race_marathon) =
1876                parse_race_predictions(race_predictions.as_ref());
1877            let endurance_score = parse_overall_score(endurance_score_data.as_ref());
1878            let hill_score = parse_overall_score(hill_score_data.as_ref());
1879
1880            let record = PerformanceMetrics {
1881                id: None,
1882                profile_id,
1883                date: *date,
1884                vo2max: vo2max_value,
1885                fitness_age,
1886                training_readiness: readiness_score,
1887                training_status: training_status_str,
1888                lactate_threshold_hr: None,
1889                lactate_threshold_pace: None,
1890                race_5k_sec: race_5k,
1891                race_10k_sec: race_10k,
1892                race_half_sec: race_half,
1893                race_marathon_sec: race_marathon,
1894                endurance_score,
1895                hill_score,
1896                raw_json: None,
1897            };
1898
1899            Ok(SyncData::Performance { record, task_id })
1900        }
1901
1902        _ => {
1903            // Other task types not implemented for parallel yet
1904            Err(GarminError::invalid_response(
1905                "Unsupported task type for parallel sync",
1906            ))
1907        }
1908    }
1909}
1910
1911/// Parse activity JSON into Activity struct (standalone version for producer)
1912fn parse_activity(activity: &serde_json::Value, profile_id: i32) -> Result<Activity> {
1913    let activity_id = activity
1914        .get("activityId")
1915        .and_then(|v| v.as_i64())
1916        .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
1917
1918    let start_time_local = activity
1919        .get("startTimeLocal")
1920        .and_then(|v| v.as_str())
1921        .and_then(parse_garmin_datetime);
1922
1923    let start_time_gmt = activity
1924        .get("startTimeGMT")
1925        .and_then(|v| v.as_str())
1926        .and_then(parse_garmin_datetime);
1927
1928    Ok(Activity {
1929        activity_id,
1930        profile_id,
1931        activity_name: activity
1932            .get("activityName")
1933            .and_then(|v| v.as_str())
1934            .map(|s| s.to_string()),
1935        activity_type: activity
1936            .get("activityType")
1937            .and_then(|v| v.get("typeKey"))
1938            .and_then(|v| v.as_str())
1939            .map(|s| s.to_string()),
1940        start_time_local,
1941        start_time_gmt,
1942        duration_sec: activity.get("duration").and_then(|v| v.as_f64()),
1943        distance_m: activity.get("distance").and_then(|v| v.as_f64()),
1944        calories: activity
1945            .get("calories")
1946            .and_then(|v| v.as_i64())
1947            .map(|v| v as i32),
1948        avg_hr: activity
1949            .get("averageHR")
1950            .and_then(|v| v.as_i64())
1951            .map(|v| v as i32),
1952        max_hr: activity
1953            .get("maxHR")
1954            .and_then(|v| v.as_i64())
1955            .map(|v| v as i32),
1956        avg_speed: activity.get("averageSpeed").and_then(|v| v.as_f64()),
1957        max_speed: activity.get("maxSpeed").and_then(|v| v.as_f64()),
1958        elevation_gain: activity.get("elevationGain").and_then(|v| v.as_f64()),
1959        elevation_loss: activity.get("elevationLoss").and_then(|v| v.as_f64()),
1960        avg_cadence: activity
1961            .get("averageRunningCadenceInStepsPerMinute")
1962            .and_then(|v| v.as_f64()),
1963        avg_power: activity
1964            .get("avgPower")
1965            .and_then(|v| v.as_i64())
1966            .map(|v| v as i32),
1967        normalized_power: activity
1968            .get("normPower")
1969            .and_then(|v| v.as_i64())
1970            .map(|v| v as i32),
1971        training_effect: activity
1972            .get("aerobicTrainingEffect")
1973            .and_then(|v| v.as_f64()),
1974        training_load: activity
1975            .get("activityTrainingLoad")
1976            .and_then(|v| v.as_f64()),
1977        start_lat: activity.get("startLatitude").and_then(|v| v.as_f64()),
1978        start_lon: activity.get("startLongitude").and_then(|v| v.as_f64()),
1979        end_lat: activity.get("endLatitude").and_then(|v| v.as_f64()),
1980        end_lon: activity.get("endLongitude").and_then(|v| v.as_f64()),
1981        ground_contact_time: activity
1982            .get("avgGroundContactTime")
1983            .and_then(|v| v.as_f64()),
1984        vertical_oscillation: activity
1985            .get("avgVerticalOscillation")
1986            .and_then(|v| v.as_f64()),
1987        stride_length: activity.get("avgStrideLength").and_then(|v| v.as_f64()),
1988        location_name: activity
1989            .get("locationName")
1990            .and_then(|v| v.as_str())
1991            .map(|s| s.to_string()),
1992        raw_json: Some(activity.clone()),
1993    })
1994}
1995
1996fn parse_garmin_datetime(value: &str) -> Option<DateTime<Utc>> {
1997    if let Ok(dt) = DateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1998        return Some(dt.with_timezone(&Utc));
1999    }
2000
2001    let naive = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
2002        .or_else(|_| NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f"))
2003        .ok()?;
2004
2005    Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2006}
2007
2008/// Parse GPX data and return track points
2009fn parse_gpx(activity_id: i64, gpx_data: &str) -> Result<Vec<TrackPoint>> {
2010    use gpx::read;
2011    use std::io::BufReader;
2012
2013    let reader = BufReader::new(gpx_data.as_bytes());
2014    let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
2015
2016    let mut points = Vec::new();
2017
2018    for track in gpx.tracks {
2019        for segment in track.segments {
2020            for point in segment.points {
2021                let timestamp = point
2022                    .time
2023                    .map(|t| {
2024                        DateTime::parse_from_rfc3339(&t.format().unwrap_or_default())
2025                            .map(|dt| dt.with_timezone(&Utc))
2026                            .unwrap_or_default()
2027                    })
2028                    .unwrap_or_default();
2029
2030                points.push(TrackPoint {
2031                    id: None,
2032                    activity_id,
2033                    timestamp,
2034                    lat: Some(point.point().y()),
2035                    lon: Some(point.point().x()),
2036                    elevation: point.elevation,
2037                    heart_rate: None,
2038                    cadence: None,
2039                    power: None,
2040                    speed: None,
2041                });
2042            }
2043        }
2044    }
2045
2046    Ok(points)
2047}
2048
2049/// Consumer loop: receives data from channel and writes to Parquet
2050async fn consumer_loop(
2051    _id: usize,
2052    rx: Arc<TokioMutex<mpsc::Receiver<SyncData>>>,
2053    parquet: Arc<ParquetStore>,
2054    queue: SharedTaskQueue,
2055    stats: Arc<TokioMutex<SyncStats>>,
2056    progress: SharedProgress,
2057    in_flight: Arc<AtomicUsize>,
2058) {
2059    loop {
2060        // Receive next data item
2061        let data = {
2062            let mut rx = rx.lock().await;
2063            rx.recv().await
2064        };
2065
2066        let data = match data {
2067            Some(d) => d,
2068            None => break, // Channel closed, all producers done
2069        };
2070
2071        // Process and write data
2072        let result = match &data {
2073            SyncData::Activities {
2074                records,
2075                gpx_tasks,
2076                next_page,
2077                task_id,
2078            } => {
2079                // Write activities to Parquet
2080                let write_result = parquet.upsert_activities_async(records).await;
2081
2082                if write_result.is_ok() {
2083                    // Queue GPX tasks and update progress totals
2084                    let mut gpx_added = 0u32;
2085                    for gpx_task in gpx_tasks {
2086                        let should_skip = match &gpx_task.task_type {
2087                            SyncTaskType::DownloadGpx {
2088                                activity_id,
2089                                activity_date,
2090                                ..
2091                            } => activity_date
2092                                .as_ref()
2093                                .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
2094                                .and_then(|date| parquet.has_track_points(*activity_id, date).ok())
2095                                .unwrap_or(false),
2096                            _ => false,
2097                        };
2098
2099                        if should_skip {
2100                            continue;
2101                        }
2102
2103                        if let Err(e) = queue.push(gpx_task.clone()).await {
2104                            eprintln!("Failed to queue GPX task: {}", e);
2105                        } else {
2106                            gpx_added += 1;
2107                        }
2108                    }
2109                    if gpx_added > 0 {
2110                        progress.gpx.add_total(gpx_added);
2111                    }
2112
2113                    // Queue next page if there is one
2114                    if let Some(next) = next_page {
2115                        if let Err(e) = queue.push(next.clone()).await {
2116                            eprintln!("Failed to queue next page: {}", e);
2117                        }
2118                        progress.activities.add_total(1);
2119                    }
2120                }
2121
2122                (write_result, *task_id, "Activities")
2123            }
2124
2125            SyncData::Health { record, task_id } => {
2126                let result = parquet
2127                    .upsert_daily_health_async(std::slice::from_ref(record))
2128                    .await;
2129                (result, *task_id, "Health")
2130            }
2131
2132            SyncData::Performance { record, task_id } => {
2133                let result = parquet
2134                    .upsert_performance_metrics_async(std::slice::from_ref(record))
2135                    .await;
2136                (result, *task_id, "Performance")
2137            }
2138
2139            SyncData::TrackPoints {
2140                date,
2141                points,
2142                task_id,
2143                ..
2144            } => {
2145                let result = parquet.write_track_points_async(*date, points).await;
2146                (result, *task_id, "GPX")
2147            }
2148        };
2149
2150        let (write_result, task_id, task_type) = result;
2151
2152        match write_result {
2153            Ok(()) => {
2154                // Mark task completed
2155                if let Err(e) = queue.mark_completed(task_id).await {
2156                    eprintln!("Failed to mark task completed: {}", e);
2157                }
2158
2159                // Update stats
2160                {
2161                    let mut s = stats.lock().await;
2162                    s.completed += 1;
2163                }
2164
2165                // Update progress based on task type
2166                match task_type {
2167                    "Activities" => {
2168                        progress.activities.complete_one();
2169                        progress.activities.clear_current_item();
2170                    }
2171                    "Health" => {
2172                        progress.health.complete_one();
2173                        progress.health.clear_current_item();
2174                    }
2175                    "Performance" => {
2176                        progress.performance.complete_one();
2177                        progress.performance.clear_current_item();
2178                    }
2179                    "GPX" => {
2180                        progress.gpx.complete_one();
2181                        progress.gpx.clear_current_item();
2182                    }
2183                    _ => {}
2184                }
2185                in_flight.fetch_sub(1, Ordering::Relaxed);
2186            }
2187            Err(e) => {
2188                // Mark task failed
2189                let backoff = Duration::seconds(60);
2190                let error_msg = e.to_string();
2191                if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
2192                    eprintln!("Failed to mark task as failed: {}", e);
2193                }
2194
2195                // Update stats
2196                {
2197                    let mut s = stats.lock().await;
2198                    s.failed += 1;
2199                }
2200
2201                record_write_failure(&data, &progress, &error_msg);
2202                eprintln!("Write error for {}: {}", task_type, error_msg);
2203                in_flight.fetch_sub(1, Ordering::Relaxed);
2204            }
2205        }
2206    }
2207}
2208
2209#[cfg(test)]
2210mod tests {
2211    use super::*;
2212    use crate::db::models::TaskStatus;
2213    use chrono::NaiveDate;
2214    use serde_json::json;
2215    use wiremock::matchers::{method, path, query_param};
2216    use wiremock::{Mock, MockServer, ResponseTemplate};
2217
2218    #[test]
2219    fn test_should_exit_when_idle_requires_no_inflight() {
2220        assert!(!should_exit_when_idle(MAX_IDLE_RETRIES, 1));
2221        assert!(should_exit_when_idle(MAX_IDLE_RETRIES, 0));
2222        assert!(!should_exit_when_idle(MAX_IDLE_RETRIES - 1, 0));
2223    }
2224
2225    #[test]
2226    fn test_record_write_failure_updates_progress() {
2227        let progress = SyncProgress::new();
2228        let record = DailyHealth {
2229            id: None,
2230            profile_id: 1,
2231            date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
2232            steps: None,
2233            step_goal: None,
2234            total_calories: None,
2235            active_calories: None,
2236            bmr_calories: None,
2237            resting_hr: None,
2238            sleep_seconds: None,
2239            deep_sleep_seconds: None,
2240            light_sleep_seconds: None,
2241            rem_sleep_seconds: None,
2242            sleep_score: None,
2243            avg_stress: None,
2244            max_stress: None,
2245            body_battery_start: None,
2246            body_battery_end: None,
2247            hrv_weekly_avg: None,
2248            hrv_last_night: None,
2249            hrv_status: None,
2250            avg_respiration: None,
2251            avg_spo2: None,
2252            lowest_spo2: None,
2253            hydration_ml: None,
2254            moderate_intensity_min: None,
2255            vigorous_intensity_min: None,
2256            raw_json: None,
2257        };
2258
2259        let data = SyncData::Health { record, task_id: 1 };
2260        record_write_failure(&data, &progress, "write failed");
2261
2262        assert_eq!(progress.health.get_failed(), 1);
2263        let errors = progress.get_errors();
2264        assert_eq!(errors.len(), 1);
2265        assert_eq!(errors[0].stream, "Health");
2266    }
2267
2268    fn test_token() -> OAuth2Token {
2269        OAuth2Token {
2270            scope: "test".to_string(),
2271            jti: "jti".to_string(),
2272            token_type: "Bearer".to_string(),
2273            access_token: "access".to_string(),
2274            refresh_token: "refresh".to_string(),
2275            expires_in: 3600,
2276            expires_at: Utc::now().timestamp() + 3600,
2277            refresh_token_expires_in: 86400,
2278            refresh_token_expires_at: Utc::now().timestamp() + 86400,
2279        }
2280    }
2281
2282    #[tokio::test]
2283    async fn test_activity_pagination_respects_date_bounds() {
2284        let server = MockServer::start().await;
2285
2286        let body = serde_json::json!([
2287            {
2288                "activityId": 1,
2289                "activityName": "Newest",
2290                "startTimeLocal": "2025-01-05 08:00:00",
2291                "startTimeGMT": "2025-01-05 07:00:00",
2292                "activityType": { "typeKey": "running" },
2293                "hasPolyline": false
2294            },
2295            {
2296                "activityId": 2,
2297                "activityName": "Mid",
2298                "startTimeLocal": "2025-01-04 08:00:00",
2299                "startTimeGMT": "2025-01-04 07:00:00",
2300                "activityType": { "typeKey": "running" },
2301                "hasPolyline": false
2302            },
2303            {
2304                "activityId": 3,
2305                "activityName": "Old",
2306                "startTimeLocal": "2025-01-03 08:00:00",
2307                "startTimeGMT": "2025-01-03 07:00:00",
2308                "activityType": { "typeKey": "running" },
2309                "hasPolyline": false
2310            }
2311        ]);
2312
2313        Mock::given(method("GET"))
2314            .and(path("/activitylist-service/activities/search/activities"))
2315            .and(query_param("limit", "50"))
2316            .and(query_param("start", "0"))
2317            .respond_with(ResponseTemplate::new(200).set_body_json(body))
2318            .mount(&server)
2319            .await;
2320
2321        let client = GarminClient::new_with_base_url(&server.uri());
2322        let task = SyncTask {
2323            id: Some(1),
2324            profile_id: 1,
2325            task_type: SyncTaskType::Activities {
2326                start: 0,
2327                limit: 50,
2328                min_date: Some(NaiveDate::from_ymd_opt(2025, 1, 4).unwrap()),
2329                max_date: Some(NaiveDate::from_ymd_opt(2025, 1, 5).unwrap()),
2330            },
2331            pipeline: SyncPipeline::Frontier,
2332            status: TaskStatus::Pending,
2333            attempts: 0,
2334            last_error: None,
2335            created_at: None,
2336            next_retry_at: None,
2337            completed_at: None,
2338        };
2339
2340        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2341            .await
2342            .unwrap();
2343
2344        match data {
2345            SyncData::Activities {
2346                records,
2347                gpx_tasks,
2348                next_page,
2349                ..
2350            } => {
2351                assert_eq!(records.len(), 2);
2352                assert!(gpx_tasks.is_empty());
2353                assert!(next_page.is_none());
2354            }
2355            _ => panic!("unexpected data type"),
2356        }
2357    }
2358
2359    #[tokio::test]
2360    async fn test_daily_health_includes_sleep_and_hrv() {
2361        let server = MockServer::start().await;
2362        let date = NaiveDate::from_ymd_opt(2025, 12, 4).unwrap();
2363
2364        let health_body = json!({
2365            "totalSteps": 1234,
2366            "sleepingSeconds": 1000,
2367            "averageStressLevel": 20
2368        });
2369
2370        Mock::given(method("GET"))
2371            .and(path("/usersummary-service/usersummary/daily/TestUser"))
2372            .and(query_param("calendarDate", "2025-12-04"))
2373            .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2374            .mount(&server)
2375            .await;
2376
2377        let sleep_fixture: serde_json::Value =
2378            serde_json::from_str(include_str!("../../tests/fixtures/sleep_2025-12-04.json"))
2379                .unwrap();
2380
2381        Mock::given(method("GET"))
2382            .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2383            .and(query_param("date", "2025-12-04"))
2384            .respond_with(ResponseTemplate::new(200).set_body_json(sleep_fixture))
2385            .mount(&server)
2386            .await;
2387
2388        let hrv_fixture: serde_json::Value =
2389            serde_json::from_str(include_str!("../../tests/fixtures/hrv.json")).unwrap();
2390
2391        Mock::given(method("GET"))
2392            .and(path("/hrv-service/hrv/2025-12-04"))
2393            .respond_with(ResponseTemplate::new(200).set_body_json(hrv_fixture))
2394            .mount(&server)
2395            .await;
2396
2397        let client = GarminClient::new_with_base_url(&server.uri());
2398        let mut task = SyncTask::new(
2399            1,
2400            SyncPipeline::Frontier,
2401            SyncTaskType::DailyHealth { date },
2402        );
2403        task.id = Some(1);
2404
2405        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2406            .await
2407            .unwrap();
2408
2409        match data {
2410            SyncData::Health { record, .. } => {
2411                assert_eq!(record.sleep_seconds, Some(31920));
2412                assert_eq!(record.deep_sleep_seconds, Some(8100));
2413                assert_eq!(record.light_sleep_seconds, Some(15300));
2414                assert_eq!(record.rem_sleep_seconds, Some(8520));
2415                assert_eq!(record.sleep_score, Some(88));
2416                assert_eq!(record.hrv_weekly_avg, Some(65));
2417                assert_eq!(record.hrv_last_night, Some(68));
2418                assert_eq!(record.hrv_status.as_deref(), Some("BALANCED"));
2419            }
2420            _ => panic!("unexpected data type"),
2421        }
2422    }
2423
2424    #[tokio::test]
2425    async fn test_daily_health_handles_missing_sleep_and_hrv() {
2426        let server = MockServer::start().await;
2427        let date = NaiveDate::from_ymd_opt(2025, 12, 5).unwrap();
2428
2429        let health_body = json!({
2430            "totalSteps": 4321,
2431            "sleepingSeconds": 7200
2432        });
2433
2434        Mock::given(method("GET"))
2435            .and(path("/usersummary-service/usersummary/daily/TestUser"))
2436            .and(query_param("calendarDate", "2025-12-05"))
2437            .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2438            .mount(&server)
2439            .await;
2440
2441        Mock::given(method("GET"))
2442            .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2443            .and(query_param("date", "2025-12-05"))
2444            .respond_with(ResponseTemplate::new(404))
2445            .mount(&server)
2446            .await;
2447
2448        Mock::given(method("GET"))
2449            .and(path("/hrv-service/hrv/2025-12-05"))
2450            .respond_with(ResponseTemplate::new(404))
2451            .mount(&server)
2452            .await;
2453
2454        let client = GarminClient::new_with_base_url(&server.uri());
2455        let mut task = SyncTask::new(
2456            1,
2457            SyncPipeline::Frontier,
2458            SyncTaskType::DailyHealth { date },
2459        );
2460        task.id = Some(1);
2461
2462        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2463            .await
2464            .unwrap();
2465
2466        match data {
2467            SyncData::Health { record, .. } => {
2468                assert_eq!(record.sleep_seconds, Some(7200));
2469                assert_eq!(record.deep_sleep_seconds, None);
2470                assert_eq!(record.hrv_weekly_avg, None);
2471                assert_eq!(record.hrv_status, None);
2472            }
2473            _ => panic!("unexpected data type"),
2474        }
2475    }
2476
2477    #[test]
2478    fn test_parse_hrv_metrics_reads_last_night_avg() {
2479        let payload = json!({
2480            "hrvSummary": {
2481                "weeklyAvg": 60,
2482                "lastNightAvg": 58,
2483                "status": "BALANCED"
2484            }
2485        });
2486
2487        let (weekly_avg, last_night, status) = parse_hrv_metrics(Some(&payload));
2488        assert_eq!(weekly_avg, Some(60));
2489        assert_eq!(last_night, Some(58));
2490        assert_eq!(status.as_deref(), Some("BALANCED"));
2491    }
2492
2493    #[tokio::test]
2494    async fn test_performance_uses_date_scoped_endpoints() {
2495        let server = MockServer::start().await;
2496        let date = NaiveDate::from_ymd_opt(2025, 12, 10).unwrap();
2497
2498        let vo2_fixture: serde_json::Value =
2499            serde_json::from_str(include_str!("../../tests/fixtures/vo2max.json")).unwrap();
2500        Mock::given(method("GET"))
2501            .and(path(
2502                "/metrics-service/metrics/maxmet/daily/2025-12-10/2025-12-10",
2503            ))
2504            .respond_with(ResponseTemplate::new(200).set_body_json(vo2_fixture))
2505            .mount(&server)
2506            .await;
2507
2508        let race_fixture: serde_json::Value =
2509            serde_json::from_str(include_str!("../../tests/fixtures/race_predictions.json"))
2510                .unwrap();
2511        Mock::given(method("GET"))
2512            .and(path(
2513                "/metrics-service/metrics/racepredictions/daily/TestUser",
2514            ))
2515            .and(query_param("fromCalendarDate", "2025-12-10"))
2516            .and(query_param("toCalendarDate", "2025-12-10"))
2517            .respond_with(ResponseTemplate::new(200).set_body_json(race_fixture))
2518            .mount(&server)
2519            .await;
2520
2521        let readiness_fixture: serde_json::Value =
2522            serde_json::from_str(include_str!("../../tests/fixtures/training_readiness.json"))
2523                .unwrap();
2524        Mock::given(method("GET"))
2525            .and(path(
2526                "/metrics-service/metrics/trainingreadiness/2025-12-10",
2527            ))
2528            .respond_with(ResponseTemplate::new(200).set_body_json(readiness_fixture))
2529            .mount(&server)
2530            .await;
2531
2532        let training_status_fixture = json!({
2533            "mostRecentTrainingStatus": {
2534                "latestTrainingStatusData": {
2535                    "123": {
2536                        "calendarDate": "2025-12-10",
2537                        "trainingStatusFeedbackPhrase": "PRODUCTIVE"
2538                    }
2539                }
2540            }
2541        });
2542        Mock::given(method("GET"))
2543            .and(path(
2544                "/metrics-service/metrics/trainingstatus/aggregated/2025-12-10",
2545            ))
2546            .respond_with(ResponseTemplate::new(200).set_body_json(training_status_fixture))
2547            .mount(&server)
2548            .await;
2549
2550        let endurance_fixture: serde_json::Value =
2551            serde_json::from_str(include_str!("../../tests/fixtures/endurance_score.json"))
2552                .unwrap();
2553        Mock::given(method("GET"))
2554            .and(path("/metrics-service/metrics/endurancescore"))
2555            .and(query_param("calendarDate", "2025-12-10"))
2556            .respond_with(ResponseTemplate::new(200).set_body_json(endurance_fixture))
2557            .mount(&server)
2558            .await;
2559
2560        let hill_fixture: serde_json::Value =
2561            serde_json::from_str(include_str!("../../tests/fixtures/hill_score.json")).unwrap();
2562        Mock::given(method("GET"))
2563            .and(path("/metrics-service/metrics/hillscore"))
2564            .and(query_param("calendarDate", "2025-12-10"))
2565            .respond_with(ResponseTemplate::new(200).set_body_json(hill_fixture))
2566            .mount(&server)
2567            .await;
2568
2569        let fitness_age_fixture = json!({
2570            "calendarDate": "2025-12-10",
2571            "fitnessAge": 37.0
2572        });
2573        Mock::given(method("GET"))
2574            .and(path("/fitnessage-service/fitnessage/2025-12-10"))
2575            .respond_with(ResponseTemplate::new(200).set_body_json(fitness_age_fixture))
2576            .mount(&server)
2577            .await;
2578
2579        let client = GarminClient::new_with_base_url(&server.uri());
2580        let mut task = SyncTask::new(
2581            1,
2582            SyncPipeline::Backfill,
2583            SyncTaskType::Performance { date },
2584        );
2585        task.id = Some(1);
2586
2587        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2588            .await
2589            .unwrap();
2590
2591        match data {
2592            SyncData::Performance { record, .. } => {
2593                assert_eq!(record.vo2max, Some(53.0));
2594                assert_eq!(record.fitness_age, Some(37));
2595                assert_eq!(record.training_readiness, Some(69));
2596                assert_eq!(record.training_status.as_deref(), Some("PRODUCTIVE"));
2597                assert_eq!(record.race_5k_sec, Some(1245));
2598                assert_eq!(record.race_10k_sec, Some(2610));
2599                assert_eq!(record.race_half_sec, Some(5850));
2600                assert_eq!(record.race_marathon_sec, Some(12420));
2601                assert_eq!(record.endurance_score, Some(72));
2602                assert_eq!(record.hill_score, Some(58));
2603            }
2604            _ => panic!("unexpected data type"),
2605        }
2606    }
2607
2608    #[test]
2609    fn test_parse_garmin_datetime_accepts_naive_strings() {
2610        let dt = parse_garmin_datetime("2025-01-03 07:00:00");
2611        assert!(dt.is_some());
2612
2613        let dt = parse_garmin_datetime("2025-01-03 07:00:00.123");
2614        assert!(dt.is_some());
2615    }
2616}