Skip to main content

garmin_cli/sync/
mod.rs

1//! Sync module for Garmin data synchronization
2//!
3//! Provides:
4//! - Rate-limited API access with parallel streams
5//! - Persistent task queue for crash recovery (SQLite)
6//! - Incremental sync with gap detection
7//! - GPX parsing for track points
8//! - Parquet storage for concurrent read access
9//! - Producer/consumer pipeline for concurrent fetching and writing
10//! - Plain terminal progress output
11
12pub mod progress;
13pub mod rate_limiter;
14pub mod task_queue;
15
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
20use tokio::sync::{mpsc, Mutex as TokioMutex};
21
22use crate::client::{GarminClient, OAuth2Token};
23use crate::db::models::{
24    Activity, DailyHealth, PerformanceMetrics, SyncPipeline, SyncTask, SyncTaskType, TrackPoint,
25};
26use crate::storage::{ParquetStore, Storage, SyncDb};
27use crate::{GarminError, Result};
28use std::io::{self, Write};
29
30pub use progress::{PlanningStep, SharedProgress, SyncProgress};
31pub use rate_limiter::{RateLimiter, SharedRateLimiter};
32pub use task_queue::{SharedTaskQueue, TaskQueue};
33
34fn pipeline_filter(mode: progress::SyncMode) -> Option<SyncPipeline> {
35    match mode {
36        progress::SyncMode::Latest => Some(SyncPipeline::Frontier),
37        progress::SyncMode::Backfill => Some(SyncPipeline::Backfill),
38    }
39}
40
41/// Data produced by API fetchers, consumed by Parquet writers
42#[derive(Debug)]
43enum SyncData {
44    /// Activity list with parsed activities and potential follow-up tasks
45    Activities {
46        records: Vec<Activity>,
47        gpx_tasks: Vec<SyncTask>,
48        next_page: Option<SyncTask>,
49        task_id: i64,
50    },
51    /// Daily health record
52    Health { record: DailyHealth, task_id: i64 },
53    /// Performance metrics record
54    Performance {
55        record: PerformanceMetrics,
56        task_id: i64,
57    },
58    /// Track points from GPX
59    TrackPoints {
60        #[allow(dead_code)]
61        activity_id: i64,
62        date: NaiveDate,
63        points: Vec<TrackPoint>,
64        task_id: i64,
65    },
66}
67
68/// Sync engine for orchestrating data synchronization
69pub struct SyncEngine {
70    storage: Storage,
71    client: GarminClient,
72    token: OAuth2Token,
73    rate_limiter: RateLimiter,
74    queue: TaskQueue,
75    profile_id: i32,
76    display_name: Option<String>,
77}
78
79#[derive(Clone)]
80struct ProducerContext {
81    rate_limiter: SharedRateLimiter,
82    client: GarminClient,
83    token: OAuth2Token,
84    progress: SharedProgress,
85    display_name: Arc<String>,
86    profile_id: i32,
87    stats: Arc<TokioMutex<SyncStats>>,
88    in_flight: Arc<AtomicUsize>,
89    parquet: Arc<ParquetStore>,
90    force: bool,
91    pipeline_filter: Option<SyncPipeline>,
92}
93
94type SleepMetrics = (
95    Option<i32>,
96    Option<i32>,
97    Option<i32>,
98    Option<i32>,
99    Option<i32>,
100);
101
102#[derive(Debug, Clone, Copy, Default)]
103struct TrackPointStreams {
104    heart_rate: Option<i32>,
105    cadence: Option<i32>,
106    power: Option<i32>,
107    speed: Option<f64>,
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111enum TrackPointStreamField {
112    HeartRate,
113    Cadence,
114    Power,
115    Speed,
116}
117
118impl SyncEngine {
119    /// Create a new sync engine with default storage location
120    pub fn new(client: GarminClient, token: OAuth2Token) -> Result<Self> {
121        let storage = Storage::open_default()?;
122        Self::with_storage(storage, client, token)
123    }
124
125    /// Create a new sync engine with custom storage
126    pub fn with_storage(
127        storage: Storage,
128        client: GarminClient,
129        token: OAuth2Token,
130    ) -> Result<Self> {
131        // Get or create profile (will be updated with display name after API call)
132        let profile_id = storage.sync_db.get_or_create_profile("default")?;
133
134        // Create task queue using the sync database
135        let sync_db = SyncDb::open(storage.base_path().join("sync.db"))?;
136        let queue = TaskQueue::new(sync_db, profile_id, None);
137
138        Ok(Self {
139            storage,
140            client,
141            token,
142            rate_limiter: RateLimiter::new(),
143            queue,
144            profile_id,
145            display_name: None,
146        })
147    }
148
149    /// Fetch and cache the user's display name
150    async fn get_display_name(&mut self) -> Result<String> {
151        if let Some(ref name) = self.display_name {
152            return Ok(name.clone());
153        }
154
155        let profile: serde_json::Value = self
156            .client
157            .get_json(&self.token, "/userprofile-service/socialProfile")
158            .await?;
159
160        let name = profile
161            .get("displayName")
162            .and_then(|v| v.as_str())
163            .map(|s| s.to_string())
164            .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
165
166        // Update profile in database
167        self.profile_id = self.storage.sync_db.get_or_create_profile(&name)?;
168        self.queue.set_profile_id(self.profile_id);
169
170        self.display_name = Some(name.clone());
171        Ok(name)
172    }
173
174    /// Find the oldest activity date and total count by querying the activities API
175    /// Returns (oldest_date, total_activities, activities_with_gps)
176    pub(crate) async fn find_oldest_activity_date(
177        &mut self,
178        progress: Option<&SyncProgress>,
179    ) -> Result<(NaiveDate, u32, u32)> {
180        if let Some(p) = progress {
181            p.set_planning_step(PlanningStep::FindingOldestActivity);
182        } else {
183            print!("Finding oldest activity date...");
184            let _ = io::stdout().flush();
185        }
186
187        // The API returns activities sorted by date descending (newest first)
188        // Use exponential search to find the end quickly, then fetch the last page
189
190        self.rate_limiter.wait().await;
191
192        // Step 1: Find approximate total count using exponential jumps
193        let limit: u32 = 100;
194        let mut jump: u32 = 100;
195        let mut last_non_empty: u32 = 0;
196        let max_jump: u32 = 1_000_000;
197
198        // Exponential search: 100, 200, 400, 800, 1600, 3200...
199        while jump < max_jump {
200            let path = format!(
201                "/activitylist-service/activities/search/activities?limit=1&start={}",
202                jump
203            );
204
205            let activities: Vec<serde_json::Value> =
206                self.client.get_json(&self.token, &path).await?;
207
208            if activities.is_empty() {
209                break;
210            }
211
212            last_non_empty = jump;
213            jump = jump.saturating_mul(2);
214            self.rate_limiter.wait().await;
215        }
216
217        // Step 2: Binary search to find exact end
218        let mut low = last_non_empty;
219        let mut high = jump;
220
221        while high - low > limit {
222            let mid = (low + high) / 2;
223            let path = format!(
224                "/activitylist-service/activities/search/activities?limit=1&start={}",
225                mid
226            );
227
228            self.rate_limiter.wait().await;
229            let activities: Vec<serde_json::Value> =
230                self.client.get_json(&self.token, &path).await?;
231
232            if activities.is_empty() {
233                high = mid;
234            } else {
235                low = mid;
236            }
237        }
238
239        // Step 3: Fetch the last page to get the oldest activity
240        let path = format!(
241            "/activitylist-service/activities/search/activities?limit={}&start={}",
242            limit, low
243        );
244
245        self.rate_limiter.wait().await;
246        let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
247
248        // Calculate total activities
249        let total_activities = low + activities.len() as u32;
250
251        let oldest_date = activities
252            .last()
253            .and_then(|activity| activity.get("startTimeLocal"))
254            .and_then(|v| v.as_str())
255            .and_then(|date_str| date_str.split(' ').next())
256            .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
257
258        let result = oldest_date.unwrap_or_else(|| {
259            // Default to 1 year ago if no activities found
260            Utc::now().date_naive() - Duration::days(365)
261        });
262
263        // Estimate activities with GPS (typically ~80% of activities have GPS)
264        // This is a rough estimate - actual count will be refined during sync
265        let estimated_gps = (total_activities as f32 * 0.8) as u32;
266
267        if let Some(p) = progress {
268            p.set_oldest_activity_date(&result.to_string());
269        } else {
270            println!(" {} ({} activities)", result, total_activities);
271        }
272        Ok((result, total_activities, estimated_gps))
273    }
274
275    async fn has_health_data(&mut self, display_name: &str, date: NaiveDate) -> Result<bool> {
276        let path = format!(
277            "/usersummary-service/usersummary/daily/{}?calendarDate={}",
278            display_name, date
279        );
280
281        self.rate_limiter.wait().await;
282        let result: std::result::Result<serde_json::Value, _> =
283            self.client.get_json(&self.token, &path).await;
284
285        match result {
286            Ok(data) => Ok(!data.as_object().map(|o| o.is_empty()).unwrap_or(true)),
287            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
288            Err(e) => Err(e),
289        }
290    }
291
292    async fn find_first_health_date(
293        &mut self,
294        progress: Option<&SyncProgress>,
295        from: NaiveDate,
296        to: NaiveDate,
297    ) -> Result<Option<NaiveDate>> {
298        if let Some(p) = progress {
299            p.set_planning_step(PlanningStep::FindingFirstHealth);
300        }
301
302        if from > to {
303            return Ok(None);
304        }
305
306        let display_name = self.get_display_name().await?;
307        let mut low = from;
308        let mut high = to;
309        let mut found = None;
310
311        while low <= high {
312            let span = (high - low).num_days();
313            let mid = low + Duration::days(span / 2);
314
315            if self.has_health_data(&display_name, mid).await? {
316                found = Some(mid);
317                if mid == low {
318                    break;
319                }
320                high = mid - Duration::days(1);
321            } else {
322                low = mid + Duration::days(1);
323            }
324        }
325
326        Ok(found)
327    }
328
329    async fn has_performance_data(&mut self, date: NaiveDate) -> Result<bool> {
330        let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
331        self.rate_limiter.wait().await;
332        let readiness: std::result::Result<serde_json::Value, _> =
333            self.client.get_json(&self.token, &readiness_path).await;
334
335        if let Ok(data) = readiness {
336            if data.as_array().and_then(|arr| arr.first()).is_some() {
337                return Ok(true);
338            }
339        }
340
341        let status_path = format!(
342            "/metrics-service/metrics/trainingstatus/aggregated/{}",
343            date
344        );
345        self.rate_limiter.wait().await;
346        let status: std::result::Result<serde_json::Value, _> =
347            self.client.get_json(&self.token, &status_path).await;
348
349        match status {
350            Ok(data) => Ok(data.get("mostRecentTrainingStatus").is_some()),
351            Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
352            Err(e) => Err(e),
353        }
354    }
355
356    async fn find_first_performance_date(
357        &mut self,
358        progress: Option<&SyncProgress>,
359        from: NaiveDate,
360        to: NaiveDate,
361    ) -> Result<Option<NaiveDate>> {
362        if let Some(p) = progress {
363            p.set_planning_step(PlanningStep::FindingFirstPerformance);
364        }
365
366        if from > to {
367            return Ok(None);
368        }
369
370        let mut low = from;
371        let mut high = to;
372        let mut found = None;
373
374        while low <= high {
375            let span = (high - low).num_days();
376            let mid = low + Duration::days(span / 2);
377
378            if self.has_performance_data(mid).await? {
379                found = Some(mid);
380                if mid == low {
381                    break;
382                }
383                high = mid - Duration::days(1);
384            } else {
385                low = mid + Duration::days(1);
386            }
387        }
388
389        Ok(found)
390    }
391
392    /// Run the sync process
393    pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
394        self.run_with_progress(&opts).await
395    }
396
397    /// Run sync with plain terminal progress reporting.
398    async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
399        let progress = Arc::new(SyncProgress::new());
400
401        // Set storage path and sync mode for display
402        progress.set_storage_path(&self.storage.base_path().display().to_string());
403        progress.set_sync_mode(opts.mode);
404        println!("Using storage: {}", progress.get_storage_path());
405        println!("Planning sync...");
406
407        // Planning phase - updates progress instead of printing
408        progress.set_planning_step(PlanningStep::FetchingProfile);
409        let display_name = self.get_display_name().await?;
410        progress.set_profile(&display_name);
411        self.queue.set_pipeline(pipeline_filter(opts.mode));
412
413        // Recover any crashed tasks
414        let _recovered = self.queue.recover_in_progress()?;
415
416        // Plan phase
417        if self.queue.pending_count()? == 0 {
418            self.plan_sync_with_progress(opts, &progress).await?;
419        }
420
421        // Mark planning complete
422        progress.finish_planning();
423
424        // Count tasks by type for progress tracking
425        self.count_tasks_for_progress(&progress)?;
426
427        // Set date ranges based on sync mode
428        let today = Utc::now().date_naive();
429        let to_date = opts.to_date.unwrap_or(today);
430
431        match opts.mode {
432            progress::SyncMode::Latest => {
433                // Latest mode: sync from last sync date (or 7 days ago) to today
434                let from_date = opts.from_date.unwrap_or_else(|| today - Duration::days(7));
435                progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
436                progress.set_date_range(&from_date.to_string(), &to_date.to_string());
437            }
438            progress::SyncMode::Backfill => {
439                // Backfill mode: sync from oldest activity backwards
440                let oldest = progress
441                    .get_oldest_activity_date()
442                    .unwrap_or_else(|| (today - Duration::days(365)).to_string());
443                let from_date = opts
444                    .from_date
445                    .map(|d| d.to_string())
446                    .unwrap_or(oldest.clone());
447                progress.set_backfill_range(&from_date, &oldest);
448                progress.set_date_range(&oldest, &from_date);
449            }
450        }
451
452        print_sync_overview(&progress);
453        println!("Planning complete.");
454
455        let stats_result = if opts.dry_run {
456            println!("Dry run mode - no changes will be made");
457            Ok(SyncStats::default())
458        } else {
459            let reporter_progress = progress.clone();
460            let reporter_handle = tokio::spawn(async move {
461                run_progress_reporter(reporter_progress).await;
462            });
463
464            let result = self
465                .run_with_progress_tracking(opts, progress.clone())
466                .await;
467            progress.request_shutdown();
468            let _ = reporter_handle.await;
469            result
470        };
471
472        let stats = stats_result?;
473        print_sync_errors(&progress);
474
475        // Update sync state after successful completion
476        if !opts.dry_run && stats.completed > 0 {
477            self.update_sync_state_after_completion(opts, today).await?;
478        }
479
480        if !opts.dry_run {
481            println!("\nSync complete: {}", stats);
482        }
483        Ok(stats)
484    }
485
486    /// Update sync state after successful sync completion
487    async fn update_sync_state_after_completion(
488        &self,
489        opts: &SyncOptions,
490        today: NaiveDate,
491    ) -> Result<()> {
492        use crate::db::models::SyncState;
493
494        match opts.mode {
495            progress::SyncMode::Latest => {
496                // Update last_sync_date to today
497                if opts.sync_activities {
498                    let state = SyncState {
499                        profile_id: self.profile_id,
500                        data_type: "activities".to_string(),
501                        last_sync_date: Some(today),
502                        last_activity_id: None,
503                    };
504                    self.storage.sync_db.update_sync_state(&state)?;
505                }
506                if opts.sync_health {
507                    let state = SyncState {
508                        profile_id: self.profile_id,
509                        data_type: "health".to_string(),
510                        last_sync_date: Some(today),
511                        last_activity_id: None,
512                    };
513                    self.storage.sync_db.update_sync_state(&state)?;
514                }
515            }
516            progress::SyncMode::Backfill => {
517                // Check if backfill is complete (all tasks done)
518                let pending = self.queue.pending_count()?;
519                if pending == 0 {
520                    // Mark backfill as complete
521                    self.storage
522                        .sync_db
523                        .mark_backfill_complete(self.profile_id, "activities")?;
524                    self.storage
525                        .sync_db
526                        .mark_backfill_complete(self.profile_id, "health")?;
527                    self.storage
528                        .sync_db
529                        .mark_backfill_complete(self.profile_id, "performance")?;
530                }
531            }
532        }
533
534        Ok(())
535    }
536
537    /// Run sync with progress tracking using parallel producer/consumer pipeline
538    async fn run_with_progress_tracking(
539        &mut self,
540        opts: &SyncOptions,
541        progress: SharedProgress,
542    ) -> Result<SyncStats> {
543        // Use parallel execution with producer/consumer pipeline
544        self.run_parallel(opts, progress).await
545    }
546
547    /// Run parallel sync with producer/consumer pipeline
548    ///
549    /// Producers: Fetch data from Garmin API (rate-limited)
550    /// Consumers: Write data to Parquet (partition-locked)
551    async fn run_parallel(
552        &mut self,
553        opts: &SyncOptions,
554        progress: SharedProgress,
555    ) -> Result<SyncStats> {
556        let rate_limiter = SharedRateLimiter::new(opts.concurrency);
557        let queue = SharedTaskQueue::new(TaskQueue::new(
558            SyncDb::open(self.storage.base_path().join("sync.db"))?,
559            self.profile_id,
560            pipeline_filter(opts.mode),
561        ));
562
563        self.run_parallel_with_resources(
564            opts,
565            progress,
566            queue,
567            rate_limiter,
568            pipeline_filter(opts.mode),
569        )
570        .await
571    }
572
573    pub(crate) async fn run_parallel_with_resources(
574        &mut self,
575        opts: &SyncOptions,
576        progress: SharedProgress,
577        queue: SharedTaskQueue,
578        rate_limiter: SharedRateLimiter,
579        pipeline_filter: Option<SyncPipeline>,
580    ) -> Result<SyncStats> {
581        // Bounded channel for backpressure (100 items)
582        let (tx, rx) = mpsc::channel::<SyncData>(100);
583
584        // Shared resources
585        let parquet = Arc::new(self.storage.parquet.clone());
586        let client = self.client.clone();
587        let token = self.token.clone();
588        let stats = Arc::new(TokioMutex::new(SyncStats::default()));
589        let in_flight = Arc::new(AtomicUsize::new(0));
590        let display_name = Arc::new(self.get_display_name().await?);
591        let profile_id = self.profile_id;
592
593        // Spawn producers (API fetchers)
594        let mut producer_handles = Vec::new();
595        for id in 0..opts.concurrency {
596            let tx = tx.clone();
597            let queue = queue.clone();
598            let context = ProducerContext {
599                rate_limiter: rate_limiter.clone(),
600                client: client.clone(),
601                token: token.clone(),
602                progress: progress.clone(),
603                display_name: Arc::clone(&display_name),
604                profile_id,
605                stats: Arc::clone(&stats),
606                in_flight: Arc::clone(&in_flight),
607                parquet: Arc::clone(&parquet),
608                force: opts.force,
609                pipeline_filter,
610            };
611
612            producer_handles.push(tokio::spawn(async move {
613                producer_loop(id, queue, tx, context).await
614            }));
615        }
616        drop(tx); // Close sender so consumers know when done
617
618        // Spawn consumers (Parquet writers)
619        let rx = Arc::new(TokioMutex::new(rx));
620        let mut consumer_handles = Vec::new();
621        for id in 0..opts.concurrency {
622            let rx = Arc::clone(&rx);
623            let parquet = Arc::clone(&parquet);
624            let queue = queue.clone();
625            let stats = Arc::clone(&stats);
626            let progress = progress.clone();
627            let in_flight = Arc::clone(&in_flight);
628
629            consumer_handles.push(tokio::spawn(async move {
630                consumer_loop(id, rx, parquet, queue, stats, progress, in_flight).await
631            }));
632        }
633
634        // Wait for all producers to finish
635        for h in producer_handles {
636            if let Err(e) = h.await {
637                eprintln!("Producer error: {}", e);
638            }
639        }
640
641        // Wait for all consumers to finish
642        for h in consumer_handles {
643            if let Err(e) = h.await {
644                eprintln!("Consumer error: {}", e);
645            }
646        }
647
648        // Cleanup old completed tasks
649        queue.cleanup(7).await?;
650
651        // Update profile sync time
652        self.storage
653            .sync_db
654            .update_profile_sync_time(self.profile_id)?;
655
656        // Extract final stats
657        let final_stats = stats.lock().await;
658        Ok(SyncStats {
659            recovered: final_stats.recovered,
660            completed: final_stats.completed,
661            rate_limited: final_stats.rate_limited,
662            failed: final_stats.failed,
663        })
664    }
665
666    /// Count pending tasks by type and update progress
667    ///
668    /// Sets initial totals based on actual tasks in queue.
669    /// GPX totals are updated dynamically as activities are discovered.
670    fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
671        // Count actual tasks by type from the queue
672        let (_activities, _gpx, health, performance) = self.queue.count_by_type()?;
673
674        // Activities and GPX totals are set during planning from API discovery
675        // Only set them if planning didn't provide accurate counts
676        if progress.activities.get_total() == 0 {
677            progress.activities.set_total(1); // At least 1 for pagination
678            progress.activities.set_dynamic(true);
679        }
680        if progress.gpx.get_total() == 0 {
681            progress.gpx.set_dynamic(true); // Will be discovered during sync
682        }
683
684        // Health and performance totals come from date range calculation
685        progress.health.set_total(health);
686        progress.performance.set_total(performance);
687
688        Ok(())
689    }
690
691    /// Plan sync tasks with progress tracking.
692    async fn plan_sync_with_progress(
693        &mut self,
694        opts: &SyncOptions,
695        progress: &SyncProgress,
696    ) -> Result<()> {
697        let today = Utc::now().date_naive();
698
699        match opts.mode {
700            progress::SyncMode::Latest => self.plan_latest_sync(opts, progress, today).await,
701            progress::SyncMode::Backfill => self.plan_backfill_sync(opts, progress, today).await,
702        }
703    }
704
705    /// Plan Latest sync: from last_sync_date to today
706    pub(crate) async fn plan_latest_sync(
707        &mut self,
708        opts: &SyncOptions,
709        progress: &SyncProgress,
710        today: NaiveDate,
711    ) -> Result<()> {
712        // Get last sync date from DB, default to 7 days ago
713        let last_sync = self
714            .storage
715            .sync_db
716            .get_sync_state(self.profile_id, "activities")?;
717        let from_date = opts.from_date.unwrap_or_else(|| {
718            last_sync
719                .and_then(|s| s.last_sync_date)
720                .unwrap_or_else(|| today - Duration::days(7))
721        });
722        let to_date = opts.to_date.unwrap_or(today);
723
724        progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
725        progress.set_oldest_activity_date(&from_date.to_string());
726
727        let total_days = (to_date - from_date).num_days().max(0) as u32 + 1;
728
729        // Plan activity sync (paginated from newest)
730        if opts.sync_activities {
731            progress.set_planning_step(PlanningStep::PlanningActivities);
732            self.plan_activities_sync(SyncPipeline::Frontier, Some(from_date), Some(to_date))?;
733        }
734
735        // Plan health sync for date range
736        if opts.sync_health {
737            progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
738            self.plan_health_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
739        }
740
741        // Plan performance sync
742        if opts.sync_performance {
743            let total_weeks = (total_days / 7).max(1);
744            progress.set_planning_step(PlanningStep::PlanningPerformance { weeks: total_weeks });
745            self.plan_performance_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
746        }
747
748        Ok(())
749    }
750
751    /// Plan Backfill sync: from oldest_activity_date to backfill_frontier
752    pub(crate) async fn plan_backfill_sync(
753        &mut self,
754        opts: &SyncOptions,
755        progress: &SyncProgress,
756        today: NaiveDate,
757    ) -> Result<()> {
758        // Find oldest activity date (this also gives us total count)
759        let (oldest_date, total_activities, estimated_gps) =
760            self.find_oldest_activity_date(Some(progress)).await?;
761
762        // Get or initialize backfill frontier
763        let backfill_state = self
764            .storage
765            .sync_db
766            .get_backfill_state(self.profile_id, "activities")?;
767
768        let (frontier_date, activities_complete) = match backfill_state {
769            Some((frontier, _target, complete)) => (frontier, complete),
770            None => {
771                // Initialize frontier from last_sync_date or today
772                let last_sync = self
773                    .storage
774                    .sync_db
775                    .get_sync_state(self.profile_id, "activities")?;
776                let frontier = last_sync.and_then(|s| s.last_sync_date).unwrap_or(today);
777
778                // Initialize backfill state
779                self.storage.sync_db.set_backfill_state(
780                    self.profile_id,
781                    "activities",
782                    frontier,
783                    oldest_date,
784                    false,
785                )?;
786                (frontier, false)
787            }
788        };
789
790        // Backfill from oldest_date to frontier_date
791        let activity_from = opts.from_date.unwrap_or(oldest_date);
792        let activity_to = opts.to_date.unwrap_or(frontier_date);
793
794        progress.set_backfill_range(&frontier_date.to_string(), &oldest_date.to_string());
795        progress.set_oldest_activity_date(&oldest_date.to_string());
796
797        // Plan activity sync with known totals
798        if opts.sync_activities && !activities_complete {
799            progress.set_planning_step(PlanningStep::PlanningActivities);
800            self.plan_activities_sync(
801                SyncPipeline::Backfill,
802                Some(activity_from),
803                Some(activity_to),
804            )?;
805            if total_activities > 0 {
806                progress.activities.set_total(total_activities);
807                progress.gpx.set_total(estimated_gps);
808            }
809        }
810
811        if opts.sync_health {
812            let health_state = self
813                .storage
814                .sync_db
815                .get_backfill_state(self.profile_id, "health")?;
816
817            let health_target = match health_state {
818                Some((_frontier, target, complete)) => (!complete).then_some(target),
819                None => {
820                    let search_from = activity_from;
821                    let search_to = activity_to;
822                    let first_health = self
823                        .find_first_health_date(Some(progress), search_from, search_to)
824                        .await?;
825                    match first_health {
826                        Some(first) => {
827                            self.storage.sync_db.set_backfill_state(
828                                self.profile_id,
829                                "health",
830                                frontier_date,
831                                first,
832                                false,
833                            )?;
834                            Some(first)
835                        }
836                        None => {
837                            self.storage.sync_db.set_backfill_state(
838                                self.profile_id,
839                                "health",
840                                frontier_date,
841                                frontier_date,
842                                true,
843                            )?;
844                            None
845                        }
846                    }
847                }
848            };
849
850            if let Some(target) = health_target {
851                let health_from = std::cmp::max(activity_from, target);
852                let health_to = activity_to;
853                if health_from <= health_to {
854                    let total_days = (health_to - health_from).num_days().max(0) as u32 + 1;
855                    progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
856                    self.plan_health_sync(
857                        health_from,
858                        health_to,
859                        opts.force,
860                        SyncPipeline::Backfill,
861                    )?;
862                }
863            }
864        }
865
866        if opts.sync_performance {
867            let perf_state = self
868                .storage
869                .sync_db
870                .get_backfill_state(self.profile_id, "performance")?;
871
872            let perf_target = match perf_state {
873                Some((_frontier, target, complete)) => (!complete).then_some(target),
874                None => {
875                    let search_from = activity_from;
876                    let search_to = activity_to;
877                    let first_perf = self
878                        .find_first_performance_date(Some(progress), search_from, search_to)
879                        .await?;
880                    match first_perf {
881                        Some(first) => {
882                            self.storage.sync_db.set_backfill_state(
883                                self.profile_id,
884                                "performance",
885                                frontier_date,
886                                first,
887                                false,
888                            )?;
889                            Some(first)
890                        }
891                        None => {
892                            self.storage.sync_db.set_backfill_state(
893                                self.profile_id,
894                                "performance",
895                                frontier_date,
896                                frontier_date,
897                                true,
898                            )?;
899                            None
900                        }
901                    }
902                }
903            };
904
905            if let Some(target) = perf_target {
906                let perf_from = std::cmp::max(activity_from, target);
907                let perf_to = activity_to;
908                if perf_from <= perf_to {
909                    let total_weeks = ((perf_to - perf_from).num_days().max(0) as u32 / 7).max(1);
910                    progress.set_planning_step(PlanningStep::PlanningPerformance {
911                        weeks: total_weeks,
912                    });
913                    self.plan_performance_sync(
914                        perf_from,
915                        perf_to,
916                        opts.force,
917                        SyncPipeline::Backfill,
918                    )?;
919                }
920            }
921        }
922
923        Ok(())
924    }
925
926    /// Plan activity sync tasks
927    fn plan_activities_sync(
928        &self,
929        pipeline: SyncPipeline,
930        min_date: Option<NaiveDate>,
931        max_date: Option<NaiveDate>,
932    ) -> Result<()> {
933        // Start with first page, we'll add more as we discover them
934        let task = SyncTask::new(
935            self.profile_id,
936            pipeline,
937            SyncTaskType::Activities {
938                start: 0,
939                limit: 50,
940                min_date,
941                max_date,
942            },
943        );
944        self.queue.push(task)?;
945        Ok(())
946    }
947
948    /// Plan health sync tasks for date range, returns count of tasks added
949    fn plan_health_sync(
950        &self,
951        from: NaiveDate,
952        to: NaiveDate,
953        force: bool,
954        pipeline: SyncPipeline,
955    ) -> Result<u32> {
956        let mut count = 0;
957        let mut date = from;
958        while date <= to {
959            if force
960                || !self
961                    .storage
962                    .parquet
963                    .has_daily_health(self.profile_id, date)?
964            {
965                let task = SyncTask::new(
966                    self.profile_id,
967                    pipeline,
968                    SyncTaskType::DailyHealth { date },
969                );
970                self.queue.push(task)?;
971                count += 1;
972            }
973            date += Duration::days(1);
974        }
975        Ok(count)
976    }
977
978    /// Plan performance sync tasks, returns count of tasks added
979    fn plan_performance_sync(
980        &self,
981        from: NaiveDate,
982        to: NaiveDate,
983        force: bool,
984        pipeline: SyncPipeline,
985    ) -> Result<u32> {
986        // Performance metrics don't change daily, sync weekly
987        let mut count = 0;
988        let mut date = from;
989        while date <= to {
990            if force
991                || !self
992                    .storage
993                    .parquet
994                    .has_performance_metrics(self.profile_id, date)?
995            {
996                let task = SyncTask::new(
997                    self.profile_id,
998                    pipeline,
999                    SyncTaskType::Performance { date },
1000                );
1001                self.queue.push(task)?;
1002                count += 1;
1003            }
1004            date += Duration::days(7);
1005        }
1006        Ok(count)
1007    }
1008}
1009
1010fn print_sync_overview(progress: &SyncProgress) {
1011    println!("Profile: {}", progress.get_profile());
1012    println!("Mode: {}", progress.get_sync_mode());
1013    println!("Range: {}", progress.get_date_range());
1014    println!(
1015        "Queued tasks: activities {}, gpx {}, health {}, performance {}",
1016        progress.activities.get_total(),
1017        progress.gpx.get_total(),
1018        progress.health.get_total(),
1019        progress.performance.get_total()
1020    );
1021}
1022
1023fn print_sync_errors(progress: &SyncProgress) {
1024    let errors = progress.get_errors();
1025    if errors.is_empty() {
1026        return;
1027    }
1028
1029    println!("\nRecent errors:");
1030    for error in errors.iter().take(5) {
1031        println!("  [{}] {}: {}", error.stream, error.item, error.error);
1032    }
1033
1034    if errors.len() > 5 {
1035        println!("  ... and {} more", errors.len() - 5);
1036    }
1037}
1038
1039async fn run_progress_reporter(progress: SharedProgress) {
1040    loop {
1041        progress.print_simple_status();
1042
1043        if progress.should_shutdown() || progress.is_complete() {
1044            println!();
1045            break;
1046        }
1047
1048        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1049    }
1050}
1051
1052/// Options for sync operation
1053#[derive(Clone)]
1054pub struct SyncOptions {
1055    /// Sync activities
1056    pub sync_activities: bool,
1057    /// Sync daily health
1058    pub sync_health: bool,
1059    /// Sync performance metrics
1060    pub sync_performance: bool,
1061    /// Start date for sync
1062    pub from_date: Option<NaiveDate>,
1063    /// End date for sync
1064    pub to_date: Option<NaiveDate>,
1065    /// Dry run (plan only, don't execute)
1066    pub dry_run: bool,
1067    /// Force re-sync (ignore existing data)
1068    pub force: bool,
1069    /// Number of concurrent API requests (default: 4)
1070    pub concurrency: usize,
1071    /// Sync mode (Latest or Backfill)
1072    pub mode: progress::SyncMode,
1073}
1074
1075impl Default for SyncOptions {
1076    fn default() -> Self {
1077        Self {
1078            sync_activities: false,
1079            sync_health: false,
1080            sync_performance: false,
1081            from_date: None,
1082            to_date: None,
1083            dry_run: false,
1084            force: false,
1085            concurrency: 4,
1086            mode: progress::SyncMode::Latest,
1087        }
1088    }
1089}
1090
1091/// Statistics from sync operation
1092#[derive(Default)]
1093pub struct SyncStats {
1094    /// Tasks recovered from previous run
1095    pub recovered: u32,
1096    /// Tasks completed successfully
1097    pub completed: u32,
1098    /// Tasks that hit rate limits
1099    pub rate_limited: u32,
1100    /// Tasks that failed
1101    pub failed: u32,
1102}
1103
1104impl std::fmt::Display for SyncStats {
1105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1106        write!(
1107            f,
1108            "Completed: {}, Failed: {}, Rate limited: {}",
1109            self.completed, self.failed, self.rate_limited
1110        )?;
1111        if self.recovered > 0 {
1112            write!(f, ", Recovered: {}", self.recovered)?;
1113        }
1114        Ok(())
1115    }
1116}
1117
1118/// Update progress when starting a task
1119fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1120    let desc = match &task.task_type {
1121        SyncTaskType::Activities { start, limit, .. } => {
1122            format!("Activities {}-{}", start, start + limit)
1123        }
1124        SyncTaskType::DownloadGpx {
1125            activity_name,
1126            activity_date,
1127            ..
1128        } => {
1129            let name = activity_name.as_deref().unwrap_or("Unknown");
1130            let date = activity_date.as_deref().unwrap_or("");
1131            if date.is_empty() {
1132                name.to_string()
1133            } else {
1134                format!("{} {}", date, name)
1135            }
1136        }
1137        SyncTaskType::DailyHealth { date } => date.to_string(),
1138        SyncTaskType::Performance { date } => date.to_string(),
1139        _ => String::new(),
1140    };
1141
1142    match &task.task_type {
1143        SyncTaskType::Activities { .. } => {
1144            progress.activities.set_current_item(desc.clone());
1145            progress.activities.set_last_item(desc);
1146        }
1147        SyncTaskType::DownloadGpx { .. } => {
1148            progress.gpx.set_current_item(desc.clone());
1149            progress.gpx.set_last_item(desc);
1150        }
1151        SyncTaskType::DailyHealth { .. } => {
1152            progress.health.set_current_item(desc.clone());
1153            progress.health.set_last_item(desc);
1154        }
1155        SyncTaskType::Performance { .. } => {
1156            progress.performance.set_current_item(desc.clone());
1157            progress.performance.set_last_item(desc);
1158        }
1159        _ => {}
1160    }
1161}
1162
1163/// Mark a task as completed in progress
1164#[allow(dead_code)]
1165fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1166    match &task.task_type {
1167        SyncTaskType::Activities { .. } => {
1168            progress.activities.complete_one();
1169            progress.activities.clear_current_item();
1170        }
1171        SyncTaskType::DownloadGpx { .. } => {
1172            progress.gpx.complete_one();
1173            progress.gpx.clear_current_item();
1174        }
1175        SyncTaskType::DailyHealth { .. } => {
1176            progress.health.complete_one();
1177            progress.health.clear_current_item();
1178        }
1179        SyncTaskType::Performance { .. } => {
1180            progress.performance.complete_one();
1181            progress.performance.clear_current_item();
1182        }
1183        _ => {}
1184    }
1185}
1186
1187/// Mark a task as failed in progress and record error details
1188fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress, error: &str) {
1189    let (stream_name, item_desc) = match &task.task_type {
1190        SyncTaskType::Activities { start, limit, .. } => {
1191            progress.activities.fail_one();
1192            progress.activities.clear_current_item();
1193            ("Activities", format!("{}-{}", start, start + limit))
1194        }
1195        SyncTaskType::DownloadGpx {
1196            activity_id,
1197            activity_name,
1198            ..
1199        } => {
1200            progress.gpx.fail_one();
1201            progress.gpx.clear_current_item();
1202            (
1203                "GPX",
1204                activity_name
1205                    .clone()
1206                    .unwrap_or_else(|| activity_id.to_string()),
1207            )
1208        }
1209        SyncTaskType::DailyHealth { date } => {
1210            progress.health.fail_one();
1211            progress.health.clear_current_item();
1212            ("Health", date.to_string())
1213        }
1214        SyncTaskType::Performance { date } => {
1215            progress.performance.fail_one();
1216            progress.performance.clear_current_item();
1217            ("Performance", date.to_string())
1218        }
1219        _ => return,
1220    };
1221
1222    progress.add_error(stream_name, item_desc, error.to_string());
1223}
1224
1225const MAX_IDLE_RETRIES: u32 = 10;
1226
1227fn should_exit_when_idle(idle_loops: u32, in_flight: usize) -> bool {
1228    idle_loops >= MAX_IDLE_RETRIES && in_flight == 0
1229}
1230
1231fn record_write_failure(data: &SyncData, progress: &SyncProgress, error: &str) {
1232    match data {
1233        SyncData::Activities { records, .. } => {
1234            progress.activities.fail_one();
1235            progress.activities.clear_current_item();
1236            let item = records
1237                .first()
1238                .map(|r| r.activity_id.to_string())
1239                .unwrap_or_else(|| "batch".to_string());
1240            progress.add_error("Activities", item, error.to_string());
1241        }
1242        SyncData::Health { record, .. } => {
1243            progress.health.fail_one();
1244            progress.health.clear_current_item();
1245            progress.add_error("Health", record.date.to_string(), error.to_string());
1246        }
1247        SyncData::Performance { record, .. } => {
1248            progress.performance.fail_one();
1249            progress.performance.clear_current_item();
1250            progress.add_error("Performance", record.date.to_string(), error.to_string());
1251        }
1252        SyncData::TrackPoints {
1253            activity_id, date, ..
1254        } => {
1255            progress.gpx.fail_one();
1256            progress.gpx.clear_current_item();
1257            progress.add_error(
1258                "GPX",
1259                format!("{} ({})", date, activity_id),
1260                error.to_string(),
1261            );
1262        }
1263    }
1264}
1265
1266// =============================================================================
1267// Producer/Consumer Pipeline
1268// =============================================================================
1269
1270/// Producer loop: fetches data from Garmin API and sends to channel
1271async fn producer_loop(
1272    _id: usize,
1273    queue: SharedTaskQueue,
1274    tx: mpsc::Sender<SyncData>,
1275    context: ProducerContext,
1276) {
1277    let mut empty_count = 0;
1278
1279    loop {
1280        // Pop next task
1281        let next_task = match context.pipeline_filter {
1282            Some(pipeline) => queue.pop_round_robin_with_pipeline(Some(pipeline)).await,
1283            None => queue.pop_round_robin().await,
1284        };
1285
1286        let task = match next_task {
1287            Ok(Some(task)) => {
1288                empty_count = 0; // Reset counter on successful pop
1289                task
1290            }
1291            Ok(None) => {
1292                // Queue is empty, but consumers might be adding new tasks
1293                // Wait a bit and retry before giving up
1294                empty_count += 1;
1295                if should_exit_when_idle(empty_count, context.in_flight.load(Ordering::Relaxed)) {
1296                    break; // No more tasks after multiple retries
1297                }
1298                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1299                continue;
1300            }
1301            Err(e) => {
1302                eprintln!("Queue error: {}", e);
1303                break;
1304            }
1305        };
1306
1307        let task_id = task.id.unwrap();
1308        context.in_flight.fetch_add(1, Ordering::Relaxed);
1309
1310        // Mark in progress
1311        if let Err(e) = queue.mark_in_progress(task_id).await {
1312            eprintln!("Failed to mark task in progress: {}", e);
1313            continue;
1314        }
1315
1316        // Update progress display
1317        update_progress_for_task(&task, &context.progress);
1318
1319        // Skip tasks that already exist unless forcing
1320        if !context.force {
1321            let should_skip = match &task.task_type {
1322                SyncTaskType::DailyHealth { date } => context
1323                    .parquet
1324                    .has_daily_health(context.profile_id, *date)
1325                    .unwrap_or(false),
1326                SyncTaskType::Performance { date } => context
1327                    .parquet
1328                    .has_performance_metrics(context.profile_id, *date)
1329                    .unwrap_or(false),
1330                SyncTaskType::DownloadGpx {
1331                    activity_id,
1332                    activity_date,
1333                    ..
1334                } => activity_date
1335                    .as_ref()
1336                    .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1337                    .and_then(|date| context.parquet.has_track_points(*activity_id, date).ok())
1338                    .unwrap_or(false),
1339                _ => false,
1340            };
1341
1342            if should_skip {
1343                if let Err(e) = queue.mark_completed(task_id).await {
1344                    eprintln!("Failed to mark task completed: {}", e);
1345                }
1346                complete_progress_for_task(&task, &context.progress);
1347                {
1348                    let mut s = context.stats.lock().await;
1349                    s.completed += 1;
1350                }
1351                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1352                continue;
1353            }
1354        }
1355
1356        // Acquire rate limiter permit
1357        let _permit = context.rate_limiter.acquire().await;
1358        context.progress.record_request();
1359
1360        // Fetch data based on task type
1361        let result = fetch_task_data(
1362            &task,
1363            &context.client,
1364            &context.token,
1365            &context.display_name,
1366            context.profile_id,
1367        )
1368        .await;
1369
1370        match result {
1371            Ok(data) => {
1372                context.rate_limiter.on_success();
1373                // Send to consumer
1374                if tx.send(data).await.is_err() {
1375                    // Channel closed, consumer is done
1376                    context.in_flight.fetch_sub(1, Ordering::Relaxed);
1377                    let backoff = Duration::seconds(60);
1378                    let _ = queue
1379                        .mark_failed(task_id, "Consumer channel closed", backoff)
1380                        .await;
1381                    break;
1382                }
1383            }
1384            Err(GarminError::RateLimited) => {
1385                context.rate_limiter.on_rate_limit();
1386                let backoff = Duration::seconds(60);
1387                if let Err(e) = queue.mark_failed(task_id, "Rate limited", backoff).await {
1388                    eprintln!("Failed to mark task as rate limited: {}", e);
1389                }
1390                fail_progress_for_task(&task, &context.progress, "Rate limited");
1391                {
1392                    let mut s = context.stats.lock().await;
1393                    s.rate_limited += 1;
1394                }
1395                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1396            }
1397            Err(e) => {
1398                let backoff = Duration::seconds(60);
1399                let error_msg = e.to_string();
1400                if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
1401                    eprintln!("Failed to mark task as failed: {}", e);
1402                }
1403                fail_progress_for_task(&task, &context.progress, &error_msg);
1404                {
1405                    let mut s = context.stats.lock().await;
1406                    s.failed += 1;
1407                }
1408                context.in_flight.fetch_sub(1, Ordering::Relaxed);
1409            }
1410        }
1411    }
1412}
1413
1414fn value_to_i32(value: &serde_json::Value) -> Option<i32> {
1415    if let Some(int) = value.as_i64() {
1416        return Some(int as i32);
1417    }
1418    value.as_f64().map(|float| float.round() as i32)
1419}
1420
1421fn first_entry(value: &serde_json::Value) -> Option<&serde_json::Value> {
1422    if let Some(array) = value.as_array() {
1423        array.first()
1424    } else {
1425        Some(value)
1426    }
1427}
1428
1429fn parse_sleep_metrics(value: Option<&serde_json::Value>) -> SleepMetrics {
1430    let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value);
1431
1432    let dto = match dto {
1433        Some(dto) => dto,
1434        None => return (None, None, None, None, None),
1435    };
1436
1437    let deep = dto.get("deepSleepSeconds").and_then(value_to_i32);
1438    let light = dto.get("lightSleepSeconds").and_then(value_to_i32);
1439    let rem = dto.get("remSleepSeconds").and_then(value_to_i32);
1440
1441    let total = dto
1442        .get("sleepTimeSeconds")
1443        .and_then(value_to_i32)
1444        .or_else(|| match (deep, light, rem) {
1445            (Some(d), Some(l), Some(r)) => Some(d + l + r),
1446            _ => None,
1447        });
1448
1449    let score = dto
1450        .get("sleepScores")
1451        .and_then(|v| v.get("overall"))
1452        .and_then(|v| v.get("value"))
1453        .and_then(value_to_i32);
1454
1455    (total, deep, light, rem, score)
1456}
1457
1458/// Extract the subjective sleep note (dailySleepDTO.userNote) added in the
1459/// Garmin mobile app. Returns None when absent or empty after trimming.
1460fn parse_sleep_note(value: Option<&serde_json::Value>) -> Option<String> {
1461    let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value)?;
1462    dto.get("userNote")
1463        .and_then(|v| v.as_str())
1464        .map(|s| s.trim())
1465        .filter(|s| !s.is_empty())
1466        .map(|s| s.to_string())
1467}
1468
1469fn parse_hrv_metrics(
1470    value: Option<&serde_json::Value>,
1471) -> (Option<i32>, Option<i32>, Option<String>) {
1472    let summary = value.and_then(|v| v.get("hrvSummary")).or(value);
1473
1474    let summary = match summary {
1475        Some(summary) => summary,
1476        None => return (None, None, None),
1477    };
1478
1479    let weekly_avg = summary.get("weeklyAvg").and_then(value_to_i32);
1480    let last_night = summary
1481        .get("lastNight")
1482        .or_else(|| summary.get("lastNightAvg"))
1483        .and_then(value_to_i32);
1484    let status = summary
1485        .get("status")
1486        .and_then(|v| v.as_str())
1487        .map(|s| s.to_string());
1488
1489    (weekly_avg, last_night, status)
1490}
1491
1492fn parse_vo2max_value(value: Option<&serde_json::Value>) -> Option<f64> {
1493    let entry = value.and_then(first_entry)?;
1494    entry
1495        .get("generic")
1496        .and_then(|v| v.get("vo2MaxValue"))
1497        .and_then(|v| v.as_f64())
1498}
1499
1500fn parse_vo2max_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1501    let entry = value.and_then(first_entry)?;
1502    entry
1503        .get("generic")
1504        .and_then(|v| v.get("fitnessAge"))
1505        .and_then(value_to_i32)
1506}
1507
1508fn parse_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1509    value
1510        .and_then(|v| v.get("fitnessAge"))
1511        .and_then(value_to_i32)
1512}
1513
1514fn parse_race_predictions(
1515    value: Option<&serde_json::Value>,
1516) -> (Option<i32>, Option<i32>, Option<i32>, Option<i32>) {
1517    let entry = match value.and_then(first_entry) {
1518        Some(entry) => entry,
1519        None => return (None, None, None, None),
1520    };
1521
1522    let race_5k = entry.get("time5K").and_then(value_to_i32);
1523    let race_10k = entry.get("time10K").and_then(value_to_i32);
1524    let race_half = entry.get("timeHalfMarathon").and_then(value_to_i32);
1525    let race_marathon = entry.get("timeMarathon").and_then(value_to_i32);
1526
1527    (race_5k, race_10k, race_half, race_marathon)
1528}
1529
1530fn parse_overall_score(value: Option<&serde_json::Value>) -> Option<i32> {
1531    let entry = value.and_then(first_entry)?;
1532    entry.get("overallScore").and_then(value_to_i32)
1533}
1534
1535fn parse_training_status(value: Option<&serde_json::Value>, date: NaiveDate) -> Option<String> {
1536    let root = value?;
1537    let date_str = date.to_string();
1538
1539    if let Some(latest) = root
1540        .get("mostRecentTrainingStatus")
1541        .and_then(|v| v.get("latestTrainingStatusData"))
1542        .and_then(|v| v.as_object())
1543    {
1544        for entry in latest.values() {
1545            if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1546                return entry
1547                    .get("trainingStatusFeedbackPhrase")
1548                    .and_then(|v| v.as_str())
1549                    .map(|s| s.to_string());
1550            }
1551        }
1552    }
1553
1554    if let Some(history) = root.get("trainingStatusHistory").and_then(|v| v.as_array()) {
1555        for entry in history {
1556            if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1557                return entry
1558                    .get("trainingStatusFeedbackPhrase")
1559                    .and_then(|v| v.as_str())
1560                    .map(|s| s.to_string());
1561            }
1562        }
1563    }
1564
1565    None
1566}
1567
1568/// Fetch data for a task from the Garmin API
1569async fn fetch_task_data(
1570    task: &SyncTask,
1571    client: &GarminClient,
1572    token: &OAuth2Token,
1573    display_name: &str,
1574    profile_id: i32,
1575) -> Result<SyncData> {
1576    let task_id = task.id.unwrap();
1577
1578    match &task.task_type {
1579        SyncTaskType::Activities {
1580            start,
1581            limit,
1582            min_date,
1583            max_date,
1584        } => {
1585            let path = format!(
1586                "/activitylist-service/activities/search/activities?limit={}&start={}",
1587                limit, start
1588            );
1589            let activities: Vec<serde_json::Value> = client.get_json(token, &path).await?;
1590
1591            let mut records = Vec::new();
1592            let mut gpx_tasks = Vec::new();
1593            let mut reached_min = false;
1594
1595            for activity in &activities {
1596                let activity_date = activity
1597                    .get("startTimeLocal")
1598                    .and_then(|v| v.as_str())
1599                    .and_then(|s| s.split(' ').next())
1600                    .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
1601
1602                if let Some(date) = activity_date {
1603                    if let Some(min) = *min_date {
1604                        if date < min {
1605                            reached_min = true;
1606                            break;
1607                        }
1608                    }
1609
1610                    if let Some(max) = *max_date {
1611                        if date > max {
1612                            continue;
1613                        }
1614                    }
1615                }
1616
1617                // Parse activity
1618                let parsed = parse_activity(activity, profile_id)?;
1619                records.push(parsed);
1620
1621                // Queue GPX download for activities with GPS
1622                if activity
1623                    .get("hasPolyline")
1624                    .and_then(|v| v.as_bool())
1625                    .unwrap_or(false)
1626                {
1627                    if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
1628                        let activity_name = activity
1629                            .get("activityName")
1630                            .and_then(|v| v.as_str())
1631                            .map(|s| s.to_string());
1632                        let activity_date = activity
1633                            .get("startTimeLocal")
1634                            .and_then(|v| v.as_str())
1635                            .and_then(|s| s.split(' ').next())
1636                            .map(|s| s.to_string());
1637
1638                        gpx_tasks.push(SyncTask::new(
1639                            profile_id,
1640                            task.pipeline,
1641                            SyncTaskType::DownloadGpx {
1642                                activity_id: id,
1643                                activity_name,
1644                                activity_date,
1645                            },
1646                        ));
1647                    }
1648                }
1649            }
1650
1651            // Check if there's a next page
1652            let next_page = if activities.len() == *limit as usize && !reached_min {
1653                Some(SyncTask::new(
1654                    profile_id,
1655                    task.pipeline,
1656                    SyncTaskType::Activities {
1657                        start: start + limit,
1658                        limit: *limit,
1659                        min_date: *min_date,
1660                        max_date: *max_date,
1661                    },
1662                ))
1663            } else {
1664                None
1665            };
1666
1667            Ok(SyncData::Activities {
1668                records,
1669                gpx_tasks,
1670                next_page,
1671                task_id,
1672            })
1673        }
1674
1675        SyncTaskType::DownloadGpx {
1676            activity_id,
1677            activity_date,
1678            ..
1679        } => {
1680            let path = format!("/download-service/export/gpx/activity/{}", activity_id);
1681            let gpx_bytes = client.download(token, &path).await?;
1682            let gpx_data = String::from_utf8_lossy(&gpx_bytes);
1683
1684            // Parse activity date for partitioning
1685            let date = activity_date
1686                .as_ref()
1687                .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1688                .unwrap_or_else(|| Utc::now().date_naive());
1689
1690            let points = parse_gpx(*activity_id, &gpx_data)?;
1691
1692            Ok(SyncData::TrackPoints {
1693                activity_id: *activity_id,
1694                date,
1695                points,
1696                task_id,
1697            })
1698        }
1699
1700        SyncTaskType::DailyHealth { date } => {
1701            let path = format!(
1702                "/usersummary-service/usersummary/daily/{}?calendarDate={}",
1703                display_name, date
1704            );
1705
1706            // Try to fetch health data - may return 404/error for dates without data
1707            let health_result: std::result::Result<serde_json::Value, _> =
1708                client.get_json(token, &path).await;
1709
1710            let health = match health_result {
1711                Ok(data) => data,
1712                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
1713                    // No data for this date - store empty record to mark as synced
1714                    serde_json::json!({})
1715                }
1716                Err(e) => return Err(e),
1717            };
1718
1719            let sleep_path = format!(
1720                "/wellness-service/wellness/dailySleepData/{}?date={}",
1721                display_name, date
1722            );
1723            let sleep_data: Option<serde_json::Value> =
1724                match client.get_json(token, &sleep_path).await {
1725                    Ok(data) => Some(data),
1726                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1727                    Err(e) => return Err(e),
1728                };
1729
1730            let hrv_path = format!("/hrv-service/hrv/{}", date);
1731            let hrv_data: Option<serde_json::Value> = match client.get_json(token, &hrv_path).await
1732            {
1733                Ok(data) => Some(data),
1734                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1735                Err(e) => return Err(e),
1736            };
1737
1738            let (sleep_total, deep_sleep, light_sleep, rem_sleep, sleep_score) =
1739                parse_sleep_metrics(sleep_data.as_ref());
1740            let sleep_note = parse_sleep_note(sleep_data.as_ref());
1741            let (hrv_weekly_avg, hrv_last_night, hrv_status) = parse_hrv_metrics(hrv_data.as_ref());
1742
1743            let record = DailyHealth {
1744                id: None,
1745                profile_id,
1746                date: *date,
1747                steps: health
1748                    .get("totalSteps")
1749                    .and_then(|v| v.as_i64())
1750                    .map(|v| v as i32),
1751                step_goal: health
1752                    .get("dailyStepGoal")
1753                    .and_then(|v| v.as_i64())
1754                    .map(|v| v as i32),
1755                total_calories: health
1756                    .get("totalKilocalories")
1757                    .and_then(|v| v.as_i64())
1758                    .map(|v| v as i32),
1759                active_calories: health
1760                    .get("activeKilocalories")
1761                    .and_then(|v| v.as_i64())
1762                    .map(|v| v as i32),
1763                bmr_calories: health
1764                    .get("bmrKilocalories")
1765                    .and_then(|v| v.as_i64())
1766                    .map(|v| v as i32),
1767                resting_hr: health
1768                    .get("restingHeartRate")
1769                    .and_then(|v| v.as_i64())
1770                    .map(|v| v as i32),
1771                sleep_seconds: sleep_total.or_else(|| {
1772                    health
1773                        .get("sleepingSeconds")
1774                        .and_then(|v| v.as_i64())
1775                        .map(|v| v as i32)
1776                }),
1777                deep_sleep_seconds: deep_sleep,
1778                light_sleep_seconds: light_sleep,
1779                rem_sleep_seconds: rem_sleep,
1780                sleep_score,
1781                sleep_note,
1782                avg_stress: health
1783                    .get("averageStressLevel")
1784                    .and_then(|v| v.as_i64())
1785                    .map(|v| v as i32),
1786                max_stress: health
1787                    .get("maxStressLevel")
1788                    .and_then(|v| v.as_i64())
1789                    .map(|v| v as i32),
1790                body_battery_start: health
1791                    .get("bodyBatteryChargedValue")
1792                    .and_then(|v| v.as_i64())
1793                    .map(|v| v as i32),
1794                body_battery_end: health
1795                    .get("bodyBatteryDrainedValue")
1796                    .and_then(|v| v.as_i64())
1797                    .map(|v| v as i32),
1798                hrv_weekly_avg,
1799                hrv_last_night,
1800                hrv_status,
1801                avg_respiration: health
1802                    .get("averageRespirationValue")
1803                    .and_then(|v| v.as_f64()),
1804                avg_spo2: health
1805                    .get("averageSpo2Value")
1806                    .and_then(|v| v.as_i64())
1807                    .map(|v| v as i32),
1808                lowest_spo2: health
1809                    .get("lowestSpo2Value")
1810                    .and_then(|v| v.as_i64())
1811                    .map(|v| v as i32),
1812                hydration_ml: health
1813                    .get("hydrationIntakeGoal")
1814                    .and_then(|v| v.as_i64())
1815                    .map(|v| v as i32),
1816                moderate_intensity_min: health
1817                    .get("moderateIntensityMinutes")
1818                    .and_then(|v| v.as_i64())
1819                    .map(|v| v as i32),
1820                vigorous_intensity_min: health
1821                    .get("vigorousIntensityMinutes")
1822                    .and_then(|v| v.as_i64())
1823                    .map(|v| v as i32),
1824                raw_json: Some(health),
1825            };
1826
1827            Ok(SyncData::Health { record, task_id })
1828        }
1829
1830        SyncTaskType::Performance { date } => {
1831            let vo2_path = format!("/metrics-service/metrics/maxmet/daily/{}/{}", date, date);
1832            let vo2max: Option<serde_json::Value> = match client.get_json(token, &vo2_path).await {
1833                Ok(data) => Some(data),
1834                Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1835                Err(e) => return Err(e),
1836            };
1837
1838            let race_path = format!(
1839                "/metrics-service/metrics/racepredictions/daily/{}?fromCalendarDate={}&toCalendarDate={}",
1840                display_name, date, date
1841            );
1842            let race_predictions: Option<serde_json::Value> =
1843                match client.get_json(token, &race_path).await {
1844                    Ok(data) => Some(data),
1845                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1846                    Err(e) => return Err(e),
1847                };
1848
1849            // Fetch training readiness
1850            let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
1851            let training_readiness: Option<serde_json::Value> =
1852                client.get_json(token, &readiness_path).await.ok();
1853
1854            // Fetch training status
1855            let status_path = format!(
1856                "/metrics-service/metrics/trainingstatus/aggregated/{}",
1857                date
1858            );
1859            let training_status: Option<serde_json::Value> =
1860                client.get_json(token, &status_path).await.ok();
1861
1862            let endurance_path = format!(
1863                "/metrics-service/metrics/endurancescore?calendarDate={}",
1864                date
1865            );
1866            let endurance_score_data: Option<serde_json::Value> =
1867                match client.get_json(token, &endurance_path).await {
1868                    Ok(data) => Some(data),
1869                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1870                    Err(e) => return Err(e),
1871                };
1872
1873            let hill_path = format!("/metrics-service/metrics/hillscore?calendarDate={}", date);
1874            let hill_score_data: Option<serde_json::Value> =
1875                match client.get_json(token, &hill_path).await {
1876                    Ok(data) => Some(data),
1877                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1878                    Err(e) => return Err(e),
1879                };
1880
1881            let fitness_age_path = format!("/fitnessage-service/fitnessage/{}", date);
1882            let fitness_age_data: Option<serde_json::Value> =
1883                match client.get_json(token, &fitness_age_path).await {
1884                    Ok(data) => Some(data),
1885                    Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1886                    Err(e) => return Err(e),
1887                };
1888
1889            let vo2max_value = parse_vo2max_value(vo2max.as_ref());
1890            let fitness_age = parse_fitness_age(fitness_age_data.as_ref())
1891                .or_else(|| parse_vo2max_fitness_age(vo2max.as_ref()));
1892
1893            let readiness_entry = training_readiness
1894                .as_ref()
1895                .and_then(|v| v.as_array())
1896                .and_then(|arr| arr.first());
1897
1898            let readiness_score = readiness_entry
1899                .and_then(|e| e.get("score"))
1900                .and_then(|v| v.as_i64())
1901                .map(|v| v as i32);
1902
1903            let training_status_str = parse_training_status(training_status.as_ref(), *date);
1904            let (race_5k, race_10k, race_half, race_marathon) =
1905                parse_race_predictions(race_predictions.as_ref());
1906            let endurance_score = parse_overall_score(endurance_score_data.as_ref());
1907            let hill_score = parse_overall_score(hill_score_data.as_ref());
1908
1909            let record = PerformanceMetrics {
1910                id: None,
1911                profile_id,
1912                date: *date,
1913                vo2max: vo2max_value,
1914                fitness_age,
1915                training_readiness: readiness_score,
1916                training_status: training_status_str,
1917                lactate_threshold_hr: None,
1918                lactate_threshold_pace: None,
1919                race_5k_sec: race_5k,
1920                race_10k_sec: race_10k,
1921                race_half_sec: race_half,
1922                race_marathon_sec: race_marathon,
1923                endurance_score,
1924                hill_score,
1925                raw_json: None,
1926            };
1927
1928            Ok(SyncData::Performance { record, task_id })
1929        }
1930
1931        _ => {
1932            // Other task types not implemented for parallel yet
1933            Err(GarminError::invalid_response(
1934                "Unsupported task type for parallel sync",
1935            ))
1936        }
1937    }
1938}
1939
1940/// Parse activity JSON into Activity struct (standalone version for producer)
1941fn parse_activity(activity: &serde_json::Value, profile_id: i32) -> Result<Activity> {
1942    let activity_id = activity
1943        .get("activityId")
1944        .and_then(|v| v.as_i64())
1945        .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
1946
1947    let start_time_local = activity
1948        .get("startTimeLocal")
1949        .and_then(|v| v.as_str())
1950        .and_then(parse_garmin_datetime);
1951
1952    let start_time_gmt = activity
1953        .get("startTimeGMT")
1954        .and_then(|v| v.as_str())
1955        .and_then(parse_garmin_datetime);
1956
1957    Ok(Activity {
1958        activity_id,
1959        profile_id,
1960        activity_name: activity
1961            .get("activityName")
1962            .and_then(|v| v.as_str())
1963            .map(|s| s.to_string()),
1964        activity_type: activity
1965            .get("activityType")
1966            .and_then(|v| v.get("typeKey"))
1967            .and_then(|v| v.as_str())
1968            .map(|s| s.to_string()),
1969        start_time_local,
1970        start_time_gmt,
1971        duration_sec: activity.get("duration").and_then(|v| v.as_f64()),
1972        distance_m: activity.get("distance").and_then(|v| v.as_f64()),
1973        calories: activity
1974            .get("calories")
1975            .and_then(|v| v.as_f64())
1976            .map(|v| v as i32),
1977        avg_hr: activity
1978            .get("averageHR")
1979            .and_then(|v| v.as_f64())
1980            .map(|v| v as i32),
1981        max_hr: activity
1982            .get("maxHR")
1983            .and_then(|v| v.as_f64())
1984            .map(|v| v as i32),
1985        avg_speed: activity.get("averageSpeed").and_then(|v| v.as_f64()),
1986        max_speed: activity.get("maxSpeed").and_then(|v| v.as_f64()),
1987        elevation_gain: activity.get("elevationGain").and_then(|v| v.as_f64()),
1988        elevation_loss: activity.get("elevationLoss").and_then(|v| v.as_f64()),
1989        avg_cadence: activity
1990            .get("averageRunningCadenceInStepsPerMinute")
1991            .and_then(|v| v.as_f64()),
1992        avg_power: activity
1993            .get("avgPower")
1994            .and_then(|v| v.as_i64())
1995            .map(|v| v as i32),
1996        normalized_power: activity
1997            .get("normPower")
1998            .and_then(|v| v.as_i64())
1999            .map(|v| v as i32),
2000        training_effect: activity
2001            .get("aerobicTrainingEffect")
2002            .and_then(|v| v.as_f64()),
2003        training_load: activity
2004            .get("activityTrainingLoad")
2005            .and_then(|v| v.as_f64()),
2006        start_lat: activity.get("startLatitude").and_then(|v| v.as_f64()),
2007        start_lon: activity.get("startLongitude").and_then(|v| v.as_f64()),
2008        end_lat: activity.get("endLatitude").and_then(|v| v.as_f64()),
2009        end_lon: activity.get("endLongitude").and_then(|v| v.as_f64()),
2010        ground_contact_time: activity
2011            .get("avgGroundContactTime")
2012            .and_then(|v| v.as_f64()),
2013        vertical_oscillation: activity
2014            .get("avgVerticalOscillation")
2015            .and_then(|v| v.as_f64()),
2016        stride_length: activity.get("avgStrideLength").and_then(|v| v.as_f64()),
2017        location_name: activity
2018            .get("locationName")
2019            .and_then(|v| v.as_str())
2020            .map(|s| s.to_string()),
2021        raw_json: Some(activity.clone()),
2022    })
2023}
2024
2025fn parse_garmin_datetime(value: &str) -> Option<DateTime<Utc>> {
2026    if let Ok(dt) = DateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
2027        return Some(dt.with_timezone(&Utc));
2028    }
2029
2030    let naive = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
2031        .or_else(|_| NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f"))
2032        .ok()?;
2033
2034    Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2035}
2036
2037/// Parse GPX data and return track points
2038fn parse_gpx(activity_id: i64, gpx_data: &str) -> Result<Vec<TrackPoint>> {
2039    use gpx::read;
2040    use std::io::BufReader;
2041
2042    let gpx_data = gpx_data.trim_start();
2043    let reader = BufReader::new(gpx_data.as_bytes());
2044    let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
2045    let mut streams = parse_gpx_track_point_streams(gpx_data)?.into_iter();
2046
2047    let mut points = Vec::new();
2048
2049    for track in gpx.tracks {
2050        for segment in track.segments {
2051            for point in segment.points {
2052                let stream = streams.next().unwrap_or_default();
2053                let timestamp = point
2054                    .time
2055                    .map(|t| {
2056                        DateTime::parse_from_rfc3339(&t.format().unwrap_or_default())
2057                            .map(|dt| dt.with_timezone(&Utc))
2058                            .unwrap_or_default()
2059                    })
2060                    .unwrap_or_default();
2061
2062                points.push(TrackPoint {
2063                    id: None,
2064                    activity_id,
2065                    timestamp,
2066                    lat: Some(point.point().y()),
2067                    lon: Some(point.point().x()),
2068                    elevation: point.elevation,
2069                    heart_rate: stream.heart_rate,
2070                    cadence: stream.cadence,
2071                    power: stream.power,
2072                    speed: point.speed.or(stream.speed),
2073                });
2074            }
2075        }
2076    }
2077
2078    Ok(points)
2079}
2080
2081fn parse_gpx_track_point_streams(gpx_data: &str) -> Result<Vec<TrackPointStreams>> {
2082    use xml::reader::{EventReader, XmlEvent};
2083
2084    let parser = EventReader::from_str(gpx_data);
2085    let mut streams = Vec::new();
2086    let mut current_point: Option<TrackPointStreams> = None;
2087    let mut current_field: Option<TrackPointStreamField> = None;
2088    let mut current_text = String::new();
2089
2090    for event in parser {
2091        match event.map_err(|e| GarminError::invalid_response(e.to_string()))? {
2092            XmlEvent::StartElement { name, .. } => {
2093                if name.local_name == "trkpt" {
2094                    current_point = Some(TrackPointStreams::default());
2095                    current_field = None;
2096                    current_text.clear();
2097                } else if current_point.is_some() {
2098                    if let Some(field) = track_point_stream_field(&name.local_name) {
2099                        current_field = Some(field);
2100                        current_text.clear();
2101                    }
2102                }
2103            }
2104            XmlEvent::Characters(text) | XmlEvent::CData(text) if current_field.is_some() => {
2105                current_text.push_str(&text);
2106            }
2107            XmlEvent::EndElement { name } => {
2108                if name.local_name == "trkpt" {
2109                    streams.push(current_point.take().unwrap_or_default());
2110                    current_field = None;
2111                    current_text.clear();
2112                } else if let (Some(field), Some(point)) = (current_field, current_point.as_mut()) {
2113                    if track_point_stream_field(&name.local_name) == Some(field) {
2114                        apply_track_point_stream_field(point, field, &current_text);
2115                        current_field = None;
2116                        current_text.clear();
2117                    }
2118                }
2119            }
2120            _ => {}
2121        }
2122    }
2123
2124    Ok(streams)
2125}
2126
2127fn track_point_stream_field(local_name: &str) -> Option<TrackPointStreamField> {
2128    match local_name.to_ascii_lowercase().as_str() {
2129        "hr" | "heartrate" | "heart_rate" | "heartratebpm" => {
2130            Some(TrackPointStreamField::HeartRate)
2131        }
2132        "cad" | "cadence" => Some(TrackPointStreamField::Cadence),
2133        "power" | "powerinwatts" | "watts" => Some(TrackPointStreamField::Power),
2134        "speed" => Some(TrackPointStreamField::Speed),
2135        _ => None,
2136    }
2137}
2138
2139fn apply_track_point_stream_field(
2140    point: &mut TrackPointStreams,
2141    field: TrackPointStreamField,
2142    value: &str,
2143) {
2144    match field {
2145        TrackPointStreamField::HeartRate => point.heart_rate = parse_i32_stream_value(value),
2146        TrackPointStreamField::Cadence => point.cadence = parse_i32_stream_value(value),
2147        TrackPointStreamField::Power => point.power = parse_i32_stream_value(value),
2148        TrackPointStreamField::Speed => point.speed = parse_f64_stream_value(value),
2149    }
2150}
2151
2152fn parse_i32_stream_value(value: &str) -> Option<i32> {
2153    let value = value.trim().parse::<f64>().ok()?;
2154    if value.is_finite() && value >= i32::MIN as f64 && value <= i32::MAX as f64 {
2155        Some(value.round() as i32)
2156    } else {
2157        None
2158    }
2159}
2160
2161fn parse_f64_stream_value(value: &str) -> Option<f64> {
2162    let value = value.trim().parse::<f64>().ok()?;
2163    value.is_finite().then_some(value)
2164}
2165
2166/// Consumer loop: receives data from channel and writes to Parquet
2167async fn consumer_loop(
2168    _id: usize,
2169    rx: Arc<TokioMutex<mpsc::Receiver<SyncData>>>,
2170    parquet: Arc<ParquetStore>,
2171    queue: SharedTaskQueue,
2172    stats: Arc<TokioMutex<SyncStats>>,
2173    progress: SharedProgress,
2174    in_flight: Arc<AtomicUsize>,
2175) {
2176    loop {
2177        // Receive next data item
2178        let data = {
2179            let mut rx = rx.lock().await;
2180            rx.recv().await
2181        };
2182
2183        let data = match data {
2184            Some(d) => d,
2185            None => break, // Channel closed, all producers done
2186        };
2187
2188        // Process and write data
2189        let result = match &data {
2190            SyncData::Activities {
2191                records,
2192                gpx_tasks,
2193                next_page,
2194                task_id,
2195            } => {
2196                // Write activities to Parquet
2197                let write_result = parquet.upsert_activities_async(records).await;
2198
2199                if write_result.is_ok() {
2200                    // Queue GPX tasks and update progress totals
2201                    let mut gpx_added = 0u32;
2202                    for gpx_task in gpx_tasks {
2203                        let should_skip = match &gpx_task.task_type {
2204                            SyncTaskType::DownloadGpx {
2205                                activity_id,
2206                                activity_date,
2207                                ..
2208                            } => activity_date
2209                                .as_ref()
2210                                .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
2211                                .and_then(|date| parquet.has_track_points(*activity_id, date).ok())
2212                                .unwrap_or(false),
2213                            _ => false,
2214                        };
2215
2216                        if should_skip {
2217                            continue;
2218                        }
2219
2220                        if let Err(e) = queue.push(gpx_task.clone()).await {
2221                            eprintln!("Failed to queue GPX task: {}", e);
2222                        } else {
2223                            gpx_added += 1;
2224                        }
2225                    }
2226                    if gpx_added > 0 {
2227                        progress.gpx.add_total(gpx_added);
2228                    }
2229
2230                    // Queue next page if there is one
2231                    if let Some(next) = next_page {
2232                        if let Err(e) = queue.push(next.clone()).await {
2233                            eprintln!("Failed to queue next page: {}", e);
2234                        }
2235                        progress.activities.add_total(1);
2236                    }
2237                }
2238
2239                (write_result, *task_id, "Activities")
2240            }
2241
2242            SyncData::Health { record, task_id } => {
2243                let result = parquet
2244                    .upsert_daily_health_async(std::slice::from_ref(record))
2245                    .await;
2246                (result, *task_id, "Health")
2247            }
2248
2249            SyncData::Performance { record, task_id } => {
2250                let result = parquet
2251                    .upsert_performance_metrics_async(std::slice::from_ref(record))
2252                    .await;
2253                (result, *task_id, "Performance")
2254            }
2255
2256            SyncData::TrackPoints {
2257                date,
2258                points,
2259                task_id,
2260                ..
2261            } => {
2262                let result = parquet.write_track_points_async(*date, points).await;
2263                (result, *task_id, "GPX")
2264            }
2265        };
2266
2267        let (write_result, task_id, task_type) = result;
2268
2269        match write_result {
2270            Ok(()) => {
2271                // Mark task completed
2272                if let Err(e) = queue.mark_completed(task_id).await {
2273                    eprintln!("Failed to mark task completed: {}", e);
2274                }
2275
2276                // Update stats
2277                {
2278                    let mut s = stats.lock().await;
2279                    s.completed += 1;
2280                }
2281
2282                // Update progress based on task type
2283                match task_type {
2284                    "Activities" => {
2285                        progress.activities.complete_one();
2286                        progress.activities.clear_current_item();
2287                    }
2288                    "Health" => {
2289                        progress.health.complete_one();
2290                        progress.health.clear_current_item();
2291                    }
2292                    "Performance" => {
2293                        progress.performance.complete_one();
2294                        progress.performance.clear_current_item();
2295                    }
2296                    "GPX" => {
2297                        progress.gpx.complete_one();
2298                        progress.gpx.clear_current_item();
2299                    }
2300                    _ => {}
2301                }
2302                in_flight.fetch_sub(1, Ordering::Relaxed);
2303            }
2304            Err(e) => {
2305                // Mark task failed
2306                let backoff = Duration::seconds(60);
2307                let error_msg = e.to_string();
2308                if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
2309                    eprintln!("Failed to mark task as failed: {}", e);
2310                }
2311
2312                // Update stats
2313                {
2314                    let mut s = stats.lock().await;
2315                    s.failed += 1;
2316                }
2317
2318                record_write_failure(&data, &progress, &error_msg);
2319                eprintln!("Write error for {}: {}", task_type, error_msg);
2320                in_flight.fetch_sub(1, Ordering::Relaxed);
2321            }
2322        }
2323    }
2324}
2325
2326#[cfg(test)]
2327mod tests {
2328    use super::*;
2329    use crate::db::models::TaskStatus;
2330    use chrono::NaiveDate;
2331    use serde_json::json;
2332    use wiremock::matchers::{method, path, query_param};
2333    use wiremock::{Mock, MockServer, ResponseTemplate};
2334
2335    #[test]
2336    fn test_should_exit_when_idle_requires_no_inflight() {
2337        assert!(!should_exit_when_idle(MAX_IDLE_RETRIES, 1));
2338        assert!(should_exit_when_idle(MAX_IDLE_RETRIES, 0));
2339        assert!(!should_exit_when_idle(MAX_IDLE_RETRIES - 1, 0));
2340    }
2341
2342    #[test]
2343    fn test_record_write_failure_updates_progress() {
2344        let progress = SyncProgress::new();
2345        let record = DailyHealth {
2346            id: None,
2347            profile_id: 1,
2348            date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
2349            steps: None,
2350            step_goal: None,
2351            total_calories: None,
2352            active_calories: None,
2353            bmr_calories: None,
2354            resting_hr: None,
2355            sleep_seconds: None,
2356            deep_sleep_seconds: None,
2357            light_sleep_seconds: None,
2358            rem_sleep_seconds: None,
2359            sleep_score: None,
2360            sleep_note: None,
2361            avg_stress: None,
2362            max_stress: None,
2363            body_battery_start: None,
2364            body_battery_end: None,
2365            hrv_weekly_avg: None,
2366            hrv_last_night: None,
2367            hrv_status: None,
2368            avg_respiration: None,
2369            avg_spo2: None,
2370            lowest_spo2: None,
2371            hydration_ml: None,
2372            moderate_intensity_min: None,
2373            vigorous_intensity_min: None,
2374            raw_json: None,
2375        };
2376
2377        let data = SyncData::Health { record, task_id: 1 };
2378        record_write_failure(&data, &progress, "write failed");
2379
2380        assert_eq!(progress.health.get_failed(), 1);
2381        let errors = progress.get_errors();
2382        assert_eq!(errors.len(), 1);
2383        assert_eq!(errors[0].stream, "Health");
2384    }
2385
2386    fn test_token() -> OAuth2Token {
2387        OAuth2Token {
2388            scope: "test".to_string(),
2389            jti: "jti".to_string(),
2390            token_type: "Bearer".to_string(),
2391            access_token: "access".to_string(),
2392            refresh_token: "refresh".to_string(),
2393            expires_in: 3600,
2394            expires_at: Utc::now().timestamp() + 3600,
2395            refresh_token_expires_in: 86400,
2396            refresh_token_expires_at: Utc::now().timestamp() + 86400,
2397        }
2398    }
2399
2400    #[tokio::test]
2401    async fn test_activity_pagination_respects_date_bounds() {
2402        let server = MockServer::start().await;
2403
2404        let body = serde_json::json!([
2405            {
2406                "activityId": 1,
2407                "activityName": "Newest",
2408                "startTimeLocal": "2025-01-05 08:00:00",
2409                "startTimeGMT": "2025-01-05 07:00:00",
2410                "activityType": { "typeKey": "running" },
2411                "hasPolyline": false
2412            },
2413            {
2414                "activityId": 2,
2415                "activityName": "Mid",
2416                "startTimeLocal": "2025-01-04 08:00:00",
2417                "startTimeGMT": "2025-01-04 07:00:00",
2418                "activityType": { "typeKey": "running" },
2419                "hasPolyline": false
2420            },
2421            {
2422                "activityId": 3,
2423                "activityName": "Old",
2424                "startTimeLocal": "2025-01-03 08:00:00",
2425                "startTimeGMT": "2025-01-03 07:00:00",
2426                "activityType": { "typeKey": "running" },
2427                "hasPolyline": false
2428            }
2429        ]);
2430
2431        Mock::given(method("GET"))
2432            .and(path("/activitylist-service/activities/search/activities"))
2433            .and(query_param("limit", "50"))
2434            .and(query_param("start", "0"))
2435            .respond_with(ResponseTemplate::new(200).set_body_json(body))
2436            .mount(&server)
2437            .await;
2438
2439        let client = GarminClient::new_with_base_url(&server.uri());
2440        let task = SyncTask {
2441            id: Some(1),
2442            profile_id: 1,
2443            task_type: SyncTaskType::Activities {
2444                start: 0,
2445                limit: 50,
2446                min_date: Some(NaiveDate::from_ymd_opt(2025, 1, 4).unwrap()),
2447                max_date: Some(NaiveDate::from_ymd_opt(2025, 1, 5).unwrap()),
2448            },
2449            pipeline: SyncPipeline::Frontier,
2450            status: TaskStatus::Pending,
2451            attempts: 0,
2452            last_error: None,
2453            created_at: None,
2454            next_retry_at: None,
2455            completed_at: None,
2456        };
2457
2458        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2459            .await
2460            .unwrap();
2461
2462        match data {
2463            SyncData::Activities {
2464                records,
2465                gpx_tasks,
2466                next_page,
2467                ..
2468            } => {
2469                assert_eq!(records.len(), 2);
2470                assert!(gpx_tasks.is_empty());
2471                assert!(next_page.is_none());
2472            }
2473            _ => panic!("unexpected data type"),
2474        }
2475    }
2476
2477    #[tokio::test]
2478    async fn test_daily_health_includes_sleep_and_hrv() {
2479        let server = MockServer::start().await;
2480        let date = NaiveDate::from_ymd_opt(2025, 12, 4).unwrap();
2481
2482        let health_body = json!({
2483            "totalSteps": 1234,
2484            "sleepingSeconds": 1000,
2485            "averageStressLevel": 20
2486        });
2487
2488        Mock::given(method("GET"))
2489            .and(path("/usersummary-service/usersummary/daily/TestUser"))
2490            .and(query_param("calendarDate", "2025-12-04"))
2491            .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2492            .mount(&server)
2493            .await;
2494
2495        let sleep_fixture: serde_json::Value =
2496            serde_json::from_str(include_str!("../../tests/fixtures/sleep_2025-12-04.json"))
2497                .unwrap();
2498
2499        Mock::given(method("GET"))
2500            .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2501            .and(query_param("date", "2025-12-04"))
2502            .respond_with(ResponseTemplate::new(200).set_body_json(sleep_fixture))
2503            .mount(&server)
2504            .await;
2505
2506        let hrv_fixture: serde_json::Value =
2507            serde_json::from_str(include_str!("../../tests/fixtures/hrv.json")).unwrap();
2508
2509        Mock::given(method("GET"))
2510            .and(path("/hrv-service/hrv/2025-12-04"))
2511            .respond_with(ResponseTemplate::new(200).set_body_json(hrv_fixture))
2512            .mount(&server)
2513            .await;
2514
2515        let client = GarminClient::new_with_base_url(&server.uri());
2516        let mut task = SyncTask::new(
2517            1,
2518            SyncPipeline::Frontier,
2519            SyncTaskType::DailyHealth { date },
2520        );
2521        task.id = Some(1);
2522
2523        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2524            .await
2525            .unwrap();
2526
2527        match data {
2528            SyncData::Health { record, .. } => {
2529                assert_eq!(record.sleep_seconds, Some(31920));
2530                assert_eq!(record.deep_sleep_seconds, Some(8100));
2531                assert_eq!(record.light_sleep_seconds, Some(15300));
2532                assert_eq!(record.rem_sleep_seconds, Some(8520));
2533                assert_eq!(record.sleep_score, Some(88));
2534                assert_eq!(
2535                    record.sleep_note.as_deref(),
2536                    Some("Late caffeine, restless night.")
2537                );
2538                assert_eq!(record.hrv_weekly_avg, Some(65));
2539                assert_eq!(record.hrv_last_night, Some(68));
2540                assert_eq!(record.hrv_status.as_deref(), Some("BALANCED"));
2541            }
2542            _ => panic!("unexpected data type"),
2543        }
2544    }
2545
2546    #[tokio::test]
2547    async fn test_daily_health_handles_missing_sleep_and_hrv() {
2548        let server = MockServer::start().await;
2549        let date = NaiveDate::from_ymd_opt(2025, 12, 5).unwrap();
2550
2551        let health_body = json!({
2552            "totalSteps": 4321,
2553            "sleepingSeconds": 7200
2554        });
2555
2556        Mock::given(method("GET"))
2557            .and(path("/usersummary-service/usersummary/daily/TestUser"))
2558            .and(query_param("calendarDate", "2025-12-05"))
2559            .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2560            .mount(&server)
2561            .await;
2562
2563        Mock::given(method("GET"))
2564            .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2565            .and(query_param("date", "2025-12-05"))
2566            .respond_with(ResponseTemplate::new(404))
2567            .mount(&server)
2568            .await;
2569
2570        Mock::given(method("GET"))
2571            .and(path("/hrv-service/hrv/2025-12-05"))
2572            .respond_with(ResponseTemplate::new(404))
2573            .mount(&server)
2574            .await;
2575
2576        let client = GarminClient::new_with_base_url(&server.uri());
2577        let mut task = SyncTask::new(
2578            1,
2579            SyncPipeline::Frontier,
2580            SyncTaskType::DailyHealth { date },
2581        );
2582        task.id = Some(1);
2583
2584        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2585            .await
2586            .unwrap();
2587
2588        match data {
2589            SyncData::Health { record, .. } => {
2590                assert_eq!(record.sleep_seconds, Some(7200));
2591                assert_eq!(record.deep_sleep_seconds, None);
2592                assert_eq!(record.hrv_weekly_avg, None);
2593                assert_eq!(record.hrv_status, None);
2594            }
2595            _ => panic!("unexpected data type"),
2596        }
2597    }
2598
2599    #[test]
2600    fn test_parse_hrv_metrics_reads_last_night_avg() {
2601        let payload = json!({
2602            "hrvSummary": {
2603                "weeklyAvg": 60,
2604                "lastNightAvg": 58,
2605                "status": "BALANCED"
2606            }
2607        });
2608
2609        let (weekly_avg, last_night, status) = parse_hrv_metrics(Some(&payload));
2610        assert_eq!(weekly_avg, Some(60));
2611        assert_eq!(last_night, Some(58));
2612        assert_eq!(status.as_deref(), Some("BALANCED"));
2613    }
2614
2615    #[test]
2616    fn test_parse_gpx_reads_track_point_extension_streams() {
2617        let gpx_data = r#"
2618            <?xml version="1.0" encoding="UTF-8"?>
2619            <gpx creator="Garmin Connect" version="1.1"
2620              xmlns="http://www.topografix.com/GPX/1/1"
2621              xmlns:gpxtpx="http://www.garmin.com/xmlschemas/TrackPointExtension/v1"
2622              xmlns:gpxpx="http://www.garmin.com/xmlschemas/PowerExtension/v1">
2623              <trk>
2624                <trkseg>
2625                  <trkpt lat="37.0" lon="-122.0">
2626                    <ele>10.5</ele>
2627                    <time>2025-01-03T07:00:00Z</time>
2628                    <extensions>
2629                      <gpxtpx:TrackPointExtension>
2630                        <gpxtpx:hr>150</gpxtpx:hr>
2631                        <gpxtpx:cad>88</gpxtpx:cad>
2632                        <gpxtpx:speed>3.25</gpxtpx:speed>
2633                      </gpxtpx:TrackPointExtension>
2634                      <gpxpx:PowerExtension>
2635                        <gpxpx:PowerInWatts>245</gpxpx:PowerInWatts>
2636                      </gpxpx:PowerExtension>
2637                    </extensions>
2638                  </trkpt>
2639                  <trkpt lat="37.1" lon="-122.1">
2640                    <ele>11.0</ele>
2641                    <time>2025-01-03T07:00:01Z</time>
2642                    <extensions>
2643                      <gpxtpx:TrackPointExtension>
2644                        <gpxtpx:hr>151</gpxtpx:hr>
2645                      </gpxtpx:TrackPointExtension>
2646                    </extensions>
2647                  </trkpt>
2648                </trkseg>
2649              </trk>
2650            </gpx>
2651        "#;
2652
2653        let points = parse_gpx(42, gpx_data).unwrap();
2654
2655        assert_eq!(points.len(), 2);
2656        assert_eq!(points[0].activity_id, 42);
2657        assert_eq!(points[0].heart_rate, Some(150));
2658        assert_eq!(points[0].cadence, Some(88));
2659        assert_eq!(points[0].power, Some(245));
2660        assert_eq!(points[0].speed, Some(3.25));
2661        assert_eq!(points[1].heart_rate, Some(151));
2662        assert_eq!(points[1].cadence, None);
2663        assert_eq!(points[1].power, None);
2664        assert_eq!(points[1].speed, None);
2665    }
2666
2667    #[tokio::test]
2668    async fn test_performance_uses_date_scoped_endpoints() {
2669        let server = MockServer::start().await;
2670        let date = NaiveDate::from_ymd_opt(2025, 12, 10).unwrap();
2671
2672        let vo2_fixture: serde_json::Value =
2673            serde_json::from_str(include_str!("../../tests/fixtures/vo2max.json")).unwrap();
2674        Mock::given(method("GET"))
2675            .and(path(
2676                "/metrics-service/metrics/maxmet/daily/2025-12-10/2025-12-10",
2677            ))
2678            .respond_with(ResponseTemplate::new(200).set_body_json(vo2_fixture))
2679            .mount(&server)
2680            .await;
2681
2682        let race_fixture: serde_json::Value =
2683            serde_json::from_str(include_str!("../../tests/fixtures/race_predictions.json"))
2684                .unwrap();
2685        Mock::given(method("GET"))
2686            .and(path(
2687                "/metrics-service/metrics/racepredictions/daily/TestUser",
2688            ))
2689            .and(query_param("fromCalendarDate", "2025-12-10"))
2690            .and(query_param("toCalendarDate", "2025-12-10"))
2691            .respond_with(ResponseTemplate::new(200).set_body_json(race_fixture))
2692            .mount(&server)
2693            .await;
2694
2695        let readiness_fixture: serde_json::Value =
2696            serde_json::from_str(include_str!("../../tests/fixtures/training_readiness.json"))
2697                .unwrap();
2698        Mock::given(method("GET"))
2699            .and(path(
2700                "/metrics-service/metrics/trainingreadiness/2025-12-10",
2701            ))
2702            .respond_with(ResponseTemplate::new(200).set_body_json(readiness_fixture))
2703            .mount(&server)
2704            .await;
2705
2706        let training_status_fixture = json!({
2707            "mostRecentTrainingStatus": {
2708                "latestTrainingStatusData": {
2709                    "123": {
2710                        "calendarDate": "2025-12-10",
2711                        "trainingStatusFeedbackPhrase": "PRODUCTIVE"
2712                    }
2713                }
2714            }
2715        });
2716        Mock::given(method("GET"))
2717            .and(path(
2718                "/metrics-service/metrics/trainingstatus/aggregated/2025-12-10",
2719            ))
2720            .respond_with(ResponseTemplate::new(200).set_body_json(training_status_fixture))
2721            .mount(&server)
2722            .await;
2723
2724        let endurance_fixture: serde_json::Value =
2725            serde_json::from_str(include_str!("../../tests/fixtures/endurance_score.json"))
2726                .unwrap();
2727        Mock::given(method("GET"))
2728            .and(path("/metrics-service/metrics/endurancescore"))
2729            .and(query_param("calendarDate", "2025-12-10"))
2730            .respond_with(ResponseTemplate::new(200).set_body_json(endurance_fixture))
2731            .mount(&server)
2732            .await;
2733
2734        let hill_fixture: serde_json::Value =
2735            serde_json::from_str(include_str!("../../tests/fixtures/hill_score.json")).unwrap();
2736        Mock::given(method("GET"))
2737            .and(path("/metrics-service/metrics/hillscore"))
2738            .and(query_param("calendarDate", "2025-12-10"))
2739            .respond_with(ResponseTemplate::new(200).set_body_json(hill_fixture))
2740            .mount(&server)
2741            .await;
2742
2743        let fitness_age_fixture = json!({
2744            "calendarDate": "2025-12-10",
2745            "fitnessAge": 37.0
2746        });
2747        Mock::given(method("GET"))
2748            .and(path("/fitnessage-service/fitnessage/2025-12-10"))
2749            .respond_with(ResponseTemplate::new(200).set_body_json(fitness_age_fixture))
2750            .mount(&server)
2751            .await;
2752
2753        let client = GarminClient::new_with_base_url(&server.uri());
2754        let mut task = SyncTask::new(
2755            1,
2756            SyncPipeline::Backfill,
2757            SyncTaskType::Performance { date },
2758        );
2759        task.id = Some(1);
2760
2761        let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2762            .await
2763            .unwrap();
2764
2765        match data {
2766            SyncData::Performance { record, .. } => {
2767                assert_eq!(record.vo2max, Some(53.0));
2768                assert_eq!(record.fitness_age, Some(37));
2769                assert_eq!(record.training_readiness, Some(69));
2770                assert_eq!(record.training_status.as_deref(), Some("PRODUCTIVE"));
2771                assert_eq!(record.race_5k_sec, Some(1245));
2772                assert_eq!(record.race_10k_sec, Some(2610));
2773                assert_eq!(record.race_half_sec, Some(5850));
2774                assert_eq!(record.race_marathon_sec, Some(12420));
2775                assert_eq!(record.endurance_score, Some(72));
2776                assert_eq!(record.hill_score, Some(58));
2777            }
2778            _ => panic!("unexpected data type"),
2779        }
2780    }
2781
2782    #[test]
2783    fn test_parse_garmin_datetime_accepts_naive_strings() {
2784        let dt = parse_garmin_datetime("2025-01-03 07:00:00");
2785        assert!(dt.is_some());
2786
2787        let dt = parse_garmin_datetime("2025-01-03 07:00:00.123");
2788        assert!(dt.is_some());
2789    }
2790}