1pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use crate::{Database, GarminError, Result};
22use std::io::{self, Write};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
28pub struct SyncEngine {
30 db: Database,
31 client: GarminClient,
32 token: OAuth2Token,
33 rate_limiter: RateLimiter,
34 queue: TaskQueue,
35 profile_id: i32,
36 display_name: Option<String>,
37}
38
39impl SyncEngine {
40 pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42 let queue = TaskQueue::new(db.clone());
43 Self {
44 db,
45 client,
46 token,
47 rate_limiter: RateLimiter::new(),
48 queue,
49 profile_id,
50 display_name: None,
51 }
52 }
53
54 async fn get_display_name(&mut self) -> Result<String> {
56 if let Some(ref name) = self.display_name {
57 return Ok(name.clone());
58 }
59
60 let profile: serde_json::Value = self
61 .client
62 .get_json(&self.token, "/userprofile-service/socialProfile")
63 .await?;
64
65 let name = profile
66 .get("displayName")
67 .and_then(|v| v.as_str())
68 .map(|s| s.to_string())
69 .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71 self.display_name = Some(name.clone());
72 Ok(name)
73 }
74
75 async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77 print!("Finding oldest activity date...");
78 let _ = io::stdout().flush();
79
80 self.rate_limiter.wait().await;
84
85 let limit: u32 = 100;
87 let mut jump: u32 = 100;
88 let mut last_non_empty: u32 = 0;
89
90 while jump < 10000 {
92 let path = format!(
93 "/activitylist-service/activities/search/activities?limit=1&start={}",
94 jump
95 );
96
97 let activities: Vec<serde_json::Value> =
98 self.client.get_json(&self.token, &path).await?;
99
100 if activities.is_empty() {
101 break;
102 }
103
104 last_non_empty = jump;
105 jump *= 2;
106 self.rate_limiter.wait().await;
107 }
108
109 let mut low = last_non_empty;
111 let mut high = jump;
112
113 while high - low > limit {
114 let mid = (low + high) / 2;
115 let path = format!(
116 "/activitylist-service/activities/search/activities?limit=1&start={}",
117 mid
118 );
119
120 self.rate_limiter.wait().await;
121 let activities: Vec<serde_json::Value> =
122 self.client.get_json(&self.token, &path).await?;
123
124 if activities.is_empty() {
125 high = mid;
126 } else {
127 low = mid;
128 }
129 }
130
131 let path = format!(
133 "/activitylist-service/activities/search/activities?limit={}&start={}",
134 limit, low
135 );
136
137 self.rate_limiter.wait().await;
138 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
139
140 let oldest_date = activities
141 .last()
142 .and_then(|activity| activity.get("startTimeLocal"))
143 .and_then(|v| v.as_str())
144 .and_then(|date_str| date_str.split(' ').next())
145 .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
146
147 let result = oldest_date.unwrap_or_else(|| {
148 Utc::now().date_naive() - Duration::days(365)
150 });
151
152 println!(" {}", result);
153 Ok(result)
154 }
155
156 pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
158 if opts.fancy_ui {
161 self.run_with_progress(&opts).await
162 } else {
163 self.run_sequential(&opts).await
164 }
165 }
166
167 async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
169 let progress = Arc::new(SyncProgress::new());
170
171 let display_name = self.get_display_name().await?;
173 progress.set_profile(&display_name);
174
175 let _recovered = self.queue.recover_in_progress()?;
177
178 if self.queue.pending_count()? == 0 {
180 self.plan_sync(opts).await?;
181 }
182
183 self.count_tasks_for_progress(&progress)?;
185
186 let from_date = opts
188 .from_date
189 .unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
190 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
191 progress.set_date_range(&from_date.to_string(), &to_date.to_string());
192
193 let ui_progress = progress.clone();
195 let ui_handle = tokio::spawn(async move {
196 if let Err(e) = ui::run_tui(ui_progress).await {
197 eprintln!("TUI error: {}", e);
198 }
199 });
200
201 let stats = self
203 .run_with_progress_tracking(opts, progress.clone())
204 .await?;
205
206 ui_handle.abort();
208
209 println!("\nSync complete: {}", stats);
211
212 Ok(stats)
213 }
214
215 async fn run_with_progress_tracking(
217 &mut self,
218 opts: &SyncOptions,
219 progress: SharedProgress,
220 ) -> Result<SyncStats> {
221 let mut stats = SyncStats::default();
222
223 while let Some(task) = self.queue.pop()? {
225 if self.rate_limiter.should_pause() {
226 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
227 }
228
229 let task_id = task.id.unwrap();
230 self.queue.mark_in_progress(task_id)?;
231
232 update_progress_for_task(&task, &progress);
234
235 self.rate_limiter.wait().await;
236 progress.record_request();
237
238 match self.execute_task(&task).await {
239 Ok(()) => {
240 self.queue.mark_completed(task_id)?;
241 self.rate_limiter.on_success();
242 stats.completed += 1;
243 complete_progress_for_task(&task, &progress);
244 }
245 Err(GarminError::RateLimited) => {
246 self.rate_limiter.on_rate_limit();
247 let backoff = self.rate_limiter.current_backoff();
248 self.queue.mark_failed(
249 task_id,
250 "Rate limited",
251 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
252 )?;
253 stats.rate_limited += 1;
254 }
255 Err(e) => {
256 let backoff = Duration::seconds(60);
257 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
258 stats.failed += 1;
259 fail_progress_for_task(&task, &progress);
260 }
261 }
262
263 if opts.dry_run {
264 break;
265 }
266 }
267
268 self.queue.cleanup(7)?;
270
271 Ok(stats)
272 }
273
274 async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
276 let mut stats = SyncStats::default();
277
278 print!("Fetching user profile...");
280 let _ = io::stdout().flush();
281 let display_name = self.get_display_name().await?;
282 println!(" {}", display_name);
283
284 let recovered = self.queue.recover_in_progress()?;
286 if recovered > 0 {
287 println!(" Recovered {} tasks from previous run", recovered);
288 stats.recovered = recovered;
289 }
290
291 if self.queue.pending_count()? == 0 {
293 println!("Planning sync tasks...");
294 self.plan_sync(opts).await?;
295 }
296
297 let total_tasks = self.queue.pending_count()?;
298 println!(" {} tasks queued\n", total_tasks);
299
300 while let Some(task) = self.queue.pop()? {
302 if self.rate_limiter.should_pause() {
303 println!(
304 " Rate limited, pausing for {} seconds...",
305 self.rate_limiter.pause_duration().as_secs()
306 );
307 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
308 }
309
310 let task_id = task.id.unwrap();
311 self.queue.mark_in_progress(task_id)?;
312
313 print_task_status(&task, &stats);
315
316 self.rate_limiter.wait().await;
317
318 match self.execute_task(&task).await {
319 Ok(()) => {
320 self.queue.mark_completed(task_id)?;
321 self.rate_limiter.on_success();
322 stats.completed += 1;
323 println!(" done");
324 }
325 Err(GarminError::RateLimited) => {
326 self.rate_limiter.on_rate_limit();
327 let backoff = self.rate_limiter.current_backoff();
328 self.queue.mark_failed(
329 task_id,
330 "Rate limited",
331 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
332 )?;
333 stats.rate_limited += 1;
334 println!(" rate limited (retry in {}s)", backoff.as_secs());
335 }
336 Err(e) => {
337 let backoff = Duration::seconds(60);
338 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
339 stats.failed += 1;
340 println!(" failed: {}", e);
341 }
342 }
343
344 if opts.dry_run {
345 break;
346 }
347 }
348
349 self.queue.cleanup(7)?;
351
352 Ok(stats)
353 }
354
355 fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
357 let conn = self.db.connection();
358 let conn = conn.lock().unwrap();
359
360 let act_count: i64 = conn
362 .query_row(
363 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
364 [],
365 |row| row.get(0),
366 )
367 .unwrap_or(0);
368 progress.activities.set_total(act_count as u32);
369
370 let gpx_count: i64 = conn
372 .query_row(
373 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
374 [],
375 |row| row.get(0),
376 )
377 .unwrap_or(0);
378 progress.gpx.set_total(gpx_count as u32);
379
380 let health_count: i64 = conn
382 .query_row(
383 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
384 [],
385 |row| row.get(0),
386 )
387 .unwrap_or(0);
388 progress.health.set_total(health_count as u32);
389
390 let perf_count: i64 = conn
392 .query_row(
393 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
394 [],
395 |row| row.get(0),
396 )
397 .unwrap_or(0);
398 progress.performance.set_total(perf_count as u32);
399
400 Ok(())
401 }
402
403 async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
405 let from_date = match opts.from_date {
407 Some(date) => date,
408 None => self.find_oldest_activity_date().await?,
409 };
410 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
411
412 let total_days = (to_date - from_date).num_days();
413 println!(
414 " Date range: {} to {} ({} days)",
415 from_date, to_date, total_days
416 );
417
418 if opts.sync_activities {
420 println!(" Planning activity sync...");
421 self.plan_activities_sync()?;
422 }
423
424 if opts.sync_health {
426 let health_tasks = self.plan_health_sync(from_date, to_date)?;
427 println!(" Planning health sync: {} days to fetch", health_tasks);
428 }
429
430 if opts.sync_performance {
432 let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
433 println!(" Planning performance sync: {} weeks to fetch", perf_tasks);
434 }
435
436 Ok(())
437 }
438
439 fn plan_activities_sync(&self) -> Result<()> {
441 let task = SyncTask::new(
443 self.profile_id,
444 SyncTaskType::Activities {
445 start: 0,
446 limit: 50,
447 },
448 );
449 self.queue.push(task)?;
450 Ok(())
451 }
452
453 fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
455 let mut count = 0;
456 let mut date = from;
457 while date <= to {
458 if !self.has_health_data(date)? {
460 let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
461 self.queue.push(task)?;
462 count += 1;
463 }
464 date += Duration::days(1);
465 }
466 Ok(count)
467 }
468
469 fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
471 let mut count = 0;
473 let mut date = from;
474 while date <= to {
475 if !self.has_performance_data(date)? {
476 let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
477 self.queue.push(task)?;
478 count += 1;
479 }
480 date += Duration::days(7);
481 }
482 Ok(count)
483 }
484
485 fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
487 let conn = self.db.connection();
488 let conn = conn.lock().unwrap();
489
490 let count: i64 = conn
491 .query_row(
492 "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
493 duckdb::params![self.profile_id, date.to_string()],
494 |row| row.get(0),
495 )
496 .map_err(|e| GarminError::Database(e.to_string()))?;
497
498 Ok(count > 0)
499 }
500
501 fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
503 let conn = self.db.connection();
504 let conn = conn.lock().unwrap();
505
506 let count: i64 = conn
507 .query_row(
508 "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
509 duckdb::params![self.profile_id, date.to_string()],
510 |row| row.get(0),
511 )
512 .map_err(|e| GarminError::Database(e.to_string()))?;
513
514 Ok(count > 0)
515 }
516
517 async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
519 match &task.task_type {
520 SyncTaskType::Activities { start, limit } => self.sync_activities(*start, *limit).await,
521 SyncTaskType::ActivityDetail { activity_id } => {
522 self.sync_activity_detail(*activity_id).await
523 }
524 SyncTaskType::DownloadGpx { activity_id, .. } => self.download_gpx(*activity_id).await,
525 SyncTaskType::DailyHealth { date } => self.sync_daily_health(*date).await,
526 SyncTaskType::Performance { date } => self.sync_performance(*date).await,
527 SyncTaskType::Weight { from, to } => self.sync_weight(*from, *to).await,
528 SyncTaskType::GenerateEmbeddings { activity_ids } => {
529 self.generate_embeddings(activity_ids).await
530 }
531 }
532 }
533
534 async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
536 let path = format!(
537 "/activitylist-service/activities/search/activities?limit={}&start={}",
538 limit, start
539 );
540 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
541
542 for activity in &activities {
543 self.store_activity(activity)?;
545
546 if activity
548 .get("hasPolyline")
549 .and_then(|v| v.as_bool())
550 .unwrap_or(false)
551 {
552 if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
553 let activity_name = activity
554 .get("activityName")
555 .and_then(|v| v.as_str())
556 .map(|s| s.to_string());
557 let activity_date = activity
558 .get("startTimeLocal")
559 .and_then(|v| v.as_str())
560 .and_then(|s| s.split(' ').next())
561 .map(|s| s.to_string());
562
563 let task = SyncTask::new(
564 self.profile_id,
565 SyncTaskType::DownloadGpx {
566 activity_id: id,
567 activity_name,
568 activity_date,
569 },
570 );
571 self.queue.push(task)?;
572 }
573 }
574 }
575
576 if activities.len() == limit as usize {
578 let task = SyncTask::new(
579 self.profile_id,
580 SyncTaskType::Activities {
581 start: start + limit,
582 limit,
583 },
584 );
585 self.queue.push(task)?;
586 }
587
588 Ok(())
589 }
590
591 fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
593 let conn = self.db.connection();
594 let conn = conn.lock().unwrap();
595
596 let activity_id = activity
597 .get("activityId")
598 .and_then(|v| v.as_i64())
599 .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
600
601 conn.execute(
602 "INSERT INTO activities (
603 activity_id, profile_id, activity_name, activity_type,
604 start_time_local, duration_sec, distance_m, calories,
605 avg_hr, max_hr, avg_speed, max_speed,
606 elevation_gain, elevation_loss, raw_json
607 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
608 ON CONFLICT (activity_id) DO UPDATE SET
609 activity_name = EXCLUDED.activity_name,
610 activity_type = EXCLUDED.activity_type,
611 raw_json = EXCLUDED.raw_json",
612 duckdb::params![
613 activity_id,
614 self.profile_id,
615 activity.get("activityName").and_then(|v| v.as_str()),
616 activity
617 .get("activityType")
618 .and_then(|v| v.get("typeKey"))
619 .and_then(|v| v.as_str()),
620 activity.get("startTimeLocal").and_then(|v| v.as_str()),
621 activity.get("duration").and_then(|v| v.as_f64()),
622 activity.get("distance").and_then(|v| v.as_f64()),
623 activity
624 .get("calories")
625 .and_then(|v| v.as_i64())
626 .map(|v| v as i32),
627 activity
628 .get("averageHR")
629 .and_then(|v| v.as_i64())
630 .map(|v| v as i32),
631 activity
632 .get("maxHR")
633 .and_then(|v| v.as_i64())
634 .map(|v| v as i32),
635 activity.get("averageSpeed").and_then(|v| v.as_f64()),
636 activity.get("maxSpeed").and_then(|v| v.as_f64()),
637 activity.get("elevationGain").and_then(|v| v.as_f64()),
638 activity.get("elevationLoss").and_then(|v| v.as_f64()),
639 activity.to_string(),
640 ],
641 )
642 .map_err(|e| GarminError::Database(e.to_string()))?;
643
644 Ok(())
645 }
646
647 async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
649 Ok(())
651 }
652
653 async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
655 let path = format!("/download-service/export/gpx/activity/{}", activity_id);
656 let gpx_bytes = self.client.download(&self.token, &path).await?;
657 let gpx_data = String::from_utf8_lossy(&gpx_bytes);
658 self.parse_and_store_gpx(activity_id, &gpx_data)?;
659 Ok(())
660 }
661
662 fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
664 use gpx::read;
665 use std::io::BufReader;
666
667 let reader = BufReader::new(gpx_data.as_bytes());
668 let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
669
670 let conn = self.db.connection();
671 let conn = conn.lock().unwrap();
672
673 for track in gpx.tracks {
674 for segment in track.segments {
675 for point in segment.points {
676 let timestamp = point
677 .time
678 .map(|t| t.format().unwrap_or_default())
679 .unwrap_or_default();
680
681 conn.execute(
682 "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
683 VALUES (?, ?, ?, ?, ?)",
684 duckdb::params![
685 activity_id,
686 timestamp,
687 point.point().y(),
688 point.point().x(),
689 point.elevation,
690 ],
691 )
692 .map_err(|e| GarminError::Database(e.to_string()))?;
693 }
694 }
695 }
696
697 Ok(())
698 }
699
700 async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
702 let display_name = self.get_display_name().await?;
704
705 let path = format!(
706 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
707 display_name, date
708 );
709
710 let health_result: std::result::Result<serde_json::Value, _> =
712 self.client.get_json(&self.token, &path).await;
713
714 let health = match health_result {
715 Ok(data) => data,
716 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
717 serde_json::json!({})
720 }
721 Err(e) => return Err(e),
722 };
723
724 let conn = self.db.connection();
725 let conn = conn.lock().unwrap();
726
727 conn.execute(
728 "INSERT INTO daily_health (
729 profile_id, date, steps, step_goal, total_calories, active_calories,
730 resting_hr, sleep_seconds, avg_stress, max_stress,
731 body_battery_start, body_battery_end, raw_json
732 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
733 ON CONFLICT (profile_id, date) DO UPDATE SET
734 steps = EXCLUDED.steps,
735 step_goal = EXCLUDED.step_goal,
736 total_calories = EXCLUDED.total_calories,
737 active_calories = EXCLUDED.active_calories,
738 resting_hr = EXCLUDED.resting_hr,
739 sleep_seconds = EXCLUDED.sleep_seconds,
740 avg_stress = EXCLUDED.avg_stress,
741 max_stress = EXCLUDED.max_stress,
742 body_battery_start = EXCLUDED.body_battery_start,
743 body_battery_end = EXCLUDED.body_battery_end,
744 raw_json = EXCLUDED.raw_json",
745 duckdb::params![
746 self.profile_id,
747 date.to_string(),
748 health
749 .get("totalSteps")
750 .and_then(|v| v.as_i64())
751 .map(|v| v as i32),
752 health
753 .get("dailyStepGoal")
754 .and_then(|v| v.as_i64())
755 .map(|v| v as i32),
756 health
757 .get("totalKilocalories")
758 .and_then(|v| v.as_i64())
759 .map(|v| v as i32),
760 health
761 .get("activeKilocalories")
762 .and_then(|v| v.as_i64())
763 .map(|v| v as i32),
764 health
765 .get("restingHeartRate")
766 .and_then(|v| v.as_i64())
767 .map(|v| v as i32),
768 health
769 .get("sleepingSeconds")
770 .and_then(|v| v.as_i64())
771 .map(|v| v as i32),
772 health
773 .get("averageStressLevel")
774 .and_then(|v| v.as_i64())
775 .map(|v| v as i32),
776 health
777 .get("maxStressLevel")
778 .and_then(|v| v.as_i64())
779 .map(|v| v as i32),
780 health
781 .get("bodyBatteryChargedValue")
782 .and_then(|v| v.as_i64())
783 .map(|v| v as i32),
784 health
785 .get("bodyBatteryDrainedValue")
786 .and_then(|v| v.as_i64())
787 .map(|v| v as i32),
788 health.to_string(),
789 ],
790 )
791 .map_err(|e| GarminError::Database(e.to_string()))?;
792
793 Ok(())
794 }
795
796 async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
798 let vo2max: Option<serde_json::Value> = self
800 .client
801 .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
802 .await
803 .ok();
804
805 let race_predictions: Option<serde_json::Value> = self
807 .client
808 .get_json(
809 &self.token,
810 "/metrics-service/metrics/racepredictions/latest",
811 )
812 .await
813 .ok();
814
815 let readiness_path = format!("/metrics-service/metrics/trainingreadiness/{}", date);
817 let training_readiness: Option<serde_json::Value> = self
818 .client
819 .get_json(&self.token, &readiness_path)
820 .await
821 .ok();
822
823 let status_path = format!(
825 "/metrics-service/metrics/trainingstatus/aggregated/{}",
826 date
827 );
828 let training_status: Option<serde_json::Value> =
829 self.client.get_json(&self.token, &status_path).await.ok();
830
831 let conn = self.db.connection();
832 let conn = conn.lock().unwrap();
833
834 let vo2max_value = vo2max
835 .as_ref()
836 .and_then(|v| v.get("generic"))
837 .and_then(|v| v.get("vo2MaxValue"))
838 .and_then(|v| v.as_f64());
839
840 let fitness_age = vo2max
841 .as_ref()
842 .and_then(|v| v.get("generic"))
843 .and_then(|v| v.get("fitnessAge"))
844 .and_then(|v| v.as_i64())
845 .map(|v| v as i32);
846
847 let readiness_entry = training_readiness
849 .as_ref()
850 .and_then(|v| v.as_array())
851 .and_then(|arr| arr.first());
852
853 let readiness_score = readiness_entry
854 .and_then(|e| e.get("score"))
855 .and_then(|v| v.as_i64())
856 .map(|v| v as i32);
857
858 let readiness_level = readiness_entry
859 .and_then(|e| e.get("level"))
860 .and_then(|v| v.as_str())
861 .map(|s| s.to_string());
862
863 let status_data = training_status
865 .as_ref()
866 .and_then(|v| v.get("mostRecentTrainingStatus"))
867 .and_then(|s| s.get("latestTrainingStatusData"))
868 .and_then(|d| d.as_object())
869 .and_then(|m| m.values().next());
870
871 let status_phrase = status_data
872 .and_then(|e| e.get("trainingStatusFeedbackPhrase"))
873 .and_then(|v| v.as_str())
874 .map(|s| s.to_string());
875
876 let load_dto = status_data.and_then(|e| e.get("acuteTrainingLoadDTO"));
877
878 let acute_load = load_dto
879 .and_then(|l| l.get("dailyTrainingLoadAcute"))
880 .and_then(|v| v.as_f64());
881
882 let chronic_load = load_dto
883 .and_then(|l| l.get("dailyTrainingLoadChronic"))
884 .and_then(|v| v.as_f64());
885
886 let load_ratio = load_dto
887 .and_then(|l| l.get("dailyAcuteChronicWorkloadRatio"))
888 .and_then(|v| v.as_f64());
889
890 let load_ratio_status = load_dto
891 .and_then(|l| l.get("acwrStatus"))
892 .and_then(|v| v.as_str())
893 .map(|s| s.to_string());
894
895 let load_focus = training_status
897 .as_ref()
898 .and_then(|v| v.get("mostRecentTrainingLoadBalance"))
899 .and_then(|b| b.get("metricsTrainingLoadBalanceDTOMap"))
900 .and_then(|m| m.as_object())
901 .and_then(|m| m.values().next())
902 .and_then(|e| e.get("trainingBalanceFeedbackPhrase"))
903 .and_then(|v| v.as_str())
904 .map(|s| s.to_string());
905
906 let race_5k = race_predictions
907 .as_ref()
908 .and_then(|v| v.get("time5K"))
909 .and_then(|v| v.as_f64())
910 .map(|v| v as i32);
911
912 let race_10k = race_predictions
913 .as_ref()
914 .and_then(|v| v.get("time10K"))
915 .and_then(|v| v.as_f64())
916 .map(|v| v as i32);
917
918 let race_half = race_predictions
919 .as_ref()
920 .and_then(|v| v.get("timeHalfMarathon"))
921 .and_then(|v| v.as_f64())
922 .map(|v| v as i32);
923
924 let race_marathon = race_predictions
925 .as_ref()
926 .and_then(|v| v.get("timeMarathon"))
927 .and_then(|v| v.as_f64())
928 .map(|v| v as i32);
929
930 conn.execute(
931 "INSERT INTO performance_metrics (
932 profile_id, date, vo2max, fitness_age,
933 training_readiness, training_readiness_level,
934 training_status, acute_load, chronic_load,
935 load_ratio, load_ratio_status, load_focus,
936 race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
937 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
938 ON CONFLICT (profile_id, date) DO UPDATE SET
939 vo2max = EXCLUDED.vo2max,
940 fitness_age = EXCLUDED.fitness_age,
941 training_readiness = EXCLUDED.training_readiness,
942 training_readiness_level = EXCLUDED.training_readiness_level,
943 training_status = EXCLUDED.training_status,
944 acute_load = EXCLUDED.acute_load,
945 chronic_load = EXCLUDED.chronic_load,
946 load_ratio = EXCLUDED.load_ratio,
947 load_ratio_status = EXCLUDED.load_ratio_status,
948 load_focus = EXCLUDED.load_focus,
949 race_5k_sec = EXCLUDED.race_5k_sec,
950 race_10k_sec = EXCLUDED.race_10k_sec,
951 race_half_sec = EXCLUDED.race_half_sec,
952 race_marathon_sec = EXCLUDED.race_marathon_sec",
953 duckdb::params![
954 self.profile_id,
955 date.to_string(),
956 vo2max_value,
957 fitness_age,
958 readiness_score,
959 readiness_level,
960 status_phrase,
961 acute_load,
962 chronic_load,
963 load_ratio,
964 load_ratio_status,
965 load_focus,
966 race_5k,
967 race_10k,
968 race_half,
969 race_marathon,
970 ],
971 )
972 .map_err(|e| GarminError::Database(e.to_string()))?;
973
974 Ok(())
975 }
976
977 async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
979 Ok(())
981 }
982
983 async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
985 Ok(())
987 }
988}
989
990#[derive(Default)]
992pub struct SyncOptions {
993 pub sync_activities: bool,
995 pub sync_health: bool,
997 pub sync_performance: bool,
999 pub from_date: Option<NaiveDate>,
1001 pub to_date: Option<NaiveDate>,
1003 pub dry_run: bool,
1005 pub force: bool,
1007 pub fancy_ui: bool,
1009 pub concurrency: usize,
1011}
1012
1013impl SyncOptions {
1014 pub fn full() -> Self {
1016 Self {
1017 sync_activities: true,
1018 sync_health: true,
1019 sync_performance: true,
1020 fancy_ui: true,
1021 concurrency: 3,
1022 ..Default::default()
1023 }
1024 }
1025
1026 pub fn simple() -> Self {
1028 Self {
1029 sync_activities: true,
1030 sync_health: true,
1031 sync_performance: true,
1032 fancy_ui: false,
1033 concurrency: 3,
1034 ..Default::default()
1035 }
1036 }
1037}
1038
/// Counters describing the outcome of one sync run.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SyncStats {
    /// Tasks recovered from an interrupted previous run.
    pub recovered: u32,
    /// Tasks that finished successfully.
    pub completed: u32,
    /// Task attempts that were deferred because of rate limiting.
    pub rate_limited: u32,
    /// Task attempts that failed for any other reason.
    pub failed: u32,
}

/// One-line summary: `Completed: N, Failed: N, Rate limited: N`, followed by
/// `, Recovered: N` only when at least one task was recovered.
impl std::fmt::Display for SyncStats {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            self.completed, self.failed, self.rate_limited
        )?;
        if self.recovered > 0 {
            write!(f, ", Recovered: {}", self.recovered)?;
        }
        Ok(())
    }
}
1065
1066fn print_task_status(task: &SyncTask, stats: &SyncStats) {
1068 let desc = match &task.task_type {
1069 SyncTaskType::Activities { start, limit } => {
1070 format!("Fetching activities {}-{}", start, start + limit)
1071 }
1072 SyncTaskType::ActivityDetail { activity_id } => {
1073 format!("Activity {} details", activity_id)
1074 }
1075 SyncTaskType::DownloadGpx {
1076 activity_id,
1077 activity_name,
1078 activity_date,
1079 } => {
1080 let name = activity_name.as_deref().unwrap_or("Unknown");
1081 let date = activity_date.as_deref().unwrap_or("");
1082 if date.is_empty() {
1083 format!("GPX: {} ({})", name, activity_id)
1084 } else {
1085 format!("GPX: {} {} ({})", date, name, activity_id)
1086 }
1087 }
1088 SyncTaskType::DailyHealth { date } => {
1089 format!("Health data for {}", date)
1090 }
1091 SyncTaskType::Performance { date } => {
1092 format!("Performance metrics for {}", date)
1093 }
1094 SyncTaskType::Weight { from, to } => {
1095 format!("Weight {} to {}", from, to)
1096 }
1097 SyncTaskType::GenerateEmbeddings { activity_ids } => {
1098 format!(
1099 "Generating embeddings for {} activities",
1100 activity_ids.len()
1101 )
1102 }
1103 };
1104
1105 print!("[{:>4}] {}...", stats.completed + 1, desc);
1106 let _ = io::stdout().flush();
1107}
1108
1109fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1111 let desc = match &task.task_type {
1112 SyncTaskType::Activities { start, limit } => {
1113 format!("Activities {}-{}", start, start + limit)
1114 }
1115 SyncTaskType::DownloadGpx {
1116 activity_name,
1117 activity_date,
1118 ..
1119 } => {
1120 let name = activity_name.as_deref().unwrap_or("Unknown");
1121 let date = activity_date.as_deref().unwrap_or("");
1122 if date.is_empty() {
1123 name.to_string()
1124 } else {
1125 format!("{} {}", date, name)
1126 }
1127 }
1128 SyncTaskType::DailyHealth { date } => date.to_string(),
1129 SyncTaskType::Performance { date } => date.to_string(),
1130 _ => String::new(),
1131 };
1132
1133 match &task.task_type {
1134 SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
1135 SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
1136 SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
1137 SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
1138 _ => {}
1139 }
1140}
1141
1142fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1144 match &task.task_type {
1145 SyncTaskType::Activities { .. } => {
1146 progress.activities.complete_one();
1147 }
1150 SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1151 SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1152 SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1153 _ => {}
1154 }
1155}
1156
1157fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1159 match &task.task_type {
1160 SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1161 SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1162 SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1163 SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1164 _ => {}
1165 }
1166}