1pub mod progress;
13pub mod rate_limiter;
14pub mod task_queue;
15
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, Utc};
20use tokio::sync::{mpsc, Mutex as TokioMutex};
21
22use crate::client::{GarminClient, OAuth2Token};
23use crate::db::models::{
24 Activity, DailyHealth, PerformanceMetrics, SyncPipeline, SyncTask, SyncTaskType, TrackPoint,
25};
26use crate::storage::{ParquetStore, Storage, SyncDb};
27use crate::{GarminError, Result};
28use std::io::{self, Write};
29
30pub use progress::{PlanningStep, SharedProgress, SyncProgress};
31pub use rate_limiter::{RateLimiter, SharedRateLimiter};
32pub use task_queue::{SharedTaskQueue, TaskQueue};
33
34fn pipeline_filter(mode: progress::SyncMode) -> Option<SyncPipeline> {
35 match mode {
36 progress::SyncMode::Latest => Some(SyncPipeline::Frontier),
37 progress::SyncMode::Backfill => Some(SyncPipeline::Backfill),
38 }
39}
40
/// Fetched data sent over the producer-to-consumer channel.
///
/// Every variant carries the `task_id` of the queue task it was fetched for.
#[derive(Debug)]
enum SyncData {
    /// A batch of activity records together with follow-up tasks.
    Activities {
        records: Vec<Activity>,
        // Presumably GPX download tasks derived from `records` — TODO confirm
        // against the code that builds this variant.
        gpx_tasks: Vec<SyncTask>,
        // Presumably the task for the following page, when there is one —
        // TODO confirm.
        next_page: Option<SyncTask>,
        task_id: i64,
    },
    /// A single `DailyHealth` record.
    Health { record: DailyHealth, task_id: i64 },
    /// A single `PerformanceMetrics` record.
    Performance {
        record: PerformanceMetrics,
        task_id: i64,
    },
    /// Track points belonging to one activity on one date.
    TrackPoints {
        #[allow(dead_code)]
        activity_id: i64,
        date: NaiveDate,
        points: Vec<TrackPoint>,
        task_id: i64,
    },
}
67
/// Plans and executes synchronisation of Garmin data into local storage.
///
/// Owns the storage handle, the API client with its OAuth2 token, a rate
/// limiter used for planning requests, and the task queue.
pub struct SyncEngine {
    storage: Storage,
    client: GarminClient,
    token: OAuth2Token,
    rate_limiter: RateLimiter,
    queue: TaskQueue,
    // Starts as the id of the "default" profile and is replaced once the
    // display name has been fetched (see `get_display_name`).
    profile_id: i32,
    // Cached display name; `None` until the first successful lookup.
    display_name: Option<String>,
}
78
/// Shared handles and settings given to each `producer_loop` task.
///
/// One clone of this context is handed to every producer task.
#[derive(Clone)]
struct ProducerContext {
    rate_limiter: SharedRateLimiter,
    client: GarminClient,
    token: OAuth2Token,
    progress: SharedProgress,
    display_name: Arc<String>,
    profile_id: i32,
    stats: Arc<TokioMutex<SyncStats>>,
    // Incremented when a producer picks up a task; producers only stop when
    // it reads zero (see `should_exit_when_idle`).
    in_flight: Arc<AtomicUsize>,
    parquet: Arc<ParquetStore>,
    // When true, producers do not skip tasks whose data is already stored.
    force: bool,
    // Pipeline that producers restrict themselves to when popping tasks.
    pipeline_filter: Option<SyncPipeline>,
}
93
/// Tuple of five optional integer sleep values.
///
/// NOTE(review): the meaning and order of the five slots are not visible in
/// this part of the file — document them next to the code that builds the
/// tuple.
type SleepMetrics = (
    Option<i32>,
    Option<i32>,
    Option<i32>,
    Option<i32>,
    Option<i32>,
);
101
/// Optional per-track-point sensor values; every field defaults to `None`.
// NOTE(review): units are not visible here — confirm against the parser
// that fills this struct.
#[derive(Debug, Clone, Copy, Default)]
struct TrackPointStreams {
    heart_rate: Option<i32>,
    cadence: Option<i32>,
    power: Option<i32>,
    speed: Option<f64>,
}
109
/// Names one of the value slots of [`TrackPointStreams`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TrackPointStreamField {
    HeartRate,
    Cadence,
    Power,
    Speed,
}
117
118impl SyncEngine {
119 pub fn new(client: GarminClient, token: OAuth2Token) -> Result<Self> {
121 let storage = Storage::open_default()?;
122 Self::with_storage(storage, client, token)
123 }
124
125 pub fn with_storage(
127 storage: Storage,
128 client: GarminClient,
129 token: OAuth2Token,
130 ) -> Result<Self> {
131 let profile_id = storage.sync_db.get_or_create_profile("default")?;
133
134 let sync_db = SyncDb::open(storage.base_path().join("sync.db"))?;
136 let queue = TaskQueue::new(sync_db, profile_id, None);
137
138 Ok(Self {
139 storage,
140 client,
141 token,
142 rate_limiter: RateLimiter::new(),
143 queue,
144 profile_id,
145 display_name: None,
146 })
147 }
148
149 async fn get_display_name(&mut self) -> Result<String> {
151 if let Some(ref name) = self.display_name {
152 return Ok(name.clone());
153 }
154
155 let profile: serde_json::Value = self
156 .client
157 .get_json(&self.token, "/userprofile-service/socialProfile")
158 .await?;
159
160 let name = profile
161 .get("displayName")
162 .and_then(|v| v.as_str())
163 .map(|s| s.to_string())
164 .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
165
166 self.profile_id = self.storage.sync_db.get_or_create_profile(&name)?;
168 self.queue.set_profile_id(self.profile_id);
169
170 self.display_name = Some(name.clone());
171 Ok(name)
172 }
173
174 pub(crate) async fn find_oldest_activity_date(
177 &mut self,
178 progress: Option<&SyncProgress>,
179 ) -> Result<(NaiveDate, u32, u32)> {
180 if let Some(p) = progress {
181 p.set_planning_step(PlanningStep::FindingOldestActivity);
182 } else {
183 print!("Finding oldest activity date...");
184 let _ = io::stdout().flush();
185 }
186
187 self.rate_limiter.wait().await;
191
192 let limit: u32 = 100;
194 let mut jump: u32 = 100;
195 let mut last_non_empty: u32 = 0;
196 let max_jump: u32 = 1_000_000;
197
198 while jump < max_jump {
200 let path = format!(
201 "/activitylist-service/activities/search/activities?limit=1&start={}",
202 jump
203 );
204
205 let activities: Vec<serde_json::Value> =
206 self.client.get_json(&self.token, &path).await?;
207
208 if activities.is_empty() {
209 break;
210 }
211
212 last_non_empty = jump;
213 jump = jump.saturating_mul(2);
214 self.rate_limiter.wait().await;
215 }
216
217 let mut low = last_non_empty;
219 let mut high = jump;
220
221 while high - low > limit {
222 let mid = (low + high) / 2;
223 let path = format!(
224 "/activitylist-service/activities/search/activities?limit=1&start={}",
225 mid
226 );
227
228 self.rate_limiter.wait().await;
229 let activities: Vec<serde_json::Value> =
230 self.client.get_json(&self.token, &path).await?;
231
232 if activities.is_empty() {
233 high = mid;
234 } else {
235 low = mid;
236 }
237 }
238
239 let path = format!(
241 "/activitylist-service/activities/search/activities?limit={}&start={}",
242 limit, low
243 );
244
245 self.rate_limiter.wait().await;
246 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
247
248 let total_activities = low + activities.len() as u32;
250
251 let oldest_date = activities
252 .last()
253 .and_then(|activity| activity.get("startTimeLocal"))
254 .and_then(|v| v.as_str())
255 .and_then(|date_str| date_str.split(' ').next())
256 .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
257
258 let result = oldest_date.unwrap_or_else(|| {
259 Utc::now().date_naive() - Duration::days(365)
261 });
262
263 let estimated_gps = (total_activities as f32 * 0.8) as u32;
266
267 if let Some(p) = progress {
268 p.set_oldest_activity_date(&result.to_string());
269 } else {
270 println!(" {} ({} activities)", result, total_activities);
271 }
272 Ok((result, total_activities, estimated_gps))
273 }
274
275 async fn has_health_data(&mut self, display_name: &str, date: NaiveDate) -> Result<bool> {
276 let path = format!(
277 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
278 display_name, date
279 );
280
281 self.rate_limiter.wait().await;
282 let result: std::result::Result<serde_json::Value, _> =
283 self.client.get_json(&self.token, &path).await;
284
285 match result {
286 Ok(data) => Ok(!data.as_object().map(|o| o.is_empty()).unwrap_or(true)),
287 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
288 Err(e) => Err(e),
289 }
290 }
291
292 async fn find_first_health_date(
293 &mut self,
294 progress: Option<&SyncProgress>,
295 from: NaiveDate,
296 to: NaiveDate,
297 ) -> Result<Option<NaiveDate>> {
298 if let Some(p) = progress {
299 p.set_planning_step(PlanningStep::FindingFirstHealth);
300 }
301
302 if from > to {
303 return Ok(None);
304 }
305
306 let display_name = self.get_display_name().await?;
307 let mut low = from;
308 let mut high = to;
309 let mut found = None;
310
311 while low <= high {
312 let span = (high - low).num_days();
313 let mid = low + Duration::days(span / 2);
314
315 if self.has_health_data(&display_name, mid).await? {
316 found = Some(mid);
317 if mid == low {
318 break;
319 }
320 high = mid - Duration::days(1);
321 } else {
322 low = mid + Duration::days(1);
323 }
324 }
325
326 Ok(found)
327 }
328
329 async fn has_performance_data(&mut self, date: NaiveDate) -> Result<bool> {
330 let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
331 self.rate_limiter.wait().await;
332 let readiness: std::result::Result<serde_json::Value, _> =
333 self.client.get_json(&self.token, &readiness_path).await;
334
335 if let Ok(data) = readiness {
336 if data.as_array().and_then(|arr| arr.first()).is_some() {
337 return Ok(true);
338 }
339 }
340
341 let status_path = format!(
342 "/metrics-service/metrics/trainingstatus/aggregated/{}",
343 date
344 );
345 self.rate_limiter.wait().await;
346 let status: std::result::Result<serde_json::Value, _> =
347 self.client.get_json(&self.token, &status_path).await;
348
349 match status {
350 Ok(data) => Ok(data.get("mostRecentTrainingStatus").is_some()),
351 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => Ok(false),
352 Err(e) => Err(e),
353 }
354 }
355
356 async fn find_first_performance_date(
357 &mut self,
358 progress: Option<&SyncProgress>,
359 from: NaiveDate,
360 to: NaiveDate,
361 ) -> Result<Option<NaiveDate>> {
362 if let Some(p) = progress {
363 p.set_planning_step(PlanningStep::FindingFirstPerformance);
364 }
365
366 if from > to {
367 return Ok(None);
368 }
369
370 let mut low = from;
371 let mut high = to;
372 let mut found = None;
373
374 while low <= high {
375 let span = (high - low).num_days();
376 let mid = low + Duration::days(span / 2);
377
378 if self.has_performance_data(mid).await? {
379 found = Some(mid);
380 if mid == low {
381 break;
382 }
383 high = mid - Duration::days(1);
384 } else {
385 low = mid + Duration::days(1);
386 }
387 }
388
389 Ok(found)
390 }
391
392 pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
394 self.run_with_progress(&opts).await
395 }
396
397 async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
399 let progress = Arc::new(SyncProgress::new());
400
401 progress.set_storage_path(&self.storage.base_path().display().to_string());
403 progress.set_sync_mode(opts.mode);
404 println!("Using storage: {}", progress.get_storage_path());
405 println!("Planning sync...");
406
407 progress.set_planning_step(PlanningStep::FetchingProfile);
409 let display_name = self.get_display_name().await?;
410 progress.set_profile(&display_name);
411 self.queue.set_pipeline(pipeline_filter(opts.mode));
412
413 let _recovered = self.queue.recover_in_progress()?;
415
416 if self.queue.pending_count()? == 0 {
418 self.plan_sync_with_progress(opts, &progress).await?;
419 }
420
421 progress.finish_planning();
423
424 self.count_tasks_for_progress(&progress)?;
426
427 let today = Utc::now().date_naive();
429 let to_date = opts.to_date.unwrap_or(today);
430
431 match opts.mode {
432 progress::SyncMode::Latest => {
433 let from_date = opts.from_date.unwrap_or_else(|| today - Duration::days(7));
435 progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
436 progress.set_date_range(&from_date.to_string(), &to_date.to_string());
437 }
438 progress::SyncMode::Backfill => {
439 let oldest = progress
441 .get_oldest_activity_date()
442 .unwrap_or_else(|| (today - Duration::days(365)).to_string());
443 let from_date = opts
444 .from_date
445 .map(|d| d.to_string())
446 .unwrap_or(oldest.clone());
447 progress.set_backfill_range(&from_date, &oldest);
448 progress.set_date_range(&oldest, &from_date);
449 }
450 }
451
452 print_sync_overview(&progress);
453 println!("Planning complete.");
454
455 let stats_result = if opts.dry_run {
456 println!("Dry run mode - no changes will be made");
457 Ok(SyncStats::default())
458 } else {
459 let reporter_progress = progress.clone();
460 let reporter_handle = tokio::spawn(async move {
461 run_progress_reporter(reporter_progress).await;
462 });
463
464 let result = self
465 .run_with_progress_tracking(opts, progress.clone())
466 .await;
467 progress.request_shutdown();
468 let _ = reporter_handle.await;
469 result
470 };
471
472 let stats = stats_result?;
473 print_sync_errors(&progress);
474
475 if !opts.dry_run && stats.completed > 0 {
477 self.update_sync_state_after_completion(opts, today).await?;
478 }
479
480 if !opts.dry_run {
481 println!("\nSync complete: {}", stats);
482 }
483 Ok(stats)
484 }
485
486 async fn update_sync_state_after_completion(
488 &self,
489 opts: &SyncOptions,
490 today: NaiveDate,
491 ) -> Result<()> {
492 use crate::db::models::SyncState;
493
494 match opts.mode {
495 progress::SyncMode::Latest => {
496 if opts.sync_activities {
498 let state = SyncState {
499 profile_id: self.profile_id,
500 data_type: "activities".to_string(),
501 last_sync_date: Some(today),
502 last_activity_id: None,
503 };
504 self.storage.sync_db.update_sync_state(&state)?;
505 }
506 if opts.sync_health {
507 let state = SyncState {
508 profile_id: self.profile_id,
509 data_type: "health".to_string(),
510 last_sync_date: Some(today),
511 last_activity_id: None,
512 };
513 self.storage.sync_db.update_sync_state(&state)?;
514 }
515 }
516 progress::SyncMode::Backfill => {
517 let pending = self.queue.pending_count()?;
519 if pending == 0 {
520 self.storage
522 .sync_db
523 .mark_backfill_complete(self.profile_id, "activities")?;
524 self.storage
525 .sync_db
526 .mark_backfill_complete(self.profile_id, "health")?;
527 self.storage
528 .sync_db
529 .mark_backfill_complete(self.profile_id, "performance")?;
530 }
531 }
532 }
533
534 Ok(())
535 }
536
537 async fn run_with_progress_tracking(
539 &mut self,
540 opts: &SyncOptions,
541 progress: SharedProgress,
542 ) -> Result<SyncStats> {
543 self.run_parallel(opts, progress).await
545 }
546
547 async fn run_parallel(
552 &mut self,
553 opts: &SyncOptions,
554 progress: SharedProgress,
555 ) -> Result<SyncStats> {
556 let rate_limiter = SharedRateLimiter::new(opts.concurrency);
557 let queue = SharedTaskQueue::new(TaskQueue::new(
558 SyncDb::open(self.storage.base_path().join("sync.db"))?,
559 self.profile_id,
560 pipeline_filter(opts.mode),
561 ));
562
563 self.run_parallel_with_resources(
564 opts,
565 progress,
566 queue,
567 rate_limiter,
568 pipeline_filter(opts.mode),
569 )
570 .await
571 }
572
573 pub(crate) async fn run_parallel_with_resources(
574 &mut self,
575 opts: &SyncOptions,
576 progress: SharedProgress,
577 queue: SharedTaskQueue,
578 rate_limiter: SharedRateLimiter,
579 pipeline_filter: Option<SyncPipeline>,
580 ) -> Result<SyncStats> {
581 let (tx, rx) = mpsc::channel::<SyncData>(100);
583
584 let parquet = Arc::new(self.storage.parquet.clone());
586 let client = self.client.clone();
587 let token = self.token.clone();
588 let stats = Arc::new(TokioMutex::new(SyncStats::default()));
589 let in_flight = Arc::new(AtomicUsize::new(0));
590 let display_name = Arc::new(self.get_display_name().await?);
591 let profile_id = self.profile_id;
592
593 let mut producer_handles = Vec::new();
595 for id in 0..opts.concurrency {
596 let tx = tx.clone();
597 let queue = queue.clone();
598 let context = ProducerContext {
599 rate_limiter: rate_limiter.clone(),
600 client: client.clone(),
601 token: token.clone(),
602 progress: progress.clone(),
603 display_name: Arc::clone(&display_name),
604 profile_id,
605 stats: Arc::clone(&stats),
606 in_flight: Arc::clone(&in_flight),
607 parquet: Arc::clone(&parquet),
608 force: opts.force,
609 pipeline_filter,
610 };
611
612 producer_handles.push(tokio::spawn(async move {
613 producer_loop(id, queue, tx, context).await
614 }));
615 }
616 drop(tx); let rx = Arc::new(TokioMutex::new(rx));
620 let mut consumer_handles = Vec::new();
621 for id in 0..opts.concurrency {
622 let rx = Arc::clone(&rx);
623 let parquet = Arc::clone(&parquet);
624 let queue = queue.clone();
625 let stats = Arc::clone(&stats);
626 let progress = progress.clone();
627 let in_flight = Arc::clone(&in_flight);
628
629 consumer_handles.push(tokio::spawn(async move {
630 consumer_loop(id, rx, parquet, queue, stats, progress, in_flight).await
631 }));
632 }
633
634 for h in producer_handles {
636 if let Err(e) = h.await {
637 eprintln!("Producer error: {}", e);
638 }
639 }
640
641 for h in consumer_handles {
643 if let Err(e) = h.await {
644 eprintln!("Consumer error: {}", e);
645 }
646 }
647
648 queue.cleanup(7).await?;
650
651 self.storage
653 .sync_db
654 .update_profile_sync_time(self.profile_id)?;
655
656 let final_stats = stats.lock().await;
658 Ok(SyncStats {
659 recovered: final_stats.recovered,
660 completed: final_stats.completed,
661 rate_limited: final_stats.rate_limited,
662 failed: final_stats.failed,
663 })
664 }
665
666 fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
671 let (_activities, _gpx, health, performance) = self.queue.count_by_type()?;
673
674 if progress.activities.get_total() == 0 {
677 progress.activities.set_total(1); progress.activities.set_dynamic(true);
679 }
680 if progress.gpx.get_total() == 0 {
681 progress.gpx.set_dynamic(true); }
683
684 progress.health.set_total(health);
686 progress.performance.set_total(performance);
687
688 Ok(())
689 }
690
691 async fn plan_sync_with_progress(
693 &mut self,
694 opts: &SyncOptions,
695 progress: &SyncProgress,
696 ) -> Result<()> {
697 let today = Utc::now().date_naive();
698
699 match opts.mode {
700 progress::SyncMode::Latest => self.plan_latest_sync(opts, progress, today).await,
701 progress::SyncMode::Backfill => self.plan_backfill_sync(opts, progress, today).await,
702 }
703 }
704
705 pub(crate) async fn plan_latest_sync(
707 &mut self,
708 opts: &SyncOptions,
709 progress: &SyncProgress,
710 today: NaiveDate,
711 ) -> Result<()> {
712 let last_sync = self
714 .storage
715 .sync_db
716 .get_sync_state(self.profile_id, "activities")?;
717 let from_date = opts.from_date.unwrap_or_else(|| {
718 last_sync
719 .and_then(|s| s.last_sync_date)
720 .unwrap_or_else(|| today - Duration::days(7))
721 });
722 let to_date = opts.to_date.unwrap_or(today);
723
724 progress.set_latest_range(&from_date.to_string(), &to_date.to_string());
725 progress.set_oldest_activity_date(&from_date.to_string());
726
727 let total_days = (to_date - from_date).num_days().max(0) as u32 + 1;
728
729 if opts.sync_activities {
731 progress.set_planning_step(PlanningStep::PlanningActivities);
732 self.plan_activities_sync(SyncPipeline::Frontier, Some(from_date), Some(to_date))?;
733 }
734
735 if opts.sync_health {
737 progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
738 self.plan_health_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
739 }
740
741 if opts.sync_performance {
743 let total_weeks = (total_days / 7).max(1);
744 progress.set_planning_step(PlanningStep::PlanningPerformance { weeks: total_weeks });
745 self.plan_performance_sync(from_date, to_date, opts.force, SyncPipeline::Frontier)?;
746 }
747
748 Ok(())
749 }
750
751 pub(crate) async fn plan_backfill_sync(
753 &mut self,
754 opts: &SyncOptions,
755 progress: &SyncProgress,
756 today: NaiveDate,
757 ) -> Result<()> {
758 let (oldest_date, total_activities, estimated_gps) =
760 self.find_oldest_activity_date(Some(progress)).await?;
761
762 let backfill_state = self
764 .storage
765 .sync_db
766 .get_backfill_state(self.profile_id, "activities")?;
767
768 let (frontier_date, activities_complete) = match backfill_state {
769 Some((frontier, _target, complete)) => (frontier, complete),
770 None => {
771 let last_sync = self
773 .storage
774 .sync_db
775 .get_sync_state(self.profile_id, "activities")?;
776 let frontier = last_sync.and_then(|s| s.last_sync_date).unwrap_or(today);
777
778 self.storage.sync_db.set_backfill_state(
780 self.profile_id,
781 "activities",
782 frontier,
783 oldest_date,
784 false,
785 )?;
786 (frontier, false)
787 }
788 };
789
790 let activity_from = opts.from_date.unwrap_or(oldest_date);
792 let activity_to = opts.to_date.unwrap_or(frontier_date);
793
794 progress.set_backfill_range(&frontier_date.to_string(), &oldest_date.to_string());
795 progress.set_oldest_activity_date(&oldest_date.to_string());
796
797 if opts.sync_activities && !activities_complete {
799 progress.set_planning_step(PlanningStep::PlanningActivities);
800 self.plan_activities_sync(
801 SyncPipeline::Backfill,
802 Some(activity_from),
803 Some(activity_to),
804 )?;
805 if total_activities > 0 {
806 progress.activities.set_total(total_activities);
807 progress.gpx.set_total(estimated_gps);
808 }
809 }
810
811 if opts.sync_health {
812 let health_state = self
813 .storage
814 .sync_db
815 .get_backfill_state(self.profile_id, "health")?;
816
817 let health_target = match health_state {
818 Some((_frontier, target, complete)) => (!complete).then_some(target),
819 None => {
820 let search_from = activity_from;
821 let search_to = activity_to;
822 let first_health = self
823 .find_first_health_date(Some(progress), search_from, search_to)
824 .await?;
825 match first_health {
826 Some(first) => {
827 self.storage.sync_db.set_backfill_state(
828 self.profile_id,
829 "health",
830 frontier_date,
831 first,
832 false,
833 )?;
834 Some(first)
835 }
836 None => {
837 self.storage.sync_db.set_backfill_state(
838 self.profile_id,
839 "health",
840 frontier_date,
841 frontier_date,
842 true,
843 )?;
844 None
845 }
846 }
847 }
848 };
849
850 if let Some(target) = health_target {
851 let health_from = std::cmp::max(activity_from, target);
852 let health_to = activity_to;
853 if health_from <= health_to {
854 let total_days = (health_to - health_from).num_days().max(0) as u32 + 1;
855 progress.set_planning_step(PlanningStep::PlanningHealth { days: total_days });
856 self.plan_health_sync(
857 health_from,
858 health_to,
859 opts.force,
860 SyncPipeline::Backfill,
861 )?;
862 }
863 }
864 }
865
866 if opts.sync_performance {
867 let perf_state = self
868 .storage
869 .sync_db
870 .get_backfill_state(self.profile_id, "performance")?;
871
872 let perf_target = match perf_state {
873 Some((_frontier, target, complete)) => (!complete).then_some(target),
874 None => {
875 let search_from = activity_from;
876 let search_to = activity_to;
877 let first_perf = self
878 .find_first_performance_date(Some(progress), search_from, search_to)
879 .await?;
880 match first_perf {
881 Some(first) => {
882 self.storage.sync_db.set_backfill_state(
883 self.profile_id,
884 "performance",
885 frontier_date,
886 first,
887 false,
888 )?;
889 Some(first)
890 }
891 None => {
892 self.storage.sync_db.set_backfill_state(
893 self.profile_id,
894 "performance",
895 frontier_date,
896 frontier_date,
897 true,
898 )?;
899 None
900 }
901 }
902 }
903 };
904
905 if let Some(target) = perf_target {
906 let perf_from = std::cmp::max(activity_from, target);
907 let perf_to = activity_to;
908 if perf_from <= perf_to {
909 let total_weeks = ((perf_to - perf_from).num_days().max(0) as u32 / 7).max(1);
910 progress.set_planning_step(PlanningStep::PlanningPerformance {
911 weeks: total_weeks,
912 });
913 self.plan_performance_sync(
914 perf_from,
915 perf_to,
916 opts.force,
917 SyncPipeline::Backfill,
918 )?;
919 }
920 }
921 }
922
923 Ok(())
924 }
925
926 fn plan_activities_sync(
928 &self,
929 pipeline: SyncPipeline,
930 min_date: Option<NaiveDate>,
931 max_date: Option<NaiveDate>,
932 ) -> Result<()> {
933 let task = SyncTask::new(
935 self.profile_id,
936 pipeline,
937 SyncTaskType::Activities {
938 start: 0,
939 limit: 50,
940 min_date,
941 max_date,
942 },
943 );
944 self.queue.push(task)?;
945 Ok(())
946 }
947
948 fn plan_health_sync(
950 &self,
951 from: NaiveDate,
952 to: NaiveDate,
953 force: bool,
954 pipeline: SyncPipeline,
955 ) -> Result<u32> {
956 let mut count = 0;
957 let mut date = from;
958 while date <= to {
959 if force
960 || !self
961 .storage
962 .parquet
963 .has_daily_health(self.profile_id, date)?
964 {
965 let task = SyncTask::new(
966 self.profile_id,
967 pipeline,
968 SyncTaskType::DailyHealth { date },
969 );
970 self.queue.push(task)?;
971 count += 1;
972 }
973 date += Duration::days(1);
974 }
975 Ok(count)
976 }
977
978 fn plan_performance_sync(
980 &self,
981 from: NaiveDate,
982 to: NaiveDate,
983 force: bool,
984 pipeline: SyncPipeline,
985 ) -> Result<u32> {
986 let mut count = 0;
988 let mut date = from;
989 while date <= to {
990 if force
991 || !self
992 .storage
993 .parquet
994 .has_performance_metrics(self.profile_id, date)?
995 {
996 let task = SyncTask::new(
997 self.profile_id,
998 pipeline,
999 SyncTaskType::Performance { date },
1000 );
1001 self.queue.push(task)?;
1002 count += 1;
1003 }
1004 date += Duration::days(7);
1005 }
1006 Ok(count)
1007 }
1008}
1009
1010fn print_sync_overview(progress: &SyncProgress) {
1011 println!("Profile: {}", progress.get_profile());
1012 println!("Mode: {}", progress.get_sync_mode());
1013 println!("Range: {}", progress.get_date_range());
1014 println!(
1015 "Queued tasks: activities {}, gpx {}, health {}, performance {}",
1016 progress.activities.get_total(),
1017 progress.gpx.get_total(),
1018 progress.health.get_total(),
1019 progress.performance.get_total()
1020 );
1021}
1022
1023fn print_sync_errors(progress: &SyncProgress) {
1024 let errors = progress.get_errors();
1025 if errors.is_empty() {
1026 return;
1027 }
1028
1029 println!("\nRecent errors:");
1030 for error in errors.iter().take(5) {
1031 println!(" [{}] {}: {}", error.stream, error.item, error.error);
1032 }
1033
1034 if errors.len() > 5 {
1035 println!(" ... and {} more", errors.len() - 5);
1036 }
1037}
1038
1039async fn run_progress_reporter(progress: SharedProgress) {
1040 loop {
1041 progress.print_simple_status();
1042
1043 if progress.should_shutdown() || progress.is_complete() {
1044 println!();
1045 break;
1046 }
1047
1048 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
1049 }
1050}
1051
/// Options controlling what a sync run fetches and how it executes.
#[derive(Clone)]
pub struct SyncOptions {
    /// Plan activity tasks.
    pub sync_activities: bool,
    /// Plan daily-health tasks.
    pub sync_health: bool,
    /// Plan performance-metric tasks.
    pub sync_performance: bool,
    /// Optional start of the date range; the fallback depends on the mode.
    pub from_date: Option<NaiveDate>,
    /// Optional end of the date range; the fallback depends on the mode.
    pub to_date: Option<NaiveDate>,
    /// Plan and print the overview, but do not execute any tasks.
    pub dry_run: bool,
    /// Queue and fetch data even when it is already present in storage.
    pub force: bool,
    /// Number of producer tasks and of consumer tasks to spawn; also passed
    /// to the shared rate limiter.
    pub concurrency: usize,
    /// Whether to sync the latest range or backfill history.
    pub mode: progress::SyncMode,
}
1074
1075impl Default for SyncOptions {
1076 fn default() -> Self {
1077 Self {
1078 sync_activities: false,
1079 sync_health: false,
1080 sync_performance: false,
1081 from_date: None,
1082 to_date: None,
1083 dry_run: false,
1084 force: false,
1085 concurrency: 4,
1086 mode: progress::SyncMode::Latest,
1087 }
1088 }
1089}
1090
/// Counters describing the outcome of a sync run.
///
/// `Debug`, `Clone`, `PartialEq` and `Eq` are derived in addition to
/// `Default` so results can be logged, copied out of shared state and
/// compared in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncStats {
    /// Number of tasks counted as recovered.
    pub recovered: u32,
    /// Number of tasks that completed.
    pub completed: u32,
    /// Number of tasks that hit a rate limit.
    pub rate_limited: u32,
    /// Number of tasks that failed.
    pub failed: u32,
}

impl std::fmt::Display for SyncStats {
    /// Formats as `Completed: N, Failed: N, Rate limited: N`, appending
    /// `, Recovered: N` only when at least one task was recovered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            self.completed, self.failed, self.rate_limited
        )?;
        if self.recovered > 0 {
            write!(f, ", Recovered: {}", self.recovered)?;
        }
        Ok(())
    }
}
1117
1118fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1120 let desc = match &task.task_type {
1121 SyncTaskType::Activities { start, limit, .. } => {
1122 format!("Activities {}-{}", start, start + limit)
1123 }
1124 SyncTaskType::DownloadGpx {
1125 activity_name,
1126 activity_date,
1127 ..
1128 } => {
1129 let name = activity_name.as_deref().unwrap_or("Unknown");
1130 let date = activity_date.as_deref().unwrap_or("");
1131 if date.is_empty() {
1132 name.to_string()
1133 } else {
1134 format!("{} {}", date, name)
1135 }
1136 }
1137 SyncTaskType::DailyHealth { date } => date.to_string(),
1138 SyncTaskType::Performance { date } => date.to_string(),
1139 _ => String::new(),
1140 };
1141
1142 match &task.task_type {
1143 SyncTaskType::Activities { .. } => {
1144 progress.activities.set_current_item(desc.clone());
1145 progress.activities.set_last_item(desc);
1146 }
1147 SyncTaskType::DownloadGpx { .. } => {
1148 progress.gpx.set_current_item(desc.clone());
1149 progress.gpx.set_last_item(desc);
1150 }
1151 SyncTaskType::DailyHealth { .. } => {
1152 progress.health.set_current_item(desc.clone());
1153 progress.health.set_last_item(desc);
1154 }
1155 SyncTaskType::Performance { .. } => {
1156 progress.performance.set_current_item(desc.clone());
1157 progress.performance.set_last_item(desc);
1158 }
1159 _ => {}
1160 }
1161}
1162
1163#[allow(dead_code)]
1165fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1166 match &task.task_type {
1167 SyncTaskType::Activities { .. } => {
1168 progress.activities.complete_one();
1169 progress.activities.clear_current_item();
1170 }
1171 SyncTaskType::DownloadGpx { .. } => {
1172 progress.gpx.complete_one();
1173 progress.gpx.clear_current_item();
1174 }
1175 SyncTaskType::DailyHealth { .. } => {
1176 progress.health.complete_one();
1177 progress.health.clear_current_item();
1178 }
1179 SyncTaskType::Performance { .. } => {
1180 progress.performance.complete_one();
1181 progress.performance.clear_current_item();
1182 }
1183 _ => {}
1184 }
1185}
1186
1187fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress, error: &str) {
1189 let (stream_name, item_desc) = match &task.task_type {
1190 SyncTaskType::Activities { start, limit, .. } => {
1191 progress.activities.fail_one();
1192 progress.activities.clear_current_item();
1193 ("Activities", format!("{}-{}", start, start + limit))
1194 }
1195 SyncTaskType::DownloadGpx {
1196 activity_id,
1197 activity_name,
1198 ..
1199 } => {
1200 progress.gpx.fail_one();
1201 progress.gpx.clear_current_item();
1202 (
1203 "GPX",
1204 activity_name
1205 .clone()
1206 .unwrap_or_else(|| activity_id.to_string()),
1207 )
1208 }
1209 SyncTaskType::DailyHealth { date } => {
1210 progress.health.fail_one();
1211 progress.health.clear_current_item();
1212 ("Health", date.to_string())
1213 }
1214 SyncTaskType::Performance { date } => {
1215 progress.performance.fail_one();
1216 progress.performance.clear_current_item();
1217 ("Performance", date.to_string())
1218 }
1219 _ => return,
1220 };
1221
1222 progress.add_error(stream_name, item_desc, error.to_string());
1223}
1224
/// Number of consecutive empty queue polls after which a producer may stop.
const MAX_IDLE_RETRIES: u32 = 10;

/// Decides whether an idle producer may exit.
///
/// Exiting requires both that no task is in flight and that the queue has
/// been found empty at least `MAX_IDLE_RETRIES` times in a row.
fn should_exit_when_idle(idle_loops: u32, in_flight: usize) -> bool {
    let nothing_in_flight = in_flight == 0;
    let waited_long_enough = idle_loops >= MAX_IDLE_RETRIES;
    nothing_in_flight && waited_long_enough
}
1230
1231fn record_write_failure(data: &SyncData, progress: &SyncProgress, error: &str) {
1232 match data {
1233 SyncData::Activities { records, .. } => {
1234 progress.activities.fail_one();
1235 progress.activities.clear_current_item();
1236 let item = records
1237 .first()
1238 .map(|r| r.activity_id.to_string())
1239 .unwrap_or_else(|| "batch".to_string());
1240 progress.add_error("Activities", item, error.to_string());
1241 }
1242 SyncData::Health { record, .. } => {
1243 progress.health.fail_one();
1244 progress.health.clear_current_item();
1245 progress.add_error("Health", record.date.to_string(), error.to_string());
1246 }
1247 SyncData::Performance { record, .. } => {
1248 progress.performance.fail_one();
1249 progress.performance.clear_current_item();
1250 progress.add_error("Performance", record.date.to_string(), error.to_string());
1251 }
1252 SyncData::TrackPoints {
1253 activity_id, date, ..
1254 } => {
1255 progress.gpx.fail_one();
1256 progress.gpx.clear_current_item();
1257 progress.add_error(
1258 "GPX",
1259 format!("{} ({})", date, activity_id),
1260 error.to_string(),
1261 );
1262 }
1263 }
1264}
1265
1266async fn producer_loop(
1272 _id: usize,
1273 queue: SharedTaskQueue,
1274 tx: mpsc::Sender<SyncData>,
1275 context: ProducerContext,
1276) {
1277 let mut empty_count = 0;
1278
1279 loop {
1280 let next_task = match context.pipeline_filter {
1282 Some(pipeline) => queue.pop_round_robin_with_pipeline(Some(pipeline)).await,
1283 None => queue.pop_round_robin().await,
1284 };
1285
1286 let task = match next_task {
1287 Ok(Some(task)) => {
1288 empty_count = 0; task
1290 }
1291 Ok(None) => {
1292 empty_count += 1;
1295 if should_exit_when_idle(empty_count, context.in_flight.load(Ordering::Relaxed)) {
1296 break; }
1298 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
1299 continue;
1300 }
1301 Err(e) => {
1302 eprintln!("Queue error: {}", e);
1303 break;
1304 }
1305 };
1306
1307 let task_id = task.id.unwrap();
1308 context.in_flight.fetch_add(1, Ordering::Relaxed);
1309
1310 if let Err(e) = queue.mark_in_progress(task_id).await {
1312 eprintln!("Failed to mark task in progress: {}", e);
1313 continue;
1314 }
1315
1316 update_progress_for_task(&task, &context.progress);
1318
1319 if !context.force {
1321 let should_skip = match &task.task_type {
1322 SyncTaskType::DailyHealth { date } => context
1323 .parquet
1324 .has_daily_health(context.profile_id, *date)
1325 .unwrap_or(false),
1326 SyncTaskType::Performance { date } => context
1327 .parquet
1328 .has_performance_metrics(context.profile_id, *date)
1329 .unwrap_or(false),
1330 SyncTaskType::DownloadGpx {
1331 activity_id,
1332 activity_date,
1333 ..
1334 } => activity_date
1335 .as_ref()
1336 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1337 .and_then(|date| context.parquet.has_track_points(*activity_id, date).ok())
1338 .unwrap_or(false),
1339 _ => false,
1340 };
1341
1342 if should_skip {
1343 if let Err(e) = queue.mark_completed(task_id).await {
1344 eprintln!("Failed to mark task completed: {}", e);
1345 }
1346 complete_progress_for_task(&task, &context.progress);
1347 {
1348 let mut s = context.stats.lock().await;
1349 s.completed += 1;
1350 }
1351 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1352 continue;
1353 }
1354 }
1355
1356 let _permit = context.rate_limiter.acquire().await;
1358 context.progress.record_request();
1359
1360 let result = fetch_task_data(
1362 &task,
1363 &context.client,
1364 &context.token,
1365 &context.display_name,
1366 context.profile_id,
1367 )
1368 .await;
1369
1370 match result {
1371 Ok(data) => {
1372 context.rate_limiter.on_success();
1373 if tx.send(data).await.is_err() {
1375 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1377 let backoff = Duration::seconds(60);
1378 let _ = queue
1379 .mark_failed(task_id, "Consumer channel closed", backoff)
1380 .await;
1381 break;
1382 }
1383 }
1384 Err(GarminError::RateLimited) => {
1385 context.rate_limiter.on_rate_limit();
1386 let backoff = Duration::seconds(60);
1387 if let Err(e) = queue.mark_failed(task_id, "Rate limited", backoff).await {
1388 eprintln!("Failed to mark task as rate limited: {}", e);
1389 }
1390 fail_progress_for_task(&task, &context.progress, "Rate limited");
1391 {
1392 let mut s = context.stats.lock().await;
1393 s.rate_limited += 1;
1394 }
1395 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1396 }
1397 Err(e) => {
1398 let backoff = Duration::seconds(60);
1399 let error_msg = e.to_string();
1400 if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
1401 eprintln!("Failed to mark task as failed: {}", e);
1402 }
1403 fail_progress_for_task(&task, &context.progress, &error_msg);
1404 {
1405 let mut s = context.stats.lock().await;
1406 s.failed += 1;
1407 }
1408 context.in_flight.fetch_sub(1, Ordering::Relaxed);
1409 }
1410 }
1411 }
1412}
1413
1414fn value_to_i32(value: &serde_json::Value) -> Option<i32> {
1415 if let Some(int) = value.as_i64() {
1416 return Some(int as i32);
1417 }
1418 value.as_f64().map(|float| float.round() as i32)
1419}
1420
1421fn first_entry(value: &serde_json::Value) -> Option<&serde_json::Value> {
1422 if let Some(array) = value.as_array() {
1423 array.first()
1424 } else {
1425 Some(value)
1426 }
1427}
1428
1429fn parse_sleep_metrics(value: Option<&serde_json::Value>) -> SleepMetrics {
1430 let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value);
1431
1432 let dto = match dto {
1433 Some(dto) => dto,
1434 None => return (None, None, None, None, None),
1435 };
1436
1437 let deep = dto.get("deepSleepSeconds").and_then(value_to_i32);
1438 let light = dto.get("lightSleepSeconds").and_then(value_to_i32);
1439 let rem = dto.get("remSleepSeconds").and_then(value_to_i32);
1440
1441 let total = dto
1442 .get("sleepTimeSeconds")
1443 .and_then(value_to_i32)
1444 .or_else(|| match (deep, light, rem) {
1445 (Some(d), Some(l), Some(r)) => Some(d + l + r),
1446 _ => None,
1447 });
1448
1449 let score = dto
1450 .get("sleepScores")
1451 .and_then(|v| v.get("overall"))
1452 .and_then(|v| v.get("value"))
1453 .and_then(value_to_i32);
1454
1455 (total, deep, light, rem, score)
1456}
1457
1458fn parse_sleep_note(value: Option<&serde_json::Value>) -> Option<String> {
1461 let dto = value.and_then(|v| v.get("dailySleepDTO")).or(value)?;
1462 dto.get("userNote")
1463 .and_then(|v| v.as_str())
1464 .map(|s| s.trim())
1465 .filter(|s| !s.is_empty())
1466 .map(|s| s.to_string())
1467}
1468
1469fn parse_hrv_metrics(
1470 value: Option<&serde_json::Value>,
1471) -> (Option<i32>, Option<i32>, Option<String>) {
1472 let summary = value.and_then(|v| v.get("hrvSummary")).or(value);
1473
1474 let summary = match summary {
1475 Some(summary) => summary,
1476 None => return (None, None, None),
1477 };
1478
1479 let weekly_avg = summary.get("weeklyAvg").and_then(value_to_i32);
1480 let last_night = summary
1481 .get("lastNight")
1482 .or_else(|| summary.get("lastNightAvg"))
1483 .and_then(value_to_i32);
1484 let status = summary
1485 .get("status")
1486 .and_then(|v| v.as_str())
1487 .map(|s| s.to_string());
1488
1489 (weekly_avg, last_night, status)
1490}
1491
1492fn parse_vo2max_value(value: Option<&serde_json::Value>) -> Option<f64> {
1493 let entry = value.and_then(first_entry)?;
1494 entry
1495 .get("generic")
1496 .and_then(|v| v.get("vo2MaxValue"))
1497 .and_then(|v| v.as_f64())
1498}
1499
1500fn parse_vo2max_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1501 let entry = value.and_then(first_entry)?;
1502 entry
1503 .get("generic")
1504 .and_then(|v| v.get("fitnessAge"))
1505 .and_then(value_to_i32)
1506}
1507
1508fn parse_fitness_age(value: Option<&serde_json::Value>) -> Option<i32> {
1509 value
1510 .and_then(|v| v.get("fitnessAge"))
1511 .and_then(value_to_i32)
1512}
1513
1514fn parse_race_predictions(
1515 value: Option<&serde_json::Value>,
1516) -> (Option<i32>, Option<i32>, Option<i32>, Option<i32>) {
1517 let entry = match value.and_then(first_entry) {
1518 Some(entry) => entry,
1519 None => return (None, None, None, None),
1520 };
1521
1522 let race_5k = entry.get("time5K").and_then(value_to_i32);
1523 let race_10k = entry.get("time10K").and_then(value_to_i32);
1524 let race_half = entry.get("timeHalfMarathon").and_then(value_to_i32);
1525 let race_marathon = entry.get("timeMarathon").and_then(value_to_i32);
1526
1527 (race_5k, race_10k, race_half, race_marathon)
1528}
1529
1530fn parse_overall_score(value: Option<&serde_json::Value>) -> Option<i32> {
1531 let entry = value.and_then(first_entry)?;
1532 entry.get("overallScore").and_then(value_to_i32)
1533}
1534
1535fn parse_training_status(value: Option<&serde_json::Value>, date: NaiveDate) -> Option<String> {
1536 let root = value?;
1537 let date_str = date.to_string();
1538
1539 if let Some(latest) = root
1540 .get("mostRecentTrainingStatus")
1541 .and_then(|v| v.get("latestTrainingStatusData"))
1542 .and_then(|v| v.as_object())
1543 {
1544 for entry in latest.values() {
1545 if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1546 return entry
1547 .get("trainingStatusFeedbackPhrase")
1548 .and_then(|v| v.as_str())
1549 .map(|s| s.to_string());
1550 }
1551 }
1552 }
1553
1554 if let Some(history) = root.get("trainingStatusHistory").and_then(|v| v.as_array()) {
1555 for entry in history {
1556 if entry.get("calendarDate").and_then(|v| v.as_str()) == Some(date_str.as_str()) {
1557 return entry
1558 .get("trainingStatusFeedbackPhrase")
1559 .and_then(|v| v.as_str())
1560 .map(|s| s.to_string());
1561 }
1562 }
1563 }
1564
1565 None
1566}
1567
1568async fn fetch_task_data(
1570 task: &SyncTask,
1571 client: &GarminClient,
1572 token: &OAuth2Token,
1573 display_name: &str,
1574 profile_id: i32,
1575) -> Result<SyncData> {
1576 let task_id = task.id.unwrap();
1577
1578 match &task.task_type {
1579 SyncTaskType::Activities {
1580 start,
1581 limit,
1582 min_date,
1583 max_date,
1584 } => {
1585 let path = format!(
1586 "/activitylist-service/activities/search/activities?limit={}&start={}",
1587 limit, start
1588 );
1589 let activities: Vec<serde_json::Value> = client.get_json(token, &path).await?;
1590
1591 let mut records = Vec::new();
1592 let mut gpx_tasks = Vec::new();
1593 let mut reached_min = false;
1594
1595 for activity in &activities {
1596 let activity_date = activity
1597 .get("startTimeLocal")
1598 .and_then(|v| v.as_str())
1599 .and_then(|s| s.split(' ').next())
1600 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
1601
1602 if let Some(date) = activity_date {
1603 if let Some(min) = *min_date {
1604 if date < min {
1605 reached_min = true;
1606 break;
1607 }
1608 }
1609
1610 if let Some(max) = *max_date {
1611 if date > max {
1612 continue;
1613 }
1614 }
1615 }
1616
1617 let parsed = parse_activity(activity, profile_id)?;
1619 records.push(parsed);
1620
1621 if activity
1623 .get("hasPolyline")
1624 .and_then(|v| v.as_bool())
1625 .unwrap_or(false)
1626 {
1627 if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
1628 let activity_name = activity
1629 .get("activityName")
1630 .and_then(|v| v.as_str())
1631 .map(|s| s.to_string());
1632 let activity_date = activity
1633 .get("startTimeLocal")
1634 .and_then(|v| v.as_str())
1635 .and_then(|s| s.split(' ').next())
1636 .map(|s| s.to_string());
1637
1638 gpx_tasks.push(SyncTask::new(
1639 profile_id,
1640 task.pipeline,
1641 SyncTaskType::DownloadGpx {
1642 activity_id: id,
1643 activity_name,
1644 activity_date,
1645 },
1646 ));
1647 }
1648 }
1649 }
1650
1651 let next_page = if activities.len() == *limit as usize && !reached_min {
1653 Some(SyncTask::new(
1654 profile_id,
1655 task.pipeline,
1656 SyncTaskType::Activities {
1657 start: start + limit,
1658 limit: *limit,
1659 min_date: *min_date,
1660 max_date: *max_date,
1661 },
1662 ))
1663 } else {
1664 None
1665 };
1666
1667 Ok(SyncData::Activities {
1668 records,
1669 gpx_tasks,
1670 next_page,
1671 task_id,
1672 })
1673 }
1674
1675 SyncTaskType::DownloadGpx {
1676 activity_id,
1677 activity_date,
1678 ..
1679 } => {
1680 let path = format!("/download-service/export/gpx/activity/{}", activity_id);
1681 let gpx_bytes = client.download(token, &path).await?;
1682 let gpx_data = String::from_utf8_lossy(&gpx_bytes);
1683
1684 let date = activity_date
1686 .as_ref()
1687 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
1688 .unwrap_or_else(|| Utc::now().date_naive());
1689
1690 let points = parse_gpx(*activity_id, &gpx_data)?;
1691
1692 Ok(SyncData::TrackPoints {
1693 activity_id: *activity_id,
1694 date,
1695 points,
1696 task_id,
1697 })
1698 }
1699
1700 SyncTaskType::DailyHealth { date } => {
1701 let path = format!(
1702 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
1703 display_name, date
1704 );
1705
1706 let health_result: std::result::Result<serde_json::Value, _> =
1708 client.get_json(token, &path).await;
1709
1710 let health = match health_result {
1711 Ok(data) => data,
1712 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
1713 serde_json::json!({})
1715 }
1716 Err(e) => return Err(e),
1717 };
1718
1719 let sleep_path = format!(
1720 "/wellness-service/wellness/dailySleepData/{}?date={}",
1721 display_name, date
1722 );
1723 let sleep_data: Option<serde_json::Value> =
1724 match client.get_json(token, &sleep_path).await {
1725 Ok(data) => Some(data),
1726 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1727 Err(e) => return Err(e),
1728 };
1729
1730 let hrv_path = format!("/hrv-service/hrv/{}", date);
1731 let hrv_data: Option<serde_json::Value> = match client.get_json(token, &hrv_path).await
1732 {
1733 Ok(data) => Some(data),
1734 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1735 Err(e) => return Err(e),
1736 };
1737
1738 let (sleep_total, deep_sleep, light_sleep, rem_sleep, sleep_score) =
1739 parse_sleep_metrics(sleep_data.as_ref());
1740 let sleep_note = parse_sleep_note(sleep_data.as_ref());
1741 let (hrv_weekly_avg, hrv_last_night, hrv_status) = parse_hrv_metrics(hrv_data.as_ref());
1742
1743 let record = DailyHealth {
1744 id: None,
1745 profile_id,
1746 date: *date,
1747 steps: health
1748 .get("totalSteps")
1749 .and_then(|v| v.as_i64())
1750 .map(|v| v as i32),
1751 step_goal: health
1752 .get("dailyStepGoal")
1753 .and_then(|v| v.as_i64())
1754 .map(|v| v as i32),
1755 total_calories: health
1756 .get("totalKilocalories")
1757 .and_then(|v| v.as_i64())
1758 .map(|v| v as i32),
1759 active_calories: health
1760 .get("activeKilocalories")
1761 .and_then(|v| v.as_i64())
1762 .map(|v| v as i32),
1763 bmr_calories: health
1764 .get("bmrKilocalories")
1765 .and_then(|v| v.as_i64())
1766 .map(|v| v as i32),
1767 resting_hr: health
1768 .get("restingHeartRate")
1769 .and_then(|v| v.as_i64())
1770 .map(|v| v as i32),
1771 sleep_seconds: sleep_total.or_else(|| {
1772 health
1773 .get("sleepingSeconds")
1774 .and_then(|v| v.as_i64())
1775 .map(|v| v as i32)
1776 }),
1777 deep_sleep_seconds: deep_sleep,
1778 light_sleep_seconds: light_sleep,
1779 rem_sleep_seconds: rem_sleep,
1780 sleep_score,
1781 sleep_note,
1782 avg_stress: health
1783 .get("averageStressLevel")
1784 .and_then(|v| v.as_i64())
1785 .map(|v| v as i32),
1786 max_stress: health
1787 .get("maxStressLevel")
1788 .and_then(|v| v.as_i64())
1789 .map(|v| v as i32),
1790 body_battery_start: health
1791 .get("bodyBatteryChargedValue")
1792 .and_then(|v| v.as_i64())
1793 .map(|v| v as i32),
1794 body_battery_end: health
1795 .get("bodyBatteryDrainedValue")
1796 .and_then(|v| v.as_i64())
1797 .map(|v| v as i32),
1798 hrv_weekly_avg,
1799 hrv_last_night,
1800 hrv_status,
1801 avg_respiration: health
1802 .get("averageRespirationValue")
1803 .and_then(|v| v.as_f64()),
1804 avg_spo2: health
1805 .get("averageSpo2Value")
1806 .and_then(|v| v.as_i64())
1807 .map(|v| v as i32),
1808 lowest_spo2: health
1809 .get("lowestSpo2Value")
1810 .and_then(|v| v.as_i64())
1811 .map(|v| v as i32),
1812 hydration_ml: health
1813 .get("hydrationIntakeGoal")
1814 .and_then(|v| v.as_i64())
1815 .map(|v| v as i32),
1816 moderate_intensity_min: health
1817 .get("moderateIntensityMinutes")
1818 .and_then(|v| v.as_i64())
1819 .map(|v| v as i32),
1820 vigorous_intensity_min: health
1821 .get("vigorousIntensityMinutes")
1822 .and_then(|v| v.as_i64())
1823 .map(|v| v as i32),
1824 raw_json: Some(health),
1825 };
1826
1827 Ok(SyncData::Health { record, task_id })
1828 }
1829
1830 SyncTaskType::Performance { date } => {
1831 let vo2_path = format!("/metrics-service/metrics/maxmet/daily/{}/{}", date, date);
1832 let vo2max: Option<serde_json::Value> = match client.get_json(token, &vo2_path).await {
1833 Ok(data) => Some(data),
1834 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1835 Err(e) => return Err(e),
1836 };
1837
1838 let race_path = format!(
1839 "/metrics-service/metrics/racepredictions/daily/{}?fromCalendarDate={}&toCalendarDate={}",
1840 display_name, date, date
1841 );
1842 let race_predictions: Option<serde_json::Value> =
1843 match client.get_json(token, &race_path).await {
1844 Ok(data) => Some(data),
1845 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1846 Err(e) => return Err(e),
1847 };
1848
1849 let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
1851 let training_readiness: Option<serde_json::Value> =
1852 client.get_json(token, &readiness_path).await.ok();
1853
1854 let status_path = format!(
1856 "/metrics-service/metrics/trainingstatus/aggregated/{}",
1857 date
1858 );
1859 let training_status: Option<serde_json::Value> =
1860 client.get_json(token, &status_path).await.ok();
1861
1862 let endurance_path = format!(
1863 "/metrics-service/metrics/endurancescore?calendarDate={}",
1864 date
1865 );
1866 let endurance_score_data: Option<serde_json::Value> =
1867 match client.get_json(token, &endurance_path).await {
1868 Ok(data) => Some(data),
1869 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1870 Err(e) => return Err(e),
1871 };
1872
1873 let hill_path = format!("/metrics-service/metrics/hillscore?calendarDate={}", date);
1874 let hill_score_data: Option<serde_json::Value> =
1875 match client.get_json(token, &hill_path).await {
1876 Ok(data) => Some(data),
1877 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1878 Err(e) => return Err(e),
1879 };
1880
1881 let fitness_age_path = format!("/fitnessage-service/fitnessage/{}", date);
1882 let fitness_age_data: Option<serde_json::Value> =
1883 match client.get_json(token, &fitness_age_path).await {
1884 Ok(data) => Some(data),
1885 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => None,
1886 Err(e) => return Err(e),
1887 };
1888
1889 let vo2max_value = parse_vo2max_value(vo2max.as_ref());
1890 let fitness_age = parse_fitness_age(fitness_age_data.as_ref())
1891 .or_else(|| parse_vo2max_fitness_age(vo2max.as_ref()));
1892
1893 let readiness_entry = training_readiness
1894 .as_ref()
1895 .and_then(|v| v.as_array())
1896 .and_then(|arr| arr.first());
1897
1898 let readiness_score = readiness_entry
1899 .and_then(|e| e.get("score"))
1900 .and_then(|v| v.as_i64())
1901 .map(|v| v as i32);
1902
1903 let training_status_str = parse_training_status(training_status.as_ref(), *date);
1904 let (race_5k, race_10k, race_half, race_marathon) =
1905 parse_race_predictions(race_predictions.as_ref());
1906 let endurance_score = parse_overall_score(endurance_score_data.as_ref());
1907 let hill_score = parse_overall_score(hill_score_data.as_ref());
1908
1909 let record = PerformanceMetrics {
1910 id: None,
1911 profile_id,
1912 date: *date,
1913 vo2max: vo2max_value,
1914 fitness_age,
1915 training_readiness: readiness_score,
1916 training_status: training_status_str,
1917 lactate_threshold_hr: None,
1918 lactate_threshold_pace: None,
1919 race_5k_sec: race_5k,
1920 race_10k_sec: race_10k,
1921 race_half_sec: race_half,
1922 race_marathon_sec: race_marathon,
1923 endurance_score,
1924 hill_score,
1925 raw_json: None,
1926 };
1927
1928 Ok(SyncData::Performance { record, task_id })
1929 }
1930
1931 _ => {
1932 Err(GarminError::invalid_response(
1934 "Unsupported task type for parallel sync",
1935 ))
1936 }
1937 }
1938}
1939
1940fn parse_activity(activity: &serde_json::Value, profile_id: i32) -> Result<Activity> {
1942 let activity_id = activity
1943 .get("activityId")
1944 .and_then(|v| v.as_i64())
1945 .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
1946
1947 let start_time_local = activity
1948 .get("startTimeLocal")
1949 .and_then(|v| v.as_str())
1950 .and_then(parse_garmin_datetime);
1951
1952 let start_time_gmt = activity
1953 .get("startTimeGMT")
1954 .and_then(|v| v.as_str())
1955 .and_then(parse_garmin_datetime);
1956
1957 Ok(Activity {
1958 activity_id,
1959 profile_id,
1960 activity_name: activity
1961 .get("activityName")
1962 .and_then(|v| v.as_str())
1963 .map(|s| s.to_string()),
1964 activity_type: activity
1965 .get("activityType")
1966 .and_then(|v| v.get("typeKey"))
1967 .and_then(|v| v.as_str())
1968 .map(|s| s.to_string()),
1969 start_time_local,
1970 start_time_gmt,
1971 duration_sec: activity.get("duration").and_then(|v| v.as_f64()),
1972 distance_m: activity.get("distance").and_then(|v| v.as_f64()),
1973 calories: activity
1974 .get("calories")
1975 .and_then(|v| v.as_f64())
1976 .map(|v| v as i32),
1977 avg_hr: activity
1978 .get("averageHR")
1979 .and_then(|v| v.as_f64())
1980 .map(|v| v as i32),
1981 max_hr: activity
1982 .get("maxHR")
1983 .and_then(|v| v.as_f64())
1984 .map(|v| v as i32),
1985 avg_speed: activity.get("averageSpeed").and_then(|v| v.as_f64()),
1986 max_speed: activity.get("maxSpeed").and_then(|v| v.as_f64()),
1987 elevation_gain: activity.get("elevationGain").and_then(|v| v.as_f64()),
1988 elevation_loss: activity.get("elevationLoss").and_then(|v| v.as_f64()),
1989 avg_cadence: activity
1990 .get("averageRunningCadenceInStepsPerMinute")
1991 .and_then(|v| v.as_f64()),
1992 avg_power: activity
1993 .get("avgPower")
1994 .and_then(|v| v.as_i64())
1995 .map(|v| v as i32),
1996 normalized_power: activity
1997 .get("normPower")
1998 .and_then(|v| v.as_i64())
1999 .map(|v| v as i32),
2000 training_effect: activity
2001 .get("aerobicTrainingEffect")
2002 .and_then(|v| v.as_f64()),
2003 training_load: activity
2004 .get("activityTrainingLoad")
2005 .and_then(|v| v.as_f64()),
2006 start_lat: activity.get("startLatitude").and_then(|v| v.as_f64()),
2007 start_lon: activity.get("startLongitude").and_then(|v| v.as_f64()),
2008 end_lat: activity.get("endLatitude").and_then(|v| v.as_f64()),
2009 end_lon: activity.get("endLongitude").and_then(|v| v.as_f64()),
2010 ground_contact_time: activity
2011 .get("avgGroundContactTime")
2012 .and_then(|v| v.as_f64()),
2013 vertical_oscillation: activity
2014 .get("avgVerticalOscillation")
2015 .and_then(|v| v.as_f64()),
2016 stride_length: activity.get("avgStrideLength").and_then(|v| v.as_f64()),
2017 location_name: activity
2018 .get("locationName")
2019 .and_then(|v| v.as_str())
2020 .map(|s| s.to_string()),
2021 raw_json: Some(activity.clone()),
2022 })
2023}
2024
2025fn parse_garmin_datetime(value: &str) -> Option<DateTime<Utc>> {
2026 if let Ok(dt) = DateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S") {
2027 return Some(dt.with_timezone(&Utc));
2028 }
2029
2030 let naive = NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S")
2031 .or_else(|_| NaiveDateTime::parse_from_str(value, "%Y-%m-%d %H:%M:%S%.f"))
2032 .ok()?;
2033
2034 Some(DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
2035}
2036
2037fn parse_gpx(activity_id: i64, gpx_data: &str) -> Result<Vec<TrackPoint>> {
2039 use gpx::read;
2040 use std::io::BufReader;
2041
2042 let gpx_data = gpx_data.trim_start();
2043 let reader = BufReader::new(gpx_data.as_bytes());
2044 let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
2045 let mut streams = parse_gpx_track_point_streams(gpx_data)?.into_iter();
2046
2047 let mut points = Vec::new();
2048
2049 for track in gpx.tracks {
2050 for segment in track.segments {
2051 for point in segment.points {
2052 let stream = streams.next().unwrap_or_default();
2053 let timestamp = point
2054 .time
2055 .map(|t| {
2056 DateTime::parse_from_rfc3339(&t.format().unwrap_or_default())
2057 .map(|dt| dt.with_timezone(&Utc))
2058 .unwrap_or_default()
2059 })
2060 .unwrap_or_default();
2061
2062 points.push(TrackPoint {
2063 id: None,
2064 activity_id,
2065 timestamp,
2066 lat: Some(point.point().y()),
2067 lon: Some(point.point().x()),
2068 elevation: point.elevation,
2069 heart_rate: stream.heart_rate,
2070 cadence: stream.cadence,
2071 power: stream.power,
2072 speed: point.speed.or(stream.speed),
2073 });
2074 }
2075 }
2076 }
2077
2078 Ok(points)
2079}
2080
2081fn parse_gpx_track_point_streams(gpx_data: &str) -> Result<Vec<TrackPointStreams>> {
2082 use xml::reader::{EventReader, XmlEvent};
2083
2084 let parser = EventReader::from_str(gpx_data);
2085 let mut streams = Vec::new();
2086 let mut current_point: Option<TrackPointStreams> = None;
2087 let mut current_field: Option<TrackPointStreamField> = None;
2088 let mut current_text = String::new();
2089
2090 for event in parser {
2091 match event.map_err(|e| GarminError::invalid_response(e.to_string()))? {
2092 XmlEvent::StartElement { name, .. } => {
2093 if name.local_name == "trkpt" {
2094 current_point = Some(TrackPointStreams::default());
2095 current_field = None;
2096 current_text.clear();
2097 } else if current_point.is_some() {
2098 if let Some(field) = track_point_stream_field(&name.local_name) {
2099 current_field = Some(field);
2100 current_text.clear();
2101 }
2102 }
2103 }
2104 XmlEvent::Characters(text) | XmlEvent::CData(text) if current_field.is_some() => {
2105 current_text.push_str(&text);
2106 }
2107 XmlEvent::EndElement { name } => {
2108 if name.local_name == "trkpt" {
2109 streams.push(current_point.take().unwrap_or_default());
2110 current_field = None;
2111 current_text.clear();
2112 } else if let (Some(field), Some(point)) = (current_field, current_point.as_mut()) {
2113 if track_point_stream_field(&name.local_name) == Some(field) {
2114 apply_track_point_stream_field(point, field, ¤t_text);
2115 current_field = None;
2116 current_text.clear();
2117 }
2118 }
2119 }
2120 _ => {}
2121 }
2122 }
2123
2124 Ok(streams)
2125}
2126
2127fn track_point_stream_field(local_name: &str) -> Option<TrackPointStreamField> {
2128 match local_name.to_ascii_lowercase().as_str() {
2129 "hr" | "heartrate" | "heart_rate" | "heartratebpm" => {
2130 Some(TrackPointStreamField::HeartRate)
2131 }
2132 "cad" | "cadence" => Some(TrackPointStreamField::Cadence),
2133 "power" | "powerinwatts" | "watts" => Some(TrackPointStreamField::Power),
2134 "speed" => Some(TrackPointStreamField::Speed),
2135 _ => None,
2136 }
2137}
2138
2139fn apply_track_point_stream_field(
2140 point: &mut TrackPointStreams,
2141 field: TrackPointStreamField,
2142 value: &str,
2143) {
2144 match field {
2145 TrackPointStreamField::HeartRate => point.heart_rate = parse_i32_stream_value(value),
2146 TrackPointStreamField::Cadence => point.cadence = parse_i32_stream_value(value),
2147 TrackPointStreamField::Power => point.power = parse_i32_stream_value(value),
2148 TrackPointStreamField::Speed => point.speed = parse_f64_stream_value(value),
2149 }
2150}
2151
/// Parses a sensor reading as a number and rounds it to the nearest `i32`.
///
/// Surrounding whitespace is ignored. Returns `None` when the text is not a
/// number, is not finite, or lies outside the `i32` range.
fn parse_i32_stream_value(value: &str) -> Option<i32> {
    const LOWER: f64 = i32::MIN as f64;
    const UPPER: f64 = i32::MAX as f64;

    match value.trim().parse::<f64>() {
        Ok(number) if number.is_finite() && number >= LOWER && number <= UPPER => {
            Some(number.round() as i32)
        }
        _ => None,
    }
}
2160
/// Parses a sensor reading as a finite `f64`.
///
/// Surrounding whitespace is ignored. Returns `None` when the text is not a
/// number or is NaN/infinite.
fn parse_f64_stream_value(value: &str) -> Option<f64> {
    match value.trim().parse::<f64>() {
        Ok(number) if number.is_finite() => Some(number),
        _ => None,
    }
}
2165
2166async fn consumer_loop(
2168 _id: usize,
2169 rx: Arc<TokioMutex<mpsc::Receiver<SyncData>>>,
2170 parquet: Arc<ParquetStore>,
2171 queue: SharedTaskQueue,
2172 stats: Arc<TokioMutex<SyncStats>>,
2173 progress: SharedProgress,
2174 in_flight: Arc<AtomicUsize>,
2175) {
2176 loop {
2177 let data = {
2179 let mut rx = rx.lock().await;
2180 rx.recv().await
2181 };
2182
2183 let data = match data {
2184 Some(d) => d,
2185 None => break, };
2187
2188 let result = match &data {
2190 SyncData::Activities {
2191 records,
2192 gpx_tasks,
2193 next_page,
2194 task_id,
2195 } => {
2196 let write_result = parquet.upsert_activities_async(records).await;
2198
2199 if write_result.is_ok() {
2200 let mut gpx_added = 0u32;
2202 for gpx_task in gpx_tasks {
2203 let should_skip = match &gpx_task.task_type {
2204 SyncTaskType::DownloadGpx {
2205 activity_id,
2206 activity_date,
2207 ..
2208 } => activity_date
2209 .as_ref()
2210 .and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok())
2211 .and_then(|date| parquet.has_track_points(*activity_id, date).ok())
2212 .unwrap_or(false),
2213 _ => false,
2214 };
2215
2216 if should_skip {
2217 continue;
2218 }
2219
2220 if let Err(e) = queue.push(gpx_task.clone()).await {
2221 eprintln!("Failed to queue GPX task: {}", e);
2222 } else {
2223 gpx_added += 1;
2224 }
2225 }
2226 if gpx_added > 0 {
2227 progress.gpx.add_total(gpx_added);
2228 }
2229
2230 if let Some(next) = next_page {
2232 if let Err(e) = queue.push(next.clone()).await {
2233 eprintln!("Failed to queue next page: {}", e);
2234 }
2235 progress.activities.add_total(1);
2236 }
2237 }
2238
2239 (write_result, *task_id, "Activities")
2240 }
2241
2242 SyncData::Health { record, task_id } => {
2243 let result = parquet
2244 .upsert_daily_health_async(std::slice::from_ref(record))
2245 .await;
2246 (result, *task_id, "Health")
2247 }
2248
2249 SyncData::Performance { record, task_id } => {
2250 let result = parquet
2251 .upsert_performance_metrics_async(std::slice::from_ref(record))
2252 .await;
2253 (result, *task_id, "Performance")
2254 }
2255
2256 SyncData::TrackPoints {
2257 date,
2258 points,
2259 task_id,
2260 ..
2261 } => {
2262 let result = parquet.write_track_points_async(*date, points).await;
2263 (result, *task_id, "GPX")
2264 }
2265 };
2266
2267 let (write_result, task_id, task_type) = result;
2268
2269 match write_result {
2270 Ok(()) => {
2271 if let Err(e) = queue.mark_completed(task_id).await {
2273 eprintln!("Failed to mark task completed: {}", e);
2274 }
2275
2276 {
2278 let mut s = stats.lock().await;
2279 s.completed += 1;
2280 }
2281
2282 match task_type {
2284 "Activities" => {
2285 progress.activities.complete_one();
2286 progress.activities.clear_current_item();
2287 }
2288 "Health" => {
2289 progress.health.complete_one();
2290 progress.health.clear_current_item();
2291 }
2292 "Performance" => {
2293 progress.performance.complete_one();
2294 progress.performance.clear_current_item();
2295 }
2296 "GPX" => {
2297 progress.gpx.complete_one();
2298 progress.gpx.clear_current_item();
2299 }
2300 _ => {}
2301 }
2302 in_flight.fetch_sub(1, Ordering::Relaxed);
2303 }
2304 Err(e) => {
2305 let backoff = Duration::seconds(60);
2307 let error_msg = e.to_string();
2308 if let Err(e) = queue.mark_failed(task_id, &error_msg, backoff).await {
2309 eprintln!("Failed to mark task as failed: {}", e);
2310 }
2311
2312 {
2314 let mut s = stats.lock().await;
2315 s.failed += 1;
2316 }
2317
2318 record_write_failure(&data, &progress, &error_msg);
2319 eprintln!("Write error for {}: {}", task_type, error_msg);
2320 in_flight.fetch_sub(1, Ordering::Relaxed);
2321 }
2322 }
2323 }
2324}
2325
2326#[cfg(test)]
2327mod tests {
2328 use super::*;
2329 use crate::db::models::TaskStatus;
2330 use chrono::NaiveDate;
2331 use serde_json::json;
2332 use wiremock::matchers::{method, path, query_param};
2333 use wiremock::{Mock, MockServer, ResponseTemplate};
2334
2335 #[test]
2336 fn test_should_exit_when_idle_requires_no_inflight() {
2337 assert!(!should_exit_when_idle(MAX_IDLE_RETRIES, 1));
2338 assert!(should_exit_when_idle(MAX_IDLE_RETRIES, 0));
2339 assert!(!should_exit_when_idle(MAX_IDLE_RETRIES - 1, 0));
2340 }
2341
2342 #[test]
2343 fn test_record_write_failure_updates_progress() {
2344 let progress = SyncProgress::new();
2345 let record = DailyHealth {
2346 id: None,
2347 profile_id: 1,
2348 date: NaiveDate::from_ymd_opt(2024, 12, 15).unwrap(),
2349 steps: None,
2350 step_goal: None,
2351 total_calories: None,
2352 active_calories: None,
2353 bmr_calories: None,
2354 resting_hr: None,
2355 sleep_seconds: None,
2356 deep_sleep_seconds: None,
2357 light_sleep_seconds: None,
2358 rem_sleep_seconds: None,
2359 sleep_score: None,
2360 sleep_note: None,
2361 avg_stress: None,
2362 max_stress: None,
2363 body_battery_start: None,
2364 body_battery_end: None,
2365 hrv_weekly_avg: None,
2366 hrv_last_night: None,
2367 hrv_status: None,
2368 avg_respiration: None,
2369 avg_spo2: None,
2370 lowest_spo2: None,
2371 hydration_ml: None,
2372 moderate_intensity_min: None,
2373 vigorous_intensity_min: None,
2374 raw_json: None,
2375 };
2376
2377 let data = SyncData::Health { record, task_id: 1 };
2378 record_write_failure(&data, &progress, "write failed");
2379
2380 assert_eq!(progress.health.get_failed(), 1);
2381 let errors = progress.get_errors();
2382 assert_eq!(errors.len(), 1);
2383 assert_eq!(errors[0].stream, "Health");
2384 }
2385
2386 fn test_token() -> OAuth2Token {
2387 OAuth2Token {
2388 scope: "test".to_string(),
2389 jti: "jti".to_string(),
2390 token_type: "Bearer".to_string(),
2391 access_token: "access".to_string(),
2392 refresh_token: "refresh".to_string(),
2393 expires_in: 3600,
2394 expires_at: Utc::now().timestamp() + 3600,
2395 refresh_token_expires_in: 86400,
2396 refresh_token_expires_at: Utc::now().timestamp() + 86400,
2397 }
2398 }
2399
2400 #[tokio::test]
2401 async fn test_activity_pagination_respects_date_bounds() {
2402 let server = MockServer::start().await;
2403
2404 let body = serde_json::json!([
2405 {
2406 "activityId": 1,
2407 "activityName": "Newest",
2408 "startTimeLocal": "2025-01-05 08:00:00",
2409 "startTimeGMT": "2025-01-05 07:00:00",
2410 "activityType": { "typeKey": "running" },
2411 "hasPolyline": false
2412 },
2413 {
2414 "activityId": 2,
2415 "activityName": "Mid",
2416 "startTimeLocal": "2025-01-04 08:00:00",
2417 "startTimeGMT": "2025-01-04 07:00:00",
2418 "activityType": { "typeKey": "running" },
2419 "hasPolyline": false
2420 },
2421 {
2422 "activityId": 3,
2423 "activityName": "Old",
2424 "startTimeLocal": "2025-01-03 08:00:00",
2425 "startTimeGMT": "2025-01-03 07:00:00",
2426 "activityType": { "typeKey": "running" },
2427 "hasPolyline": false
2428 }
2429 ]);
2430
2431 Mock::given(method("GET"))
2432 .and(path("/activitylist-service/activities/search/activities"))
2433 .and(query_param("limit", "50"))
2434 .and(query_param("start", "0"))
2435 .respond_with(ResponseTemplate::new(200).set_body_json(body))
2436 .mount(&server)
2437 .await;
2438
2439 let client = GarminClient::new_with_base_url(&server.uri());
2440 let task = SyncTask {
2441 id: Some(1),
2442 profile_id: 1,
2443 task_type: SyncTaskType::Activities {
2444 start: 0,
2445 limit: 50,
2446 min_date: Some(NaiveDate::from_ymd_opt(2025, 1, 4).unwrap()),
2447 max_date: Some(NaiveDate::from_ymd_opt(2025, 1, 5).unwrap()),
2448 },
2449 pipeline: SyncPipeline::Frontier,
2450 status: TaskStatus::Pending,
2451 attempts: 0,
2452 last_error: None,
2453 created_at: None,
2454 next_retry_at: None,
2455 completed_at: None,
2456 };
2457
2458 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2459 .await
2460 .unwrap();
2461
2462 match data {
2463 SyncData::Activities {
2464 records,
2465 gpx_tasks,
2466 next_page,
2467 ..
2468 } => {
2469 assert_eq!(records.len(), 2);
2470 assert!(gpx_tasks.is_empty());
2471 assert!(next_page.is_none());
2472 }
2473 _ => panic!("unexpected data type"),
2474 }
2475 }
2476
2477 #[tokio::test]
2478 async fn test_daily_health_includes_sleep_and_hrv() {
2479 let server = MockServer::start().await;
2480 let date = NaiveDate::from_ymd_opt(2025, 12, 4).unwrap();
2481
2482 let health_body = json!({
2483 "totalSteps": 1234,
2484 "sleepingSeconds": 1000,
2485 "averageStressLevel": 20
2486 });
2487
2488 Mock::given(method("GET"))
2489 .and(path("/usersummary-service/usersummary/daily/TestUser"))
2490 .and(query_param("calendarDate", "2025-12-04"))
2491 .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2492 .mount(&server)
2493 .await;
2494
2495 let sleep_fixture: serde_json::Value =
2496 serde_json::from_str(include_str!("../../tests/fixtures/sleep_2025-12-04.json"))
2497 .unwrap();
2498
2499 Mock::given(method("GET"))
2500 .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2501 .and(query_param("date", "2025-12-04"))
2502 .respond_with(ResponseTemplate::new(200).set_body_json(sleep_fixture))
2503 .mount(&server)
2504 .await;
2505
2506 let hrv_fixture: serde_json::Value =
2507 serde_json::from_str(include_str!("../../tests/fixtures/hrv.json")).unwrap();
2508
2509 Mock::given(method("GET"))
2510 .and(path("/hrv-service/hrv/2025-12-04"))
2511 .respond_with(ResponseTemplate::new(200).set_body_json(hrv_fixture))
2512 .mount(&server)
2513 .await;
2514
2515 let client = GarminClient::new_with_base_url(&server.uri());
2516 let mut task = SyncTask::new(
2517 1,
2518 SyncPipeline::Frontier,
2519 SyncTaskType::DailyHealth { date },
2520 );
2521 task.id = Some(1);
2522
2523 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2524 .await
2525 .unwrap();
2526
2527 match data {
2528 SyncData::Health { record, .. } => {
2529 assert_eq!(record.sleep_seconds, Some(31920));
2530 assert_eq!(record.deep_sleep_seconds, Some(8100));
2531 assert_eq!(record.light_sleep_seconds, Some(15300));
2532 assert_eq!(record.rem_sleep_seconds, Some(8520));
2533 assert_eq!(record.sleep_score, Some(88));
2534 assert_eq!(
2535 record.sleep_note.as_deref(),
2536 Some("Late caffeine, restless night.")
2537 );
2538 assert_eq!(record.hrv_weekly_avg, Some(65));
2539 assert_eq!(record.hrv_last_night, Some(68));
2540 assert_eq!(record.hrv_status.as_deref(), Some("BALANCED"));
2541 }
2542 _ => panic!("unexpected data type"),
2543 }
2544 }
2545
2546 #[tokio::test]
2547 async fn test_daily_health_handles_missing_sleep_and_hrv() {
2548 let server = MockServer::start().await;
2549 let date = NaiveDate::from_ymd_opt(2025, 12, 5).unwrap();
2550
2551 let health_body = json!({
2552 "totalSteps": 4321,
2553 "sleepingSeconds": 7200
2554 });
2555
2556 Mock::given(method("GET"))
2557 .and(path("/usersummary-service/usersummary/daily/TestUser"))
2558 .and(query_param("calendarDate", "2025-12-05"))
2559 .respond_with(ResponseTemplate::new(200).set_body_json(health_body))
2560 .mount(&server)
2561 .await;
2562
2563 Mock::given(method("GET"))
2564 .and(path("/wellness-service/wellness/dailySleepData/TestUser"))
2565 .and(query_param("date", "2025-12-05"))
2566 .respond_with(ResponseTemplate::new(404))
2567 .mount(&server)
2568 .await;
2569
2570 Mock::given(method("GET"))
2571 .and(path("/hrv-service/hrv/2025-12-05"))
2572 .respond_with(ResponseTemplate::new(404))
2573 .mount(&server)
2574 .await;
2575
2576 let client = GarminClient::new_with_base_url(&server.uri());
2577 let mut task = SyncTask::new(
2578 1,
2579 SyncPipeline::Frontier,
2580 SyncTaskType::DailyHealth { date },
2581 );
2582 task.id = Some(1);
2583
2584 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2585 .await
2586 .unwrap();
2587
2588 match data {
2589 SyncData::Health { record, .. } => {
2590 assert_eq!(record.sleep_seconds, Some(7200));
2591 assert_eq!(record.deep_sleep_seconds, None);
2592 assert_eq!(record.hrv_weekly_avg, None);
2593 assert_eq!(record.hrv_status, None);
2594 }
2595 _ => panic!("unexpected data type"),
2596 }
2597 }
2598
/// Feeds `parse_hrv_metrics` a payload whose `hrvSummary` carries `weeklyAvg`,
/// `lastNightAvg` and `status`, and checks each element of the returned tuple.
#[test]
fn test_parse_hrv_metrics_reads_last_night_avg() {
    let summary = json!({
        "weeklyAvg": 60,
        "lastNightAvg": 58,
        "status": "BALANCED"
    });
    let payload = json!({ "hrvSummary": summary });

    let parsed = parse_hrv_metrics(Some(&payload));

    assert_eq!(parsed.0, Some(60));
    assert_eq!(parsed.1, Some(58));
    assert_eq!(parsed.2.as_deref(), Some("BALANCED"));
}
2614
/// Parses a two-point GPX track and checks the per-point extension values.
///
/// The first point carries heart rate, cadence, speed and power extensions;
/// the second carries heart rate only, so its other streams must be `None`.
#[test]
fn test_parse_gpx_reads_track_point_extension_streams() {
    // The literal is runtime input: its content is kept exactly as written.
    let document = r#"
<?xml version="1.0" encoding="UTF-8"?>
<gpx creator="Garmin Connect" version="1.1"
xmlns="http://www.topografix.com/GPX/1/1"
xmlns:gpxtpx="http://www.garmin.com/xmlschemas/TrackPointExtension/v1"
xmlns:gpxpx="http://www.garmin.com/xmlschemas/PowerExtension/v1">
<trk>
<trkseg>
<trkpt lat="37.0" lon="-122.0">
<ele>10.5</ele>
<time>2025-01-03T07:00:00Z</time>
<extensions>
<gpxtpx:TrackPointExtension>
<gpxtpx:hr>150</gpxtpx:hr>
<gpxtpx:cad>88</gpxtpx:cad>
<gpxtpx:speed>3.25</gpxtpx:speed>
</gpxtpx:TrackPointExtension>
<gpxpx:PowerExtension>
<gpxpx:PowerInWatts>245</gpxpx:PowerInWatts>
</gpxpx:PowerExtension>
</extensions>
</trkpt>
<trkpt lat="37.1" lon="-122.1">
<ele>11.0</ele>
<time>2025-01-03T07:00:01Z</time>
<extensions>
<gpxtpx:TrackPointExtension>
<gpxtpx:hr>151</gpxtpx:hr>
</gpxtpx:TrackPointExtension>
</extensions>
</trkpt>
</trkseg>
</trk>
</gpx>
"#;

    let track = parse_gpx(42, document).unwrap();
    assert_eq!(track.len(), 2);

    // Point with every extension stream present.
    let full = &track[0];
    assert_eq!(full.activity_id, 42);
    assert_eq!(full.heart_rate, Some(150));
    assert_eq!(full.cadence, Some(88));
    assert_eq!(full.power, Some(245));
    assert_eq!(full.speed, Some(3.25));

    // Point with heart rate only.
    let sparse = &track[1];
    assert_eq!(sparse.heart_rate, Some(151));
    assert_eq!(sparse.cadence, None);
    assert_eq!(sparse.power, None);
    assert_eq!(sparse.speed, None);
}
2666
2667 #[tokio::test]
2668 async fn test_performance_uses_date_scoped_endpoints() {
2669 let server = MockServer::start().await;
2670 let date = NaiveDate::from_ymd_opt(2025, 12, 10).unwrap();
2671
2672 let vo2_fixture: serde_json::Value =
2673 serde_json::from_str(include_str!("../../tests/fixtures/vo2max.json")).unwrap();
2674 Mock::given(method("GET"))
2675 .and(path(
2676 "/metrics-service/metrics/maxmet/daily/2025-12-10/2025-12-10",
2677 ))
2678 .respond_with(ResponseTemplate::new(200).set_body_json(vo2_fixture))
2679 .mount(&server)
2680 .await;
2681
2682 let race_fixture: serde_json::Value =
2683 serde_json::from_str(include_str!("../../tests/fixtures/race_predictions.json"))
2684 .unwrap();
2685 Mock::given(method("GET"))
2686 .and(path(
2687 "/metrics-service/metrics/racepredictions/daily/TestUser",
2688 ))
2689 .and(query_param("fromCalendarDate", "2025-12-10"))
2690 .and(query_param("toCalendarDate", "2025-12-10"))
2691 .respond_with(ResponseTemplate::new(200).set_body_json(race_fixture))
2692 .mount(&server)
2693 .await;
2694
2695 let readiness_fixture: serde_json::Value =
2696 serde_json::from_str(include_str!("../../tests/fixtures/training_readiness.json"))
2697 .unwrap();
2698 Mock::given(method("GET"))
2699 .and(path(
2700 "/metrics-service/metrics/trainingreadiness/2025-12-10",
2701 ))
2702 .respond_with(ResponseTemplate::new(200).set_body_json(readiness_fixture))
2703 .mount(&server)
2704 .await;
2705
2706 let training_status_fixture = json!({
2707 "mostRecentTrainingStatus": {
2708 "latestTrainingStatusData": {
2709 "123": {
2710 "calendarDate": "2025-12-10",
2711 "trainingStatusFeedbackPhrase": "PRODUCTIVE"
2712 }
2713 }
2714 }
2715 });
2716 Mock::given(method("GET"))
2717 .and(path(
2718 "/metrics-service/metrics/trainingstatus/aggregated/2025-12-10",
2719 ))
2720 .respond_with(ResponseTemplate::new(200).set_body_json(training_status_fixture))
2721 .mount(&server)
2722 .await;
2723
2724 let endurance_fixture: serde_json::Value =
2725 serde_json::from_str(include_str!("../../tests/fixtures/endurance_score.json"))
2726 .unwrap();
2727 Mock::given(method("GET"))
2728 .and(path("/metrics-service/metrics/endurancescore"))
2729 .and(query_param("calendarDate", "2025-12-10"))
2730 .respond_with(ResponseTemplate::new(200).set_body_json(endurance_fixture))
2731 .mount(&server)
2732 .await;
2733
2734 let hill_fixture: serde_json::Value =
2735 serde_json::from_str(include_str!("../../tests/fixtures/hill_score.json")).unwrap();
2736 Mock::given(method("GET"))
2737 .and(path("/metrics-service/metrics/hillscore"))
2738 .and(query_param("calendarDate", "2025-12-10"))
2739 .respond_with(ResponseTemplate::new(200).set_body_json(hill_fixture))
2740 .mount(&server)
2741 .await;
2742
2743 let fitness_age_fixture = json!({
2744 "calendarDate": "2025-12-10",
2745 "fitnessAge": 37.0
2746 });
2747 Mock::given(method("GET"))
2748 .and(path("/fitnessage-service/fitnessage/2025-12-10"))
2749 .respond_with(ResponseTemplate::new(200).set_body_json(fitness_age_fixture))
2750 .mount(&server)
2751 .await;
2752
2753 let client = GarminClient::new_with_base_url(&server.uri());
2754 let mut task = SyncTask::new(
2755 1,
2756 SyncPipeline::Backfill,
2757 SyncTaskType::Performance { date },
2758 );
2759 task.id = Some(1);
2760
2761 let data = fetch_task_data(&task, &client, &test_token(), "TestUser", 1)
2762 .await
2763 .unwrap();
2764
2765 match data {
2766 SyncData::Performance { record, .. } => {
2767 assert_eq!(record.vo2max, Some(53.0));
2768 assert_eq!(record.fitness_age, Some(37));
2769 assert_eq!(record.training_readiness, Some(69));
2770 assert_eq!(record.training_status.as_deref(), Some("PRODUCTIVE"));
2771 assert_eq!(record.race_5k_sec, Some(1245));
2772 assert_eq!(record.race_10k_sec, Some(2610));
2773 assert_eq!(record.race_half_sec, Some(5850));
2774 assert_eq!(record.race_marathon_sec, Some(12420));
2775 assert_eq!(record.endurance_score, Some(72));
2776 assert_eq!(record.hill_score, Some(58));
2777 }
2778 _ => panic!("unexpected data type"),
2779 }
2780 }
2781
/// Checks that `parse_garmin_datetime` returns `Some` for timezone-less
/// timestamps, both with and without fractional seconds.
#[test]
fn test_parse_garmin_datetime_accepts_naive_strings() {
    let whole_seconds = parse_garmin_datetime("2025-01-03 07:00:00");
    assert!(whole_seconds.is_some());

    let fractional_seconds = parse_garmin_datetime("2025-01-03 07:00:00.123");
    assert!(fractional_seconds.is_some());
}
2790}