1pub mod progress;
13pub mod rate_limiter;
14pub mod task_queue;
15
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
20use tokio::sync::{mpsc, Mutex as TokioMutex};
21
22use crate::client::{GarminClient, OAuth2Token};
23use crate::db::models::{
24 Activity, DailyHealth, PerformanceMetrics, SyncPipeline, SyncTask, SyncTaskType, TrackPoint,
25};
26use crate::storage::{ParquetStore, Storage, SyncDb};
27use crate::{GarminError, Result};
28use std::io::{self, Write};
29
30pub use progress::{PlanningStep, SharedProgress, SyncProgress};
31pub use rate_limiter::{RateLimiter, SharedRateLimiter};
32pub use task_queue::{SharedTaskQueue, TaskQueue};
33
34fn pipeline_filter(mode: progress::SyncMode) -> Option<SyncPipeline> {
35 match mode {
36 progress::SyncMode::Latest => Some(SyncPipeline::Frontier),
37 progress::SyncMode::Backfill => Some(SyncPipeline::Backfill),
38 }
39}
40
41#[derive(Debug)]
43enum SyncData {
44 Activities {
46 records: Vec<Activity>,
47 gpx_tasks: Vec<SyncTask>,
48 next_page: Option<SyncTask>,
49 task_id: i64,
50 },
51 Health { record: DailyHealth, task_id: i64 },
53 Performance {
55 record: PerformanceMetrics,
56 task_id: i64,
57 },
58 TrackPoints {
60 #[allow(dead_code)]
61 activity_id: i64,
62 date: NaiveDate,
63 points: Vec<TrackPoint>,
64 task_id: i64,
65 },
66}
67
68pub struct SyncEngine {
70 storage: Storage,
71 client: GarminClient,
72 token: OAuth2Token,
73 rate_limiter: RateLimiter,
74 queue: TaskQueue,
75 profile_id: i32,
76 display_name: Option<String>,
77}
78
79#[derive(Clone)]
80struct ProducerContext {
81 rate_limiter: SharedRateLimiter,
82 client: GarminClient,
83 token: OAuth2Token,
84 progress: SharedProgress,
85 display_name: Arc<String>,
86 profile_id: i32,
87 stats: Arc<TokioMutex<SyncStats>>,
88 in_flight: Arc<AtomicUsize>,
89 parquet: Arc<ParquetStore>,
90 force: bool,
91 pipeline_filter: Option<SyncPipeline>,
92}
93
/// Five optional `i32` sleep values bundled as a tuple.
///
/// NOTE(review): the meaning and order of the five slots are not visible in this part of the
/// file — confirm against the code that builds and destructures this tuple.
type SleepMetrics = (Option<i32>, Option<i32>, Option<i32>, Option<i32>, Option<i32>);
101
102impl SyncEngine {
103 pub fn new(client: GarminClient, token: OAuth2Token) -> Result<Self> {
105 let storage = Storage::open_default()?;
106 Self::with_storage(storage, client, token)
107 }
108
109 pub fn with_storage(
111 storage: Storage,
112 client: GarminClient,
113 token: OAuth2Token,
114 ) -> Result<Self> {
115 let profile_id = storage.sync_db.get_or_create_profile("default")?;
117
118 let sync_db = SyncDb::open(storage.base_path().join("sync.db"))?;
120 let queue = TaskQueue::new(sync_db, profile_id, None);
121
122 Ok(Self {
123 storage,
124 client,
125 token,
126 rate_limiter: RateLimiter::new(),
127 queue,
128 profile_id,
129 display_name: None,
130 })
131 }
132
133 async fn get_display_name(&mut self) -> Result<String> {
135 if let Some(ref name) = self.display_name {
136 return Ok(name.clone());
137 }
138
139 let profile: serde_json::Value = self
140 .client
141 .get_json(&self.token, "/userprofile-service/socialProfile")
142 .await?;
143
144 let name = profile
145 .get("displayName")
146 .and_then(|v| v.as_str())
147 .map(|s| s.to_string())
148 .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
149
150 self.profile_id = self.storage.sync_db.get_or_create_profile(&name)?;
152 self.queue.set_profile_id(self.profile_id);
153
154 self.display_name = Some(name.clone());
155 Ok(name)
156 }
157
158 pub(crate) async fn find_oldest_activity_date(
161 &mut self,
162 progress: Option<&SyncProgress>,
163 ) -> Result<(NaiveDate, u32, u32)> {
164 if let Some(p) = progress {
165 p.set_planning_step(PlanningStep::FindingOldestActivity);
166 } else {
167 print!("Finding oldest activity date...");
168 let _ = io::stdout().flush();
169 }
170
171 self.rate_limiter.wait().await;
175
176 let limit: u32 = 100;
178 let mut jump: u32 = 100;
179 let mut last_non_empty: u32 = 0;
180 let max_jump: u32 = 1_000_000;
181
182 while jump < max_jump {
184 let path = format!(
185 "/activitylist-service/activities/search/activities?limit=1&start={}",
186 jump
187 );
188
189 let activities: Vec<serde_json::Value> =
190 self.client.get_json(&self.token, &path).await?;
191
192 if activities.is_empty() {
193 break;
194 }
195
196 last_non_empty = jump;
197 jump = jump.saturating_mul(2);
198 self.rate_limiter.wait().await;
199 }
200
201 let mut low = last_non_empty;
203 let mut high = jump;
204
205 while high - low > limit {
206 let mid = (low + high) / 2;
207 let path = format!(
208 "/activitylist-service/activities/search/activities?limit=1&start={}",
209 mid
210 );
211
212 self.rate_limiter.wait().await;
213 let activities: Vec<serde_json::Value> =
214 self.client.get_json(&self.token, &path).await?;
215
216 if activities.is_empty() {
217 high = mid;
218 } else {
219 low = mid;
220 }
221 }
222
223 let path = format!(
225 "/activitylist-service/activities/search/activities?limit={}&start={}",
226 limit, low
227 );
228
229 self.rate_limiter.wait().await;
230 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
231
232 let total_activities = low + activities.len() as u32;
234
235 let oldest_date = activities
236 .last()
237 .and_then(|activity| activity.get("startTimeLocal"))
238 .and_then(|v| v.as_str())
239 .and_then(|date_str| date_str.split(' ').next())
240 .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
241
242 let result = oldest_date.unwrap_or_else(|| {
243 Utc::now().date_naive() - Duration::days(365)
245 });
246
247 let estimated_gps = (total_activities as f32 * 0.8) as u32;
250
251 if let Some(p) = progress {
252 p.set_oldest_activity_date(&result.to_string());
253 } else {
254 println!(" {} ({} activities)", result, total_activities);
255 }
256 Ok((result, total_activities, estimated_gps))
257 }
258
259 async fn has_health_data(&mut self, display_name: &str, date: NaiveDate) -> Result<bool> {
260 let path = format!(
261 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
262 display_name, date
263 );
264
265 self.rate_limiter.wait().await;
266 let result: std::result::Result<serde_json::Value, _> =
267 self.client.get_json(&self.token, &path).await;
268
269 match result {
270 Ok(data) => Ok(!data.as_object().map(|o| o.is_empty()).unwrap_or(true)),
271 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
272 Err(e) => Err(e),
273 }
274 }
275
276 async fn find_first_health_date(
277 &mut self,
278 progress: Option<&SyncProgress>,
279 from: NaiveDate,
280 to: NaiveDate,
281 ) -> Result<Option<NaiveDate>> {
282 if let Some(p) = progress {
283 p.set_planning_step(PlanningStep::FindingFirstHealth);
284 }
285
286 if from > to {
287 return Ok(None);
288 }
289
290 let display_name = self.get_display_name().await?;
291 let mut low = from;
292 let mut high = to;
293 let mut found = None;
294
295 while low <= high {
296 let span = (high - low).num_days();
297 let mid = low + Duration::days(span / 2);
298
299 if self.has_health_data(&display_name, mid).await? {
300 found = Some(mid);
301 if mid == low {
302 break;
303 }
304 high = mid - Duration::days(1);
305 } else {
306 low = mid + Duration::days(1);
307 }
308 }
309
310 Ok(found)
311 }
312
313 async fn has_performance_data(&mut self, date: NaiveDate) -> Result<bool> {
314 let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
315 self.rate_limiter.wait().await;
316 let readiness: std::result::Result<serde_json::Value, _> =
317 self.client.get_json(&self.token, &readiness_path).await;
318
319 if let Ok(data) = readiness {
320 if data.as_array().and_then(|arr| arr.first()).is_some() {
321 return Ok(true);
322 }
323 }
324
325 let status_path = format!(
326 "/metrics-service/metrics/trainingstatus/aggregated/{}",
327 date
328 );
329 self.rate_limiter.wait().await;
330 let status: std::result::Result<serde_json::Value, _> =
331 self.client.get_json(&self.token, &status_path).await;
332
333 match status {
334 Ok(data) => Ok(data.get("mostRecentTrainingStatus").is_some()),
335 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
336 Err(e) => Err(e),
337 }
338 }
339
340 async fn find_first_performance_date(
341 &mut self,
342 progress: Option<&SyncProgress>,
343 from: NaiveDate,
344 to: NaiveDate,
345 ) -> Result<Option<NaiveDate>> {
346 if let Some(p) = progress {
347 p.set_planning_step(PlanningStep::FindingFirstPerformance);
348 }
349
350 if from > to {
351 return Ok(None);
352 }
353
354 let mut low = from;
355 let mut high = to;
356 let mut found = None;
357
358 while low <= high {
359 let span = (high - low).num_days();
360 let mid = low + Duration::days(span / 2);
361
362 if self.has_performance_data(mid).await? {
363 found = Some(mid);
364 if mid == low {
365 break;
366 }
367 high = mid - Duration::days(1);
368 } else {
369 low = mid + Duration::days(1);
370 }
371 }
372
373 Ok(found)
374 }
375
376 pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
378 self.run_with_progress(&opts).await
379 }
380
381 async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
383 let progress = Arc::new(SyncProgress::new());
384
385 progress.set_storage_path(&self.storage.base_path().display().to_string());
387 progress.set_sync_mode(opts.mode);
388 println!("Using storage: {}", progress.get_storage_path());
389 println!("Planning sync...");
390
391 progress.set_planning_step(PlanningStep::FetchingProfile);
393 let display_name = self.get_display_name().await?;
394 progress.set_profile(&display_name);
395 self.queue.set_pipeline(pipeline_filter(opts.mode));
396
397 let _recovered = self.queue.recover_in_progress()?;
399
400 if self.queue.pending_count()? == 0 {
402 self.plan_sync_with_progress(opts, &progress).await?;
403 }
404
405 progress.finish_planning();
407
408 self.count_tasks_for_progress(&progress)?;
410
411 let today = Utc::now().date_naive();
413 let to_date = opts.to_date.unwrap_or(today);
414
415 match opts.mode {
416 progress::SyncMode::Latest => {
417 let from_date = opts.from_date.unwrap_or_else(|| today - Duration::days(7));
419 progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
420 progress.set_date_range(&from_date.to_string(), &to_date.to_string());
421 }
422 progress::SyncMode::Backfill => {
423 let oldest = progress
425 .get_oldest_activity_date()
426 .unwrap_or_else(|| (today - Duration::days(365)).to_string());
427 let from_date = opts
428 .from_date
429 .map(|d| d.to_string())
430 .unwrap_or(oldest.clone());
431 progress.set_backfill_range(&from_date, &oldest);
432 progress.set_date_range(&oldest, &from_date);
433 }
434 }
435
436 print_sync_overview(&progress);
437 println!("Planning complete.");
438
439 let stats_result = if opts.dry_run {
440 println!("Dry run mode - no changes will be made");
441 Ok(SyncStats::default())
442 } else {
443 let reporter_progress = progress.clone();
444 let reporter_handle = tokio::spawn(async move {
445 run_progress_reporter(reporter_progress).await;
446 });
447
448 let result = self
449 .run_with_progress_tracking(opts, progress.clone())
450 .await;
451 progress.request_shutdown();
452 let _ = reporter_handle.await;
453 result
454 };
455
456 let stats = stats_result?;
457 print_sync_errors(&progress);
458
459 if !opts.dry_run && stats.completed > 0 {
461 self.update_sync_state_after_completion(opts, today).await?;
462 }
463
464 if !opts.dry_run {
465 println!("\nSync complete: {}", stats);
466 }
467 Ok(stats)
468 }
469
470 async fn update_sync_state_after_completion(
472 &self,
473 opts: &SyncOptions,
474 today: NaiveDate,
475 ) -> Result<()> {
476 use crate::db::models::SyncState;
477
478 match opts.mode {
479 progress::SyncMode::Latest => {
480 if opts.sync_activities {
482 let state = SyncState {
483 profile_id: self.profile_id,
484 data_type: "activities".to_string(),
485 last_sync_date: Some(today),
486 last_activity_id: None,
487 };
488 self.storage.sync_db.update_sync_state(&state)?;
489 }
490 if opts.sync_health {
491 let state = SyncState {
492 profile_id: self.profile_id,
493 data_type: "health".to_string(),
494 last_sync_date: Some(today),
495 last_activity_id: None,
496 };
497 self.storage.sync_db.update_sync_state(&state)?;
498 }
499 }
500 progress::SyncMode::Backfill => {
501 let pending = self.queue.pending_count()?;
503 if pending == 0 {
504 self.storage
506 .sync_db
507 .mark_backfill_complete(self.profile_id, "activities")?;
508 self.storage
509 .sync_db
510 .mark_backfill_complete(self.profile_id, "health")?;
511 self.storage
512 .sync_db
513 .mark_backfill_complete(self.profile_id, "performance")?;
514 }
515 }
516 }
517
518 Ok(())
519 }
520
521 async fn run_with_progress_tracking(
523 &mut self,
524 opts: &SyncOptions,
525 progress: SharedProgress,
526 ) -> Result<SyncStats> {
527 self.run_parallel(opts, progress).await
529 }
530
531 async fn run_parallel(
536 &mut self,
537 opts: &SyncOptions,
538 progress: SharedProgress,
539 ) -> Result<SyncStats> {
540 let rate_limiter = SharedRateLimiter::new(opts.concurrency);
541 let queue = SharedTaskQueue::new(TaskQueue::new(
542 SyncDb::open(self.storage.base_path().join("sync.db"))?,
543 self.profile_id,
544 pipeline_filter(opts.mode),
545 ));
546
547 self.run_parallel_with_resources(
548 opts,
549 progress,
550 queue,
551 rate_limiter,
552 pipeline_filter(opts.mode),
553 )
554 .await
555 }
556
557 pub(crate) async fn run_parallel_with_resources(
558 &mut self,
559 opts: &SyncOptions,
560 progress: SharedProgress,
561 queue: SharedTaskQueue,
562 rate_limiter: SharedRateLimiter,
563 pipeline_filter: Option<SyncPipeline>,
564 ) -> Result<SyncStats> {
565 let (tx, rx) = mpsc::channel::<SyncData>(100);
567
568 let parquet = Arc::new(self.storage.parquet.clone());
570 let client = self.client.clone();
571 let token = self.token.clone();
572 let stats = Arc::new(TokioMutex::new(SyncStats::default()));
573 let in_flight = Arc::new(AtomicUsize::new(0));
574 let display_name = Arc::new(self.get_display_name().await?);
575 let profile_id = self.profile_id;
576
577 let mut producer_handles = Vec::new();
579 for id in 0..opts.concurrency {
580 let tx = tx.clone();
581 let queue = queue.clone();
582 let context = ProducerContext {
583 rate_limiter: rate_limiter.clone(),
584 client: client.clone(),
585 token: token.clone(),
586 progress: progress.clone(),
587 display_name: Arc::clone(&display_name),
588 profile_id,
589 stats: Arc::clone(&stats),
590 in_flight: Arc::clone(&in_flight),
591 parquet: Arc::clone(&parquet),
592 force: opts.force,
593 pipeline_filter,
594 };
595
596 producer_handles.push(tokio::spawn(async move {
597 producer_loop(id, queue, tx, context).await
598 }));
599 }
600 drop(tx); let rx = Arc::new(TokioMutex::new(rx));
604 let mut consumer_handles = Vec::new();
605 for id in 0..opts.concurrency {
606 let rx = Arc::clone(&rx);
607 let parquet = Arc::clone(&parquet);
608 let queue = queue.clone();
609 let stats = Arc::clone(&stats);
610 let progress = progress.clone();
611 let in_flight = Arc::clone(&in_flight);
612
613 consumer_handles.push(tokio::spawn(async move {
614 consumer_loop(id, rx, parquet, queue, stats, progress, in_flight).await
615 }));
616 }
617
618 for h in producer_handles {
620 if let Err(e) = h.await {
621 eprintln!("Producer error: {}", e);
622 }
623 }
624
625 for h in consumer_handles {
627 if let Err(e) = h.await {
628 eprintln!("Consumer error: {}", e);
629 }
630 }
631
632 queue.cleanup(7).await?;
634
635 self.storage
637 .sync_db
638 .update_profile_sync_time(self.profile_id)?;
639
640 let final_stats = stats.lock().await;
642 Ok(SyncStats {
643 recovered: final_stats.recovered,
644 completed: final_stats.completed,
645 rate_limited: final_stats.rate_limited,
646 failed: final_stats.failed,
647 })
648 }
649
650 fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
655 let (_activities, _gpx, health, performance) = self.queue.count_by_type()?;
657
658 if progress.activities.get_total() == 0 {
661 progress.activities.set_total(1); progress.activities.set_dynamic(true);
663 }
664 if progress.gpx.get_total() == 0 {
665 progress.gpx.set_dynamic(true); }
667
668 progress.health.set_total(health);
670 progress.performance.set_total(performance);
671
672 Ok(())
673 }
674
675 async fn plan_sync_with_progress(
677 &mut self,
678 opts: &SyncOptions,
679 progress: &SyncProgress,
680 ) -> Result<()> {
681 let today = Utc::now().date_naive();
682
683 match opts.mode {
684 progress::SyncMode::Latest => self.plan_latest_sync(opts, progress, today).await,
685 progress::SyncMode::Backfill => self.plan_backfill_sync(opts, progress, today).await,
686 }
687 }
688
689 pub(crate) async fn plan_latest_sync(
691 &mut self,
692 opts: &SyncOptions,
693 progress: &SyncProgress,
694 today: NaiveDate,
695 ) -> Result<()> {
696 let last_sync = self
698 .storage
699 .sync_db
700 .get_sync_state(self.profile_id, "activities")?;
701 let from_date = opts.from_date.unwrap_or_else(|| {
702 last_sync
703 .and_then(|s| s.last_sync_date)
704 .unwrap_or_else(|| today - Duration::days(7))
705 });
706 let to_date = opts.to_date.unwrap_or(today);
707
708 progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
709 progress.set_oldest_activity_date(&from_date.to_string());
710
711 let total_days = (to_date - from_date).num_days().max(0) as u32 + 1;
712
713 if opts.sync_activities {
715 progress.set_planning_step(PlanningStep::PlanningActivities);
716 self.plan_activities_sync(SyncPipeline::Frontier, Some(from_date), Some(to_date))?;
717 }
718
719 if opts.sync_health {
721 progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
722 self.plan_health_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
723 }
724
725 if opts.sync_performance {
727 let total_weeks = (total_days / 7).max(1);
728 progress.set_planning_step(PlanningStep::PlanningPerformance { weeks: total_weeks });
729 self.plan_performance_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
730 }
731
732 Ok(())
733 }
734
735 pub(crate) async fn plan_backfill_sync(
737 &mut self,
738 opts: &SyncOptions,
739 progress: &SyncProgress,
740 today: NaiveDate,
741 ) -> Result<()> {
742 let (oldest_date, total_activities, estimated_gps) =
744 self.find_oldest_activity_date(Some(progress)).await?;
745
746 let backfill_state = self
748 .storage
749 .sync_db
750 .get_backfill_state(self.profile_id, "activities")?;
751
752 let (frontier_date, activities_complete) = match backfill_state {
753 Some((frontier, _target, complete)) => (frontier, complete),
754 None => {
755 let last_sync = self
757 .storage
758 .sync_db
759 .get_sync_state(self.profile_id, "activities")?;
760 let frontier = last_sync.and_then(|s| s.last_sync_date).unwrap_or(today);
761
762 self.storage.sync_db.set_backfill_state(
764 self.profile_id,
765 "activities",
766 frontier,
767 oldest_date,
768 false,
769 )?;
770 (frontier, false)
771 }
772 };
773
774 let activity_from = opts.from_date.unwrap_or(oldest_date);
776 let activity_to = opts.to_date.unwrap_or(frontier_date);
777
778 progress.set_backfill_range(&frontier_date.to_string(), &oldest_date.to_string());
779 progress.set_oldest_activity_date(&oldest_date.to_string());
780
781 if opts.sync_activities && !activities_complete {
783 progress.set_planning_step(PlanningStep::PlanningActivities);
784 self.plan_activities_sync(
785 SyncPipeline::Backfill,
786 Some(activity_from),
787 Some(activity_to),
788 )?;
789 if total_activities > 0 {
790 progress.activities.set_total(total_activities);
791 progress.gpx.set_total(estimated_gps);
792 }
793 }
794
795 if opts.sync_health {
796 let health_state = self
797 .storage
798 .sync_db
799 .get_backfill_state(self.profile_id, "health")?;
800
801 let health_target = match health_state {
802 Some((_frontier, target, complete)) => (!complete).then_some(target),
803 None => {
804 let search_from = activity_from;
805 let search_to = activity_to;
806 let first_health = self
807 .find_first_health_date(Some(progress), search_from, search_to)
808 .await?;
809 match first_health {
810 Some(first) => {
811 self.storage.sync_db.set_backfill_state(
812 self.profile_id,
813 "health",
814 frontier_date,
815 first,
816 false,
817 )?;
818 Some(first)
819 }
820 None => {
821 self.storage.sync_db.set_backfill_state(
822 self.profile_id,
823 "health",
824 frontier_date,
825 frontier_date,
826 true,
827 )?;
828 None
829 }
830 }
831 }
832 };
833
834 if let Some(target) = health_target {
835 let health_from = std::cmp::max(activity_from, target);
836 let health_to = activity_to;
837 if health_from <= health_to {
838 let total_days = (health_to - health_from).num_days().max(0) as u32 + 1;
839 progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
840 self.plan_health_sync(
841 health_from,
842 health_to,
843 opts.force,
844 SyncPipeline::Backfill,
845 )?;
846 }
847 }
848 }
849
850 if opts.sync_performance {
851 let perf_state = self
852 .storage
853 .sync_db
854 .get_backfill_state(self.profile_id, "performance")?;
855
856 let perf_target = match perf_state {
857 Some((_frontier, target, complete)) => (!complete).then_some(target),
858 None => {
859 let search_from = activity_from;
860 let search_to = activity_to;
861 let first_perf = self
862 .find_first_performance_date(Some(progress), search_from, search_to)
863 .await?;
864 match first_perf {
865 Some(first) => {
866 self.storage.sync_db.set_backfill_state(
867 self.profile_id,
868 "performance",
869 frontier_date,
870 first,
871 false,
872 )?;
873 Some(first)
874 }
875 None => {
876 self.storage.sync_db.set_backfill_state(
877 self.profile_id,
878 "performance",
879 frontier_date,
880 frontier_date,
881 true,
882 )?;
883 None
884 }
885 }
886 }
887 };
888
889 if let Some(target) = perf_target {
890 let perf_from = std::cmp::max(activity_from, target);
891 let perf_to = activity_to;
892 if perf_from <= perf_to {
893 let total_weeks = ((perf_to - perf_from).num_days().max(0) as u32 / 7).max(1);
894 progress.set_planning_step(PlanningStep::PlanningPerformance {
895 weeks: total_weeks,
896 });
897 self.plan_performance_sync(
898 perf_from,
899 perf_to,
900 opts.force,
901 SyncPipeline::Backfill,
902 )?;
903 }
904 }
905 }
906
907 Ok(())
908 }
909
910 fn plan_activities_sync(
912 &self,
913 pipeline: SyncPipeline,
914 min_date: Option<NaiveDate>,
915 max_date: Option<NaiveDate>,
916 ) -> Result<()> {
917 let task = SyncTask::new(
919 self.profile_id,
920 pipeline,
921 SyncTaskType::Activities {
922 start: 0,
923 limit: 50,
924 min_date,
925 max_date,
926 },
927 );
928 self.queue.push(task)?;
929 Ok(())
930 }
931
932 fn plan_health_sync(
934 &self,
935 from: NaiveDate,
936 to: NaiveDate,
937 force: bool,
938 pipeline: SyncPipeline,
939 ) -> Result<u32> {
940 let mut count = 0;
941 let mut date = from;
942 while date <= to {
943 if force
944 || !self
945 .storage
946 .parquet
947 .has_daily_health(self.profile_id, date)?
948 {
949 let task = SyncTask::new(
950 self.profile_id,
951 pipeline,
952 SyncTaskType::DailyHealth { date },
953 );
954 self.queue.push(task)?;
955 count += 1;
956 }
957 date += Duration::days(1);
958 }
959 Ok(count)
960 }
961
962 fn plan_performance_sync(
964 &self,
965 from: NaiveDate,
966 to: NaiveDate,
967 force: bool,
968 pipeline: SyncPipeline,
969 ) -> Result<u32> {
970 let mut count = 0;
972 let mut date = from;
973 while date <= to {
974 if force
975 || !self
976 .storage
977 .parquet
978 .has_performance_metrics(self.profile_id, date)?
979 {
980 let task = SyncTask::new(
981 self.profile_id,
982 pipeline,
983 SyncTaskType::Performance { date },
984 );
985 self.queue.push(task)?;
986 count += 1;
987 }
988 date += Duration::days(7);
989 }
990 Ok(count)
991 }
992}
993
994fn print_sync_overview(progress: &SyncProgress) {
995 println!("Profile: {}", progress.get_profile());
996 println!("Mode: {}", progress.get_sync_mode());
997 println!("Range: {}", progress.get_date_range());
998 println!(
999 "Queued tasks: activities {}, gpx {}, health {}, performance {}",
1000 progress.activities.get_total(),
1001 progress.gpx.get_total(),
1002 progress.health.get_total(),
1003 progress.performance.get_total()
1004 );
1005}
1006
1007fn print_sync_errors(progress: &SyncProgress) {
1008 let errors = progress.get_errors();
1009 if errors.is_empty() {
1010 return;
1011 }
1012
1013 println!("\nRecent errors:");
1014 for error in errors.iter().take(5) {
1015 println!(" [{}] {}: {}", error.stream, error.item, error.error);
1016 }
1017
1018 if errors.len() > 5 {
1019 println!(" ... and {} more", errors.len() - 5);
1020 }
1021}
1022
1023async fn run_progress_reporter(progress: SharedProgress) {
1024 loop {
1025 progress.print_simple_status();
1026
1027 if progress.should_shutdown() || progress.is_complete() {
1028 println!();
1029 break;
1030 }
1031
1032 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1033 }
1034}
1035
1036#[derive(Clone)]
1038pub struct SyncOptions {
1039 pub sync_activities: bool,
1041 pub sync_health: bool,
1043 pub sync_performance: bool,
1045 pub from_date: Option<NaiveDate>,
1047 pub to_date: Option<NaiveDate>,
1049 pub dry_run: bool,
1051 pub force: bool,
1053 pub concurrency: usize,
1055 pub mode: progress::SyncMode,
1057}
1058
1059impl Default for SyncOptions {
1060 fn default() -> Self {
1061 Self {
1062 sync_activities: false,
1063 sync_health: false,
1064 sync_performance: false,
1065 from_date: None,
1066 to_date: None,
1067 dry_run: false,
1068 force: false,
1069 concurrency: 4,
1070 mode: progress::SyncMode::Latest,
1071 }
1072 }
1073}
1074
/// Counters describing the outcome of a sync run.
///
/// All fields are plain counters, so the type is cheap to copy and compare.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct SyncStats {
    /// Tasks recovered from an interrupted earlier run.
    pub recovered: u32,
    /// Tasks that finished successfully (including ones skipped as already stored).
    pub completed: u32,
    /// Tasks that hit a rate limit.
    pub rate_limited: u32,
    /// Tasks that failed for another reason.
    pub failed: u32,
}

impl std::fmt::Display for SyncStats {
    /// Formats as `Completed: N, Failed: N, Rate limited: N`, appending
    /// `, Recovered: N` only when at least one task was recovered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            self.completed, self.failed, self.rate_limited
        )?;
        if self.recovered == 0 {
            return Ok(());
        }
        write!(f, ", Recovered: {}", self.recovered)
    }
}
1101
1102fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1104 let desc = match &task.task_type {
1105 SyncTaskType::Activities { start, limit, .. } => {
1106 format!("Activities {}-{}", start, start + limit)
1107 }
1108 SyncTaskType::DownloadGpx {
1109 activity_name,
1110 activity_date,
1111 ..
1112 } => {
1113 let name = activity_name.as_deref().unwrap_or("Unknown");
1114 let date = activity_date.as_deref().unwrap_or("");
1115 if date.is_empty() {
1116 name.to_string()
1117 } else {
1118 format!("{} {}", date, name)
1119 }
1120 }
1121 SyncTaskType::DailyHealth { date } => date.to_string(),
1122 SyncTaskType::Performance { date } => date.to_string(),
1123 _ => String::new(),
1124 };
1125
1126 match &task.task_type {
1127 SyncTaskType::Activities { .. } => {
1128 progress.activities.set_current_item(desc.clone());
1129 progress.activities.set_last_item(desc);
1130 }
1131 SyncTaskType::DownloadGpx { .. } => {
1132 progress.gpx.set_current_item(desc.clone());
1133 progress.gpx.set_last_item(desc);
1134 }
1135 SyncTaskType::DailyHealth { .. } => {
1136 progress.health.set_current_item(desc.clone());
1137 progress.health.set_last_item(desc);
1138 }
1139 SyncTaskType::Performance { .. } => {
1140 progress.performance.set_current_item(desc.clone());
1141 progress.performance.set_last_item(desc);
1142 }
1143 _ => {}
1144 }
1145}
1146
1147#[allow(dead_code)]
1149fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1150 match &task.task_type {
1151 SyncTaskType::Activities { .. } => {
1152 progress.activities.complete_one();
1153 progress.activities.clear_current_item();
1154 }
1155 SyncTaskType::DownloadGpx { .. } => {
1156 progress.gpx.complete_one();
1157 progress.gpx.clear_current_item();
1158 }
1159 SyncTaskType::DailyHealth { .. } => {
1160 progress.health.complete_one();
1161 progress.health.clear_current_item();
1162 }
1163 SyncTaskType::Performance { .. } => {
1164 progress.performance.complete_one();
1165 progress.performance.clear_current_item();
1166 }
1167 _ => {}
1168 }
1169}
1170
1171fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress, error: &str) {
1173 let (stream_name, item_desc) = match &task.task_type {
1174 SyncTaskType::Activities { start, limit, .. } => {
1175 progress.activities.fail_one();
1176 progress.activities.clear_current_item();
1177 ("Activities", format!("{}-{}", start, start + limit))
1178 }
1179 SyncTaskType::DownloadGpx {
1180 activity_id,
1181 activity_name,
1182 ..
1183 } => {
1184 progress.gpx.fail_one();
1185 progress.gpx.clear_current_item();
1186 (
1187 "GPX",
1188 activity_name
1189 .clone()
1190 .unwrap_or_else(|| activity_id.to_string()),
1191 )
1192 }
1193 SyncTaskType::DailyHealth { date } => {
1194 progress.health.fail_one();
1195 progress.health.clear_current_item();
1196 ("Health", date.to_string())
1197 }
1198 SyncTaskType::Performance { date } => {
1199 progress.performance.fail_one();
1200 progress.performance.clear_current_item();
1201 ("Performance", date.to_string())
1202 }
1203 _ => return,
1204 };
1205
1206 progress.add_error(stream_name, item_desc, error.to_string());
1207}
1208
/// Number of consecutive empty queue polls after which an idle worker may stop.
const MAX_IDLE_RETRIES: u32 = 10;

/// Decides whether an idle worker should stop polling the queue.
///
/// A worker stops only when no task is in flight anywhere and it has seen at least
/// [`MAX_IDLE_RETRIES`] consecutive empty polls.
fn should_exit_when_idle(idle_loops: u32, in_flight: usize) -> bool {
    if in_flight != 0 {
        // Unfinished work may still enqueue follow-up tasks.
        return false;
    }
    idle_loops >= MAX_IDLE_RETRIES
}
1214
1215fn record_write_failure(data: &SyncData, progress: &SyncProgress, error: &str) {
1216 match data {
1217 SyncData::Activities { records, .. } => {
1218 progress.activities.fail_one();
1219 progress.activities.clear_current_item();
1220 let item = records
1221 .first()
1222 .map(|r| r.activity_id.to_string())
1223 .unwrap_or_else(|| "batch".to_string());
1224 progress.add_error("Activities", item, error.to_string());
1225 }
1226 SyncData::Health { record, .. } => {
1227 progress.health.fail_one();
1228 progress.health.clear_current_item();
1229 progress.add_error("Health", record.date.to_string(), error.to_string());
1230 }
1231 SyncData::Performance { record, .. } => {
1232 progress.performance.fail_one();
1233 progress.performance.clear_current_item();
1234 progress.add_error("Performance", record.date.to_string(), error.to_string());
1235 }
1236 SyncData::TrackPoints {
1237 activity_id, date, ..
1238 } => {
1239 progress.gpx.fail_one();
1240 progress.gpx.clear_current_item();
1241 progress.add_error(
1242 "GPX",
1243 format!("{} ({})", date, activity_id),
1244 error.to_string(),
1245 );
1246 }
1247 }
1248}
1249
1250async fn producer_loop(
1256 _id: usize,
1257 queue: SharedTaskQueue,
1258 tx: mpsc::Sender<SyncData>,
1259 context: ProducerContext,
1260) {
1261 let mut empty_count = 0;
1262
1263 loop {
1264 let next_task = match context.pipeline_filter {
1266 Some(pipeline) => queue.pop_round_robin_with_pipeline(Some(pipeline)).await,
1267 None => queue.pop_round_robin().await,
1268 };
1269
1270 let task = match next_task {
1271 Ok(Some(task)) => {
1272 empty_count = 0; task
1274 }
1275 Ok(None) => {
1276 empty_count += 1;
1279 if should_exit_when_idle(empty_count, context.in_flight.load(Ordering::Relaxed)) {
1280 break; }
1282 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1283 continue;
1284 }
1285 Err(e) => {
1286 eprintln!("Queue error: {}", e);
1287 break;
1288 }
1289 };
1290
1291 let task_id = task.id.unwrap();
1292 context.in_flight.fetch_add(1, Ordering::Relaxed);
1293
1294 if let Err(e) = queue.mark_in_progress(task_id).await {
1296 eprintln!("Failed to mark task in progress: {}", e);
1297 continue;
1298 }
1299
1300 update_progress_for_task(&task, &context.progress);
1302
1303 if !context.force {
1305 let should_skip = match &task.task_type {
1306 SyncTaskType::DailyHealth { date } => context
1307 .parquet
1308 .has_daily_health(context.profile_id, *date)
1309 .unwrap_or(false),
1310 SyncTaskType::Performance { date } => context
1311 .parquet
1312 .has_performance_metrics(context.profile_id, *date)
1313 .unwrap_or(false),
1314 SyncTaskType::DownloadGpx {
1315 activity_id,
1316 activity_date,
1317 ..
1318 } => activity_date
1319 .as_ref()
1320 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1321 .and_then(|date| context.parquet.has_track_points(*activity_id, date).ok())
1322 .unwrap_or(false),
1323 _ => false,
1324 };
1325
1326 if should_skip {
1327 if let Err(e) = queue.mark_completed(task_id).await {
1328 eprintln!("Failed to mark task completed: {}", e);
1329 }
1330 complete_progress_for_task(&task, &context.progress);
1331 {
1332 let mut s = context.stats.lock().await;
1333 s.completed += 1;
1334 }
1335 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1336 continue;
1337 }
1338 }
1339
1340 let _permit = context.rate_limiter.acquire().await;
1342 context.progress.record_request();
1343
1344 let result = fetch_task_data(
1346 &task,
1347 &context.client,
1348 &context.token,
1349 &context.display_name,
1350 context.profile_id,
1351 )
1352 .await;
1353
1354 match result {
1355 Ok(data) => {
1356 context.rate_limiter.on_success();
1357 if tx.send(data).await.is_err() {
1359 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1361 let backoff = Duration::seconds(60);
1362 let _ = queue
1363 .mark_failed(task_id, "Consumer channel closed", backoff)
1364 .await;
1365 break;
1366 }
1367 }
1368 Err(GarminError::RateLimited) => {
1369 context.rate_limiter.on_rate_limit();
1370 let backoff = Duration::seconds(60);
1371 if let Err(e) = queue.mark_failed(task_id, "Rate limited", backoff).await {
1372 eprintln!("Failed to mark task as rate limited: {}", e);
1373 }
1374 fail_progress_for_task(&task, &context.progress, "Rate limited");
1375 {
1376 let mut s = context.stats.lock().await;
1377 s.rate_limited += 1;
1378 }
1379 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1380 }
1381 Err(e) => {
1382 let backoff = Duration::seconds(60);
1383 let error_msg = e.to_string();
1384 if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
1385 eprintln!("Failed to mark task as failed: {}", e);
1386 }
1387 fail_progress_for_task(&task, &context.progress, &error_msg);
1388 {
1389 let mut s = context.stats.lock().await;
1390 s.failed += 1;
1391 }
1392 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1393 }
1394 }
1395 }
1396}
1397
1398fn value_to_i32(value: &serde_json::Value) -> Option<i32> {
1399 if let Some(int) = value.as_i64() {
1400 return Some(int as i32);
1401 }
1402 value.as_f64().map(|float| float.round() as i32)
1403}
1404
1405fn first_entry(value: &serde_json::Value) -> Option<&serde_json::Value> {
1406 if let Some(array) = value.as_array() {
1407 array.first()
1408 } else {
1409 Some(value)
1410 }
1411}
1412
1413fn parse_sleep_metrics(value: Option<&serde_json::Value>) -> SleepMetrics {
1414 let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value);
1415
1416 let dto = match dto {
1417 Some(dto) => dto,
1418 None => return (None, None, None, None, None),
1419 };
1420
1421 let deep = dto.get("deepSleepSeconds").and_then(value_to_i32);
1422 let light = dto.get("lightSleepSeconds").and_then(value_to_i32);
1423 let rem = dto.get("remSleepSeconds").and_then(value_to_i32);
1424
1425 let total = dto
1426 .get("sleepTimeSeconds")
1427 .and_then(value_to_i32)
1428 .or_else(|| match (deep, light, rem) {
1429 (Some(d), Some(l), Some(r)) => Some(d + l + r),
1430 _ => None,
1431 });
1432
1433 let score = dto
1434 .get("sleepScores")
1435 .and_then(|v| v.get("overall"))
1436 .and_then(|v| v.get("value"))
1437 .and_then(value_to_i32);
1438
1439 (total, deep, light, rem, score)
1440}
1441
1442fn parse_hrv_metrics(
1443 value: Option<&serde_json::Value>,
1444) -> (Option<i32>, Option<i32>, Option<String>) {
1445 let summary = value.and_then(|v| v.get("hrvSummary")).or(value);
1446
1447 let summary = match summary {
1448 Some(summary) => summary,
1449 None => return (None, None, None),
1450 };
1451
1452 let weekly_avg = summary.get("weeklyAvg").and_then(value_to_i32);
1453 let last_night = summary
1454 .get("lastNight")
1455 .or_else(|| summary.get("lastNightAvg"))
1456 .and_then(value_to_i32);
1457 let status = summary
1458 .get("status")
1459 .and_then(|v| v.as_str())
1460 .map(|s| s.to_string());
1461
1462 (weekly_avg, last_night, status)
1463}
1464
1465fn parse_vo2max_value(value: Option<&serde_json::Value>) -> Option<f64> {
1466 let entry = value.and_then(first_entry)?;
1467 entry
1468 .get("generic")
1469 .and_then(|v| v.get("vo2MaxValue"))
1470 .and_then(|v| v.as_f64())
1471}
1472
1473fn parse_vo2max_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1474 let entry = value.and_then(first_entry)?;
1475 entry
1476 .get("generic")
1477 .and_then(|v| v.get("fitnessAge"))
1478 .and_then(value_to_i32)
1479}
1480
1481fn parse_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1482 value
1483 .and_then(|v| v.get("fitnessAge"))
1484 .and_then(value_to_i32)
1485}
1486
1487fn parse_race_predictions(
1488 value: Option<&serde_json::Value>,
1489) -> (Option<i32>, Option<i32>, Option<i32>, Option<i32>) {
1490 let entry = match value.and_then(first_entry) {
1491 Some(entry) => entry,
1492 None => return (None, None, None, None),
1493 };
1494
1495 let race_5k = entry.get("time5K").and_then(value_to_i32);
1496 let race_10k = entry.get("time10K").and_then(value_to_i32);
1497 let race_half = entry.get("timeHalfMarathon").and_then(value_to_i32);
1498 let race_marathon = entry.get("timeMarathon").and_then(value_to_i32);
1499
1500 (race_5k, race_10k, race_half, race_marathon)
1501}
1502
1503fn parse_overall_score(value: Option<&serde_json::Value>) -> Option<i32> {
1504 let entry = value.and_then(first_entry)?;
1505 entry.get("overallScore").and_then(value_to_i32)
1506}
1507
1508fn parse_training_status(value: Option<&serde_json::Value>, date: NaiveDate) -> Option<String> {
1509 let root = value?;
1510 let date_str = date.to_string();
1511
1512 if let Some(latest) = root
1513 .get("mostRecentTrainingStatus")
1514 .and_then(|v| v.get("latestTrainingStatusData"))
1515 .and_then(|v| v.as_object())
1516 {
1517 for entry in latest.values() {
1518 if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1519 return entry
1520 .get("trainingStatusFeedbackPhrase")
1521 .and_then(|v| v.as_str())
1522 .map(|s| s.to_string());
1523 }
1524 }
1525 }
1526
1527 if let Some(history) = root.get("trainingStatusHistory").and_then(|v| v.as_array()) {
1528 for entry in history {
1529 if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1530 return entry
1531 .get("trainingStatusFeedbackPhrase")
1532 .and_then(|v| v.as_str())
1533 .map(|s| s.to_string());
1534 }
1535 }
1536 }
1537
1538 None
1539}
1540
1541async fn fetch_task_data(
1543 task: &SyncTask,
1544 client: &GarminClient,
1545 token: &OAuth2Token,
1546 display_name: &str,
1547 profile_id: i32,
1548) -> Result<SyncData> {
1549 let task_id = task.id.unwrap();
1550
1551 match &task.task_type {
1552 SyncTaskType::Activities {
1553 start,
1554 limit,
1555 min_date,
1556 max_date,
1557 } => {
1558 let path = format!(
1559 "/activitylist-service/activities/search/activities?limit={}&start={}",
1560 limit, start
1561 );
1562 let activities: Vec<serde_json::Value> = client.get_json(token, &path).await?;
1563
1564 let mut records = Vec::new();
1565 let mut gpx_tasks = Vec::new();
1566 let mut reached_min = false;
1567
1568 for activity in &activities {
1569 let activity_date = activity
1570 .get("startTimeLocal")
1571 .and_then(|v| v.as_str())
1572 .and_then(|s| s.split(' ').next())
1573 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
1574
1575 if let Some(date) = activity_date {
1576 if let Some(min) = *min_date {
1577 if date < min {
1578 reached_min = true;
1579 break;
1580 }
1581 }
1582
1583 if let Some(max) = *max_date {
1584 if date > max {
1585 continue;
1586 }
1587 }
1588 }
1589
1590 let parsed = parse_activity(activity, profile_id)?;
1592 records.push(parsed);
1593
1594 if activity
1596 .get("hasPolyline")
1597 .and_then(|v| v.as_bool())
1598 .unwrap_or(false)
1599 {
1600 if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
1601 let activity_name = activity
1602 .get("activityName")
1603 .and_then(|v| v.as_str())
1604 .map(|s| s.to_string());
1605 let activity_date = activity
1606 .get("startTimeLocal")
1607 .and_then(|v| v.as_str())
1608 .and_then(|s| s.split(' ').next())
1609 .map(|s| s.to_string());
1610
1611 gpx_tasks.push(SyncTask::new(
1612 profile_id,
1613 task.pipeline,
1614 SyncTaskType::DownloadGpx {
1615 activity_id: id,
1616 activity_name,
1617 activity_date,
1618 },
1619 ));
1620 }
1621 }
1622 }
1623
1624 let next_page = if activities.len() == *limit as usize && !reached_min {
1626 Some(SyncTask::new(
1627 profile_id,
1628 task.pipeline,
1629 SyncTaskType::Activities {
1630 start: start + limit,
1631 limit: *limit,
1632 min_date: *min_date,
1633 max_date: *max_date,
1634 },
1635 ))
1636 } else {
1637 None
1638 };
1639
1640 Ok(SyncData::Activities {
1641 records,
1642 gpx_tasks,
1643 next_page,
1644 task_id,
1645 })
1646 }
1647
1648 SyncTaskType::DownloadGpx {
1649 activity_id,
1650 activity_date,
1651 ..
1652 } => {
1653 let path = format!("/download-service/export/gpx/activity/{}", activity_id);
1654 let gpx_bytes = client.download(token, &path).await?;
1655 let gpx_data = String::from_utf8_lossy(&gpx_bytes);
1656
1657 let date = activity_date
1659 .as_ref()
1660 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1661 .unwrap_or_else(|| Utc::now().date_naive());
1662
1663 let points = parse_gpx(*activity_id, &gpx_data)?;
1664
1665 Ok(SyncData::TrackPoints {
1666 activity_id: *activity_id,
1667 date,
1668 points,
1669 task_id,
1670 })
1671 }
1672
1673 SyncTaskType::DailyHealth { date } => {
1674 let path = format!(
1675 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
1676 display_name, date
1677 );
1678
1679 let health_result: std::result::Result<serde_json::Value, _> =
1681 client.get_json(token, &path).await;
1682
1683 let health = match health_result {
1684 Ok(data) => data,
1685 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
1686 serde_json::json!({})
1688 }
1689 Err(e) => return Err(e),
1690 };
1691
1692 let sleep_path = format!(
1693 "/wellness-service/wellness/dailySleepData/{}?date={}",
1694 display_name, date
1695 );
1696 let sleep_data: Option<serde_json::Value> =
1697 match client.get_json(token, &sleep_path).await {
1698 Ok(data) => Some(data),
1699 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1700 Err(e) => return Err(e),
1701 };
1702
1703 let hrv_path = format!("/hrv-service/hrv/{}", date);
1704 let hrv_data: Option<serde_json::Value> = match client.get_json(token, &hrv_path).await
1705 {
1706 Ok(data) => Some(data),
1707 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1708 Err(e) => return Err(e),
1709 };
1710
1711 let (sleep_total, deep_sleep, light_sleep, rem_sleep, sleep_score) =
1712 parse_sleep_metrics(sleep_data.as_ref());
1713 let (hrv_weekly_avg, hrv_last_night, hrv_status) = parse_hrv_metrics(hrv_data.as_ref());
1714
1715 let record = DailyHealth {
1716 id: None,
1717 profile_id,
1718 date: *date,
1719 steps: health
1720 .get("totalSteps")
1721 .and_then(|v| v.as_i64())
1722 .map(|v| v as i32),
1723 step_goal: health
1724 .get("dailyStepGoal")
1725 .and_then(|v| v.as_i64())
1726 .map(|v| v as i32),
1727 total_calories: health
1728 .get("totalKilocalories")
1729 .and_then(|v| v.as_i64())
1730 .map(|v| v as i32),
1731 active_calories: health
1732 .get("activeKilocalories")
1733 .and_then(|v| v.as_i64())
1734 .map(|v| v as i32),
1735 bmr_calories: health
1736 .get("bmrKilocalories")
1737 .and_then(|v| v.as_i64())
1738 .map(|v| v as i32),
1739 resting_hr: health
1740 .get("restingHeartRate")
1741 .and_then(|v| v.as_i64())
1742 .map(|v| v as i32),
1743 sleep_seconds: sleep_total.or_else(|| {
1744 health
1745 .get("sleepingSeconds")
1746 .and_then(|v| v.as_i64())
1747 .map(|v| v as i32)
1748 }),
1749 deep_sleep_seconds: deep_sleep,
1750 light_sleep_seconds: light_sleep,
1751 rem_sleep_seconds: rem_sleep,
1752 sleep_score,
1753 avg_stress: health
1754 .get("averageStressLevel")
1755 .and_then(|v| v.as_i64())
1756 .map(|v| v as i32),
1757 max_stress: health
1758 .get("maxStressLevel")
1759 .and_then(|v| v.as_i64())
1760 .map(|v| v as i32),
1761 body_battery_start: health
1762 .get("bodyBatteryChargedValue")
1763 .and_then(|v| v.as_i64())
1764 .map(|v| v as i32),
1765 body_battery_end: health
1766 .get("bodyBatteryDrainedValue")
1767 .and_then(|v| v.as_i64())
1768 .map(|v| v as i32),
1769 hrv_weekly_avg,
1770 hrv_last_night,
1771 hrv_status,
1772 avg_respiration: health
1773 .get("averageRespirationValue")
1774 .and_then(|v| v.as_f64()),
1775 avg_spo2: health
1776 .get("averageSpo2Value")
1777 .and_then(|v| v.as_i64())
1778 .map(|v| v as i32),
1779 lowest_spo2: health
1780 .get("lowestSpo2Value")
1781 .and_then(|v| v.as_i64())
1782 .map(|v| v as i32),
1783 hydration_ml: health
1784 .get("hydrationIntakeGoal")
1785 .and_then(|v| v.as_i64())
1786 .map(|v| v as i32),
1787 moderate_intensity_min: health
1788 .get("moderateIntensityMinutes")
1789 .and_then(|v| v.as_i64())
1790 .map(|v| v as i32),
1791 vigorous_intensity_min: health
1792 .get("vigorousIntensityMinutes")
1793 .and_then(|v| v.as_i64())
1794 .map(|v| v as i32),
1795 raw_json: Some(health),
1796 };
1797
1798 Ok(SyncData::Health { record, task_id })
1799 }
1800
1801 SyncTaskType::Performance { date } => {
1802 let vo2_path = format!("/metrics-service/metrics/maxmet/daily/{}/{}", date, date);
1803 let vo2max: Option<serde_json::Value> = match client.get_json(token, &vo2_path).await {
1804 Ok(data) => Some(data),
1805 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1806 Err(e) => return Err(e),
1807 };
1808
1809 let race_path = format!(
1810 "/metrics-service/metrics/racepredictions/daily/{}?fromCalendarDate={}&toCalendarDate={}",
1811 display_name, date, date
1812 );
1813 let race_predictions: Option<serde_json::Value> =
1814 match client.get_json(token, &race_path).await {
1815 Ok(data) => Some(data),
1816 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1817 Err(e) => return Err(e),
1818 };
1819
1820 let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
1822 let training_readiness: Option<serde_json::Value> =
1823 client.get_json(token, &readiness_path).await.ok();
1824
1825 let status_path = format!(
1827 "/metrics-service/metrics/trainingstatus/aggregated/{}",
1828 date
1829 );
1830 let training_status: Option<serde_json::Value> =
1831 client.get_json(token, &status_path).await.ok();
1832
1833 let endurance_path = format!(
1834 "/metrics-service/metrics/endurancescore?calendarDate={}",
1835 date
1836 );
1837 let endurance_score_data: Option<serde_json::Value> =
1838 match client.get_json(token, &endurance_path).await {
1839 Ok(data) => Some(data),
1840 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1841 Err(e) => return Err(e),
1842 };
1843
1844 let hill_path = format!("/metrics-service/metrics/hillscore?calendarDate={}", date);
1845 let hill_score_data: Option<serde_json::Value> =
1846 match client.get_json(token, &hill_path).await {
1847 Ok(data) => Some(data),
1848 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1849 Err(e) => return Err(e),
1850 };
1851
1852 let fitness_age_path = format!("/fitnessage-service/fitnessage/{}", date);
1853 let fitness_age_data: Option<serde_json::Value> =
1854 match client.get_json(token, &fitness_age_path).await {
1855 Ok(data) => Some(data),
1856 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1857 Err(e) => return Err(e),
1858 };
1859
1860 let vo2max_value = parse_vo2max_value(vo2max.as_ref());
1861 let fitness_age = parse_fitness_age(fitness_age_data.as_ref())
1862 .or_else(|| parse_vo2max_fitness_age(vo2max.as_ref()));
1863
1864 let readiness_entry = training_readiness
1865 .as_ref()
1866 .and_then(|v| v.as_array())
1867 .and_then(|arr| arr.first());
1868
1869 let readiness_score = readiness_entry
1870 .and_then(|e| e.get("score"))
1871 .and_then(|v| v.as_i64())
1872 .map(|v| v as i32);
1873
1874 let training_status_str = parse_training_status(training_status.as_ref(), *date);
1875 let (race_5k, race_10k, race_half, race_marathon) =
1876 parse_race_predictions(race_predictions.as_ref());
1877 let endurance_score = parse_overall_score(endurance_score_data.as_ref());
1878 let hill_score = parse_overall_score(hill_score_data.as_ref());
1879
1880 let record = PerformanceMetrics {
1881 id: None,
1882 profile_id,
1883 date: *date,
1884 vo2max: vo2max_value,
1885 fitness_age,
1886 training_readiness: readiness_score,
1887 training_status: training_status_str,
1888 lactate_threshold_hr: None,
1889 lactate_threshold_pace: None,
1890 race_5k_sec: race_5k,
1891 race_10k_sec: race_10k,
1892 race_half_sec: race_half,
1893 race_marathon_sec: race_marathon,
1894 endurance_score,
1895 hill_score,
1896 raw_json: None,
1897 };
1898
1899 Ok(SyncData::Performance { record, task_id })
1900 }
1901
1902 _ => {
1903 Err(GarminError::invalid_response(
1905 "Unsupported task type for parallel sync",
1906 ))
1907 }
1908 }
1909}
1910
1911fn parse_activity(activity: &serde_json::Value, profile_id: i32) -> Result<Activity> {
1913 let activity_id = activity
1914 .get("activityId")
1915 .and_then(|v| v.as_i64())
1916 .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
1917
1918 let start_time_local = activity
1919 .get("startTimeLocal")
1920 .and_then(|v| v.as_str())
1921 .and_then(parse_garmin_datetime);
1922
1923 let start_time_gmt = activity
1924 .get("startTimeGMT")
1925 .and_then(|v| v.as_str())
1926 .and_then(parse_garmin_datetime);
1927
1928 Ok(Activity {
1929 activity_id,
1930 profile_id,
1931 activity_name: activity
1932 .get("activityName")
1933 .and_then(|v| v.as_str())
1934 .map(|s| s.to_string()),
1935 activity_type: activity
1936 .get("activityType")
1937 .and_then(|v| v.get("typeKey"))
1938 .and_then(|v| v.as_str())
1939 .map(|s| s.to_string()),
1940 start_time_local,
1941 start_time_gmt,
1942 duration_sec: activity.get("duration").and_then(|v| v.as_f64()),
1943 distance_m: activity.get("distance").and_then(|v| v.as_f64()),
1944 calories: activity
1945 .get("calories")
1946 .and_then(|v| v.as_i64())
1947 .map(|v| v as i32),
1948 avg_hr: activity
1949 .get("averageHR")
1950 .and_then(|v| v.as_i64())
1951 .map(|v| v as i32),
1952 max_hr: activity
1953 .get("maxHR")
1954 .and_then(|v| v.as_i64())
1955 .map(|v| v as i32),
1956 avg_speed: activity.get("averageSpeed").and_then(|v| v.as_f64()),
1957 max_speed: activity.get("maxSpeed").and_then(|v| v.as_f64()),
1958 elevation_gain: activity.get("elevationGain").and_then(|v| v.as_f64()),
1959 elevation_loss: activity.get("elevationLoss").and_then(|v| v.as_f64()),
1960 avg_cadence: activity
1961 .get("averageRunningCadenceInStepsPerMinute")
1962 .and_then(|v| v.as_f64()),
1963 avg_power: activity
1964 .get("avgPower")
1965 .and_then(|v| v.as_i64())
1966 .map(|v| v as i32),
1967 normalized_power: activity
1968 .get("normPower")
1969 .and_then(|v| v.as_i64())
1970 .map(|v| v as i32),
1971 training_effect: activity
1972 .get("aerobicTrainingEffect")
1973 .and_then(|v| v.as_f64()),
1974 training_load: activity
1975 .get("activityTrainingLoad")
1976 .and_then(|v| v.as_f64()),
1977 start_lat: activity.get("startLatitude").and_then(|v| v.as_f64()),
1978 start_lon: activity.get("startLongitude").and_then(|v| v.as_f64()),
1979 end_lat: activity.get("endLatitude").and_then(|v| v.as_f64()),
1980 end_lon: activity.get("endLongitude").and_then(|v| v.as_f64()),
1981 ground_contact_time: activity
1982 .get("avgGroundContactTime")
1983 .and_then(|v| v.as_f64()),
1984 vertical_oscillation: activity
1985 .get("avgVerticalOscillation")
1986 .and_then(|v| v.as_f64()),
1987 stride_length: activity.get("avgStrideLength").and_then(|v| v.as_f64()),
1988 location_name: activity
1989 .get("locationName")
1990 .and_then(|v| v.as_str())
1991 .map(|s| s.to_string()),
1992 raw_json: Some(activity.clone()),
1993 })
1994}
1995
1996fn parse_garmin_datetime(value: &str) -> Option<DateTime<Utc>> {
1997 if let Ok(dt) = DateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
1998 return Some(dt.with_timezone(&Utc));
1999 }
2000
2001 let naive = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
2002 .or_else(|_| NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f"))
2003 .ok()?;
2004
2005 Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2006}
2007
2008fn parse_gpx(activity_id: i64, gpx_data: &str) -> Result<Vec<TrackPoint>> {
2010 use gpx::read;
2011 use std::io::BufReader;
2012
2013 let reader = BufReader::new(gpx_data.as_bytes());
2014 let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
2015
2016 let mut points = Vec::new();
2017
2018 for track in gpx.tracks {
2019 for segment in track.segments {
2020 for point in segment.points {
2021 let timestamp = point
2022 .time
2023 .map(|t| {
2024 DateTime::parse_from_rfc3339(&t.format().unwrap_or_default())
2025 .map(|dt| dt.with_timezone(&Utc))
2026 .unwrap_or_default()
2027 })
2028 .unwrap_or_default();
2029
2030 points.push(TrackPoint {
2031 id: None,
2032 activity_id,
2033 timestamp,
2034 lat: Some(point.point().y()),
2035 lon: Some(point.point().x()),
2036 elevation: point.elevation,
2037 heart_rate: None,
2038 cadence: None,
2039 power: None,
2040 speed: None,
2041 });
2042 }
2043 }
2044 }
2045
2046 Ok(points)
2047}
2048
2049async fn consumer_loop(
2051 _id: usize,
2052 rx: Arc<TokioMutex<mpsc::Receiver<SyncData>>>,
2053 parquet: Arc<ParquetStore>,
2054 queue: SharedTaskQueue,
2055 stats: Arc<TokioMutex<SyncStats>>,
2056 progress: SharedProgress,
2057 in_flight: Arc<AtomicUsize>,
2058) {
2059 loop {
2060 let data = {
2062 let mut rx = rx.lock().await;
2063 rx.recv().await
2064 };
2065
2066 let data = match data {
2067 Some(d) => d,
2068 None => break, };
2070
2071 let result = match &data {
2073 SyncData::Activities {
2074 records,
2075 gpx_tasks,
2076 next_page,
2077 task_id,
2078 } => {
2079 let write_result = parquet.upsert_activities_async(records).await;
2081
2082 if write_result.is_ok() {
2083 let mut gpx_added = 0u32;
2085 for gpx_task in gpx_tasks {
2086 let should_skip = match &gpx_task.task_type {
2087 SyncTaskType::DownloadGpx {
2088 activity_id,
2089 activity_date,
2090 ..
2091 } => activity_date
2092 .as_ref()
2093 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
2094 .and_then(|date| parquet.has_track_points(*activity_id, date).ok())
2095 .unwrap_or(false),
2096 _ => false,
2097 };
2098
2099 if should_skip {
2100 continue;
2101 }
2102
2103 if let Err(e) = queue.push(gpx_task.clone()).await {
2104 eprintln!("Failed to queue GPX task: {}", e);
2105 } else {
2106 gpx_added += 1;
2107 }
2108 }
2109 if gpx_added > 0 {
2110 progress.gpx.add_total(gpx_added);
2111 }
2112
2113 if let Some(next) = next_page {
2115 if let Err(e) = queue.push(next.clone()).await {
2116 eprintln!("Failed to queue next page: {}", e);
2117 }
2118 progress.activities.add_total(1);
2119 }
2120 }
2121
2122 (write_result, *task_id, "Activities")
2123 }
2124
2125 SyncData::Health { record, task_id } => {
2126 let result = parquet
2127 .upsert_daily_health_async(std::slice::from_ref(record))
2128 .await;
2129 (result, *task_id, "Health")
2130 }
2131
2132 SyncData::Performance { record, task_id } => {
2133 let result = parquet
2134 .upsert_performance_metrics_async(std::slice::from_ref(record))
2135 .await;
2136 (result, *task_id, "Performance")
2137 }
2138
2139 SyncData::TrackPoints {
2140 date,
2141 points,
2142 task_id,
2143 ..
2144 } => {
2145 let result = parquet.write_track_points_async(*date, points).await;
2146 (result, *task_id, "GPX")
2147 }
2148 };
2149
2150 let (write_result, task_id, task_type) = result;
2151
2152 match write_result {
2153 Ok(()) => {
2154 if let Err(e) = queue.mark_completed(task_id).await {
2156 eprintln!("Failed to mark task completed: {}", e);
2157 }
2158
2159 {
2161 let mut s = stats.lock().await;
2162 s.completed += 1;
2163 }
2164
2165 match task_type {
2167 "Activities" => {
2168 progress.activities.complete_one();
2169 progress.activities.clear_current_item();
2170 }
2171 "Health" => {
2172 progress.health.complete_one();
2173 progress.health.clear_current_item();
2174 }
2175 "Performance" => {
2176 progress.performance.complete_one();
2177 progress.performance.clear_current_item();
2178 }
2179 "GPX" => {
2180 progress.gpx.complete_one();
2181 progress.gpx.clear_current_item();
2182 }
2183 _ => {}
2184 }
2185 in_flight.fetch_sub(1, Ordering::Relaxed);
2186 }
2187 Err(e) => {
2188 let backoff = Duration::seconds(60);
2190 let error_msg = e.to_string();
2191 if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
2192 eprintln!("Failed to mark task as failed: {}", e);
2193 }
2194
2195 {
2197 let mut s = stats.lock().await;
2198 s.failed += 1;
2199 }
2200
2201 record_write_failure(&data, &progress, &error_msg);
2202 eprintln!("Write error for {}: {}", task_type, error_msg);
2203 in_flight.fetch_sub(1, Ordering::Relaxed);
2204 }
2205 }
2206 }
2207}
2208
2209#[cfg(test)]
2210mod tests {
2211 use super::*;
2212 use crate::db::models::TaskStatus;
2213 use chrono::NaiveDate;
2214 use serde_json::json;
2215 use wiremock::matchers::{method, path, query_param};
2216 use wiremock::{Mock, MockServer, ResponseTemplate};
2217
2218 #[test]
2219 fn test_should_exit_when_idle_requires_no_inflight() {
2220 assert!(!should_exit_when_idle(MAX_IDLE_RETRIES, 1));
2221 assert!(should_exit_when_idle(MAX_IDLE_RETRIES, 0));
2222 assert!(!should_exit_when_idle(MAX_IDLE_RETRIES - 1, 0));
2223 }
2224
2225 #[test]
2226 fn test_record_write_failure_updates_progress() {
2227 let progress = SyncProgress::new();
2228 let record = DailyHealth {
2229 id: None,
2230 profile_id: 1,
2231 date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
2232 steps: None,
2233 step_goal: None,
2234 total_calories: None,
2235 active_calories: None,
2236 bmr_calories: None,
2237 resting_hr: None,
2238 sleep_seconds: None,
2239 deep_sleep_seconds: None,
2240 light_sleep_seconds: None,
2241 rem_sleep_seconds: None,
2242 sleep_score: None,
2243 avg_stress: None,
2244 max_stress: None,
2245 body_battery_start: None,
2246 body_battery_end: None,
2247 hrv_weekly_avg: None,
2248 hrv_last_night: None,
2249 hrv_status: None,
2250 avg_respiration: None,
2251 avg_spo2: None,
2252 lowest_spo2: None,
2253 hydration_ml: None,
2254 moderate_intensity_min: None,
2255 vigorous_intensity_min: None,
2256 raw_json: None,
2257 };
2258
2259 let data = SyncData::Health { record, task_id: 1 };
2260 record_write_failure(&data, &progress, "write failed");
2261
2262 assert_eq!(progress.health.get_failed(), 1);
2263 let errors = progress.get_errors();
2264 assert_eq!(errors.len(), 1);
2265 assert_eq!(errors[0].stream, "Health");
2266 }
2267
2268 fn test_token() -> OAuth2Token {
2269 OAuth2Token {
2270 scope: "test".to_string(),
2271 jti: "jti".to_string(),
2272 token_type: "Bearer".to_string(),
2273 access_token: "access".to_string(),
2274 refresh_token: "refresh".to_string(),
2275 expires_in: 3600,
2276 expires_at: Utc::now().timestamp() + 3600,
2277 refresh_token_expires_in: 86400,
2278 refresh_token_expires_at: Utc::now().timestamp() + 86400,
2279 }
2280 }
2281
2282 #[tokio::test]
2283 async fn test_activity_pagination_respects_date_bounds() {
2284 let server = MockServer::start().await;
2285
2286 let body = serde_json::json!([
2287 {
2288 "activityId": 1,
2289 "activityName": "Newest",
2290 "startTimeLocal": "2025-01-05 08:00:00",
2291 "startTimeGMT": "2025-01-05 07:00:00",
2292 "activityType": { "typeKey": "running" },
2293 "hasPolyline": false
2294 },
2295 {
2296 "activityId": 2,
2297 "activityName": "Mid",
2298 "startTimeLocal": "2025-01-04 08:00:00",
2299 "startTimeGMT": "2025-01-04 07:00:00",
2300 "activityType": { "typeKey": "running" },
2301 "hasPolyline": false
2302 },
2303 {
2304 "activityId": 3,
2305 "activityName": "Old",
2306 "startTimeLocal": "2025-01-03 08:00:00",
2307 "startTimeGMT": "2025-01-03 07:00:00",
2308 "activityType": { "typeKey": "running" },
2309 "hasPolyline": false
2310 }
2311 ]);
2312
2313 Mock::given(method("GET"))
2314 .and(path("/activitylist-service/activities/search/activities"))
2315 .and(query_param("limit", "50"))
2316 .and(query_param("start", "0"))
2317 .respond_with(ResponseTemplate::new(200).set_body_json(body))
2318 .mount(&server)
2319 .await;
2320
2321 let client = GarminClient::new_with_base_url(&server.uri());
2322 let task = SyncTask {
2323 id: Some(1),
2324 profile_id: 1,
2325 task_type: SyncTaskType::Activities {
2326 start: 0,
2327 limit: 50,
2328 min_date: Some(NaiveDate::from_ymd_opt(2025, 1, 4).unwrap()),
2329 max_date: Some(NaiveDate::from_ymd_opt(2025, 1, 5).unwrap()),
2330 },
2331 pipeline: SyncPipeline::Frontier,
2332 status: TaskStatus::Pending,
2333 attempts: 0,
2334 last_error: None,
2335 created_at: None,
2336 next_retry_at: None,
2337 completed_at: None,
2338 };
2339
2340 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2341 .await
2342 .unwrap();
2343
2344 match data {
2345 SyncData::Activities {
2346 records,
2347 gpx_tasks,
2348 next_page,
2349 ..
2350 } => {
2351 assert_eq!(records.len(), 2);
2352 assert!(gpx_tasks.is_empty());
2353 assert!(next_page.is_none());
2354 }
2355 _ => panic!("unexpected data type"),
2356 }
2357 }
2358
2359 #[tokio::test]
2360 async fn test_daily_health_includes_sleep_and_hrv() {
2361 let server = MockServer::start().await;
2362 let date = NaiveDate::from_ymd_opt(2025, 12, 4).unwrap();
2363
2364 let health_body = json!({
2365 "totalSteps": 1234,
2366 "sleepingSeconds": 1000,
2367 "averageStressLevel": 20
2368 });
2369
2370 Mock::given(method("GET"))
2371 .and(path("/usersummary-service/usersummary/daily/TestUser"))
2372 .and(query_param("calendarDate", "2025-12-04"))
2373 .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2374 .mount(&server)
2375 .await;
2376
2377 let sleep_fixture: serde_json::Value =
2378 serde_json::from_str(include_str!("../../tests/fixtures/sleep_2025-12-04.json"))
2379 .unwrap();
2380
2381 Mock::given(method("GET"))
2382 .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2383 .and(query_param("date", "2025-12-04"))
2384 .respond_with(ResponseTemplate::new(200).set_body_json(sleep_fixture))
2385 .mount(&server)
2386 .await;
2387
2388 let hrv_fixture: serde_json::Value =
2389 serde_json::from_str(include_str!("../../tests/fixtures/hrv.json")).unwrap();
2390
2391 Mock::given(method("GET"))
2392 .and(path("/hrv-service/hrv/2025-12-04"))
2393 .respond_with(ResponseTemplate::new(200).set_body_json(hrv_fixture))
2394 .mount(&server)
2395 .await;
2396
2397 let client = GarminClient::new_with_base_url(&server.uri());
2398 let mut task = SyncTask::new(
2399 1,
2400 SyncPipeline::Frontier,
2401 SyncTaskType::DailyHealth { date },
2402 );
2403 task.id = Some(1);
2404
2405 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2406 .await
2407 .unwrap();
2408
2409 match data {
2410 SyncData::Health { record, .. } => {
2411 assert_eq!(record.sleep_seconds, Some(31920));
2412 assert_eq!(record.deep_sleep_seconds, Some(8100));
2413 assert_eq!(record.light_sleep_seconds, Some(15300));
2414 assert_eq!(record.rem_sleep_seconds, Some(8520));
2415 assert_eq!(record.sleep_score, Some(88));
2416 assert_eq!(record.hrv_weekly_avg, Some(65));
2417 assert_eq!(record.hrv_last_night, Some(68));
2418 assert_eq!(record.hrv_status.as_deref(), Some("BALANCED"));
2419 }
2420 _ => panic!("unexpected data type"),
2421 }
2422 }
2423
2424 #[tokio::test]
2425 async fn test_daily_health_handles_missing_sleep_and_hrv() {
2426 let server = MockServer::start().await;
2427 let date = NaiveDate::from_ymd_opt(2025, 12, 5).unwrap();
2428
2429 let health_body = json!({
2430 "totalSteps": 4321,
2431 "sleepingSeconds": 7200
2432 });
2433
2434 Mock::given(method("GET"))
2435 .and(path("/usersummary-service/usersummary/daily/TestUser"))
2436 .and(query_param("calendarDate", "2025-12-05"))
2437 .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2438 .mount(&server)
2439 .await;
2440
2441 Mock::given(method("GET"))
2442 .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2443 .and(query_param("date", "2025-12-05"))
2444 .respond_with(ResponseTemplate::new(404))
2445 .mount(&server)
2446 .await;
2447
2448 Mock::given(method("GET"))
2449 .and(path("/hrv-service/hrv/2025-12-05"))
2450 .respond_with(ResponseTemplate::new(404))
2451 .mount(&server)
2452 .await;
2453
2454 let client = GarminClient::new_with_base_url(&server.uri());
2455 let mut task = SyncTask::new(
2456 1,
2457 SyncPipeline::Frontier,
2458 SyncTaskType::DailyHealth { date },
2459 );
2460 task.id = Some(1);
2461
2462 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2463 .await
2464 .unwrap();
2465
2466 match data {
2467 SyncData::Health { record, .. } => {
2468 assert_eq!(record.sleep_seconds, Some(7200));
2469 assert_eq!(record.deep_sleep_seconds, None);
2470 assert_eq!(record.hrv_weekly_avg, None);
2471 assert_eq!(record.hrv_status, None);
2472 }
2473 _ => panic!("unexpected data type"),
2474 }
2475 }
2476
2477 #[test]
2478 fn test_parse_hrv_metrics_reads_last_night_avg() {
2479 let payload = json!({
2480 "hrvSummary": {
2481 "weeklyAvg": 60,
2482 "lastNightAvg": 58,
2483 "status": "BALANCED"
2484 }
2485 });
2486
2487 let (weekly_avg, last_night, status) = parse_hrv_metrics(Some(&payload));
2488 assert_eq!(weekly_avg, Some(60));
2489 assert_eq!(last_night, Some(58));
2490 assert_eq!(status.as_deref(), Some("BALANCED"));
2491 }
2492
2493 #[tokio::test]
2494 async fn test_performance_uses_date_scoped_endpoints() {
2495 let server = MockServer::start().await;
2496 let date = NaiveDate::from_ymd_opt(2025, 12, 10).unwrap();
2497
2498 let vo2_fixture: serde_json::Value =
2499 serde_json::from_str(include_str!("../../tests/fixtures/vo2max.json")).unwrap();
2500 Mock::given(method("GET"))
2501 .and(path(
2502 "/metrics-service/metrics/maxmet/daily/2025-12-10/2025-12-10",
2503 ))
2504 .respond_with(ResponseTemplate::new(200).set_body_json(vo2_fixture))
2505 .mount(&server)
2506 .await;
2507
2508 let race_fixture: serde_json::Value =
2509 serde_json::from_str(include_str!("../../tests/fixtures/race_predictions.json"))
2510 .unwrap();
2511 Mock::given(method("GET"))
2512 .and(path(
2513 "/metrics-service/metrics/racepredictions/daily/TestUser",
2514 ))
2515 .and(query_param("fromCalendarDate", "2025-12-10"))
2516 .and(query_param("toCalendarDate", "2025-12-10"))
2517 .respond_with(ResponseTemplate::new(200).set_body_json(race_fixture))
2518 .mount(&server)
2519 .await;
2520
2521 let readiness_fixture: serde_json::Value =
2522 serde_json::from_str(include_str!("../../tests/fixtures/training_readiness.json"))
2523 .unwrap();
2524 Mock::given(method("GET"))
2525 .and(path(
2526 "/metrics-service/metrics/trainingreadiness/2025-12-10",
2527 ))
2528 .respond_with(ResponseTemplate::new(200).set_body_json(readiness_fixture))
2529 .mount(&server)
2530 .await;
2531
2532 let training_status_fixture = json!({
2533 "mostRecentTrainingStatus": {
2534 "latestTrainingStatusData": {
2535 "123": {
2536 "calendarDate": "2025-12-10",
2537 "trainingStatusFeedbackPhrase": "PRODUCTIVE"
2538 }
2539 }
2540 }
2541 });
2542 Mock::given(method("GET"))
2543 .and(path(
2544 "/metrics-service/metrics/trainingstatus/aggregated/2025-12-10",
2545 ))
2546 .respond_with(ResponseTemplate::new(200).set_body_json(training_status_fixture))
2547 .mount(&server)
2548 .await;
2549
2550 let endurance_fixture: serde_json::Value =
2551 serde_json::from_str(include_str!("../../tests/fixtures/endurance_score.json"))
2552 .unwrap();
2553 Mock::given(method("GET"))
2554 .and(path("/metrics-service/metrics/endurancescore"))
2555 .and(query_param("calendarDate", "2025-12-10"))
2556 .respond_with(ResponseTemplate::new(200).set_body_json(endurance_fixture))
2557 .mount(&server)
2558 .await;
2559
2560 let hill_fixture: serde_json::Value =
2561 serde_json::from_str(include_str!("../../tests/fixtures/hill_score.json")).unwrap();
2562 Mock::given(method("GET"))
2563 .and(path("/metrics-service/metrics/hillscore"))
2564 .and(query_param("calendarDate", "2025-12-10"))
2565 .respond_with(ResponseTemplate::new(200).set_body_json(hill_fixture))
2566 .mount(&server)
2567 .await;
2568
2569 let fitness_age_fixture = json!({
2570 "calendarDate": "2025-12-10",
2571 "fitnessAge": 37.0
2572 });
2573 Mock::given(method("GET"))
2574 .and(path("/fitnessage-service/fitnessage/2025-12-10"))
2575 .respond_with(ResponseTemplate::new(200).set_body_json(fitness_age_fixture))
2576 .mount(&server)
2577 .await;
2578
2579 let client = GarminClient::new_with_base_url(&server.uri());
2580 let mut task = SyncTask::new(
2581 1,
2582 SyncPipeline::Backfill,
2583 SyncTaskType::Performance { date },
2584 );
2585 task.id = Some(1);
2586
2587 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2588 .await
2589 .unwrap();
2590
2591 match data {
2592 SyncData::Performance { record, .. } => {
2593 assert_eq!(record.vo2max, Some(53.0));
2594 assert_eq!(record.fitness_age, Some(37));
2595 assert_eq!(record.training_readiness, Some(69));
2596 assert_eq!(record.training_status.as_deref(), Some("PRODUCTIVE"));
2597 assert_eq!(record.race_5k_sec, Some(1245));
2598 assert_eq!(record.race_10k_sec, Some(2610));
2599 assert_eq!(record.race_half_sec, Some(5850));
2600 assert_eq!(record.race_marathon_sec, Some(12420));
2601 assert_eq!(record.endurance_score, Some(72));
2602 assert_eq!(record.hill_score, Some(58));
2603 }
2604 _ => panic!("unexpected data type"),
2605 }
2606 }
2607
2608 #[test]
2609 fn test_parse_garmin_datetime_accepts_naive_strings() {
2610 let dt = parse_garmin_datetime("2025-01-03 07:00:00");
2611 assert!(dt.is_some());
2612
2613 let dt = parse_garmin_datetime("2025-01-03 07:00:00.123");
2614 assert!(dt.is_some());
2615 }
2616}