1pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use crate::{Database, GarminError, Result};
22use std::io::{self, Write};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
28pub struct SyncEngine {
30 db: Database,
31 client: GarminClient,
32 token: OAuth2Token,
33 rate_limiter: RateLimiter,
34 queue: TaskQueue,
35 profile_id: i32,
36 display_name: Option<String>,
37}
38
39impl SyncEngine {
40 pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42 let queue = TaskQueue::new(db.clone());
43 Self {
44 db,
45 client,
46 token,
47 rate_limiter: RateLimiter::new(),
48 queue,
49 profile_id,
50 display_name: None,
51 }
52 }
53
54 async fn get_display_name(&mut self) -> Result<String> {
56 if let Some(ref name) = self.display_name {
57 return Ok(name.clone());
58 }
59
60 let profile: serde_json::Value = self
61 .client
62 .get_json(&self.token, "/userprofile-service/socialProfile")
63 .await?;
64
65 let name = profile
66 .get("displayName")
67 .and_then(|v| v.as_str())
68 .map(|s| s.to_string())
69 .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71 self.display_name = Some(name.clone());
72 Ok(name)
73 }
74
75 async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77 print!("Finding oldest activity date...");
78 let _ = io::stdout().flush();
79
80 self.rate_limiter.wait().await;
84
85 let limit: u32 = 100;
87 let mut jump: u32 = 100;
88 let mut last_non_empty: u32 = 0;
89
90 while jump < 10000 {
92 let path = format!(
93 "/activitylist-service/activities/search/activities?limit=1&start={}",
94 jump
95 );
96
97 let activities: Vec<serde_json::Value> =
98 self.client.get_json(&self.token, &path).await?;
99
100 if activities.is_empty() {
101 break;
102 }
103
104 last_non_empty = jump;
105 jump *= 2;
106 self.rate_limiter.wait().await;
107 }
108
109 let mut low = last_non_empty;
111 let mut high = jump;
112
113 while high - low > limit {
114 let mid = (low + high) / 2;
115 let path = format!(
116 "/activitylist-service/activities/search/activities?limit=1&start={}",
117 mid
118 );
119
120 self.rate_limiter.wait().await;
121 let activities: Vec<serde_json::Value> =
122 self.client.get_json(&self.token, &path).await?;
123
124 if activities.is_empty() {
125 high = mid;
126 } else {
127 low = mid;
128 }
129 }
130
131 let path = format!(
133 "/activitylist-service/activities/search/activities?limit={}&start={}",
134 limit, low
135 );
136
137 self.rate_limiter.wait().await;
138 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
139
140 let oldest_date = activities
141 .last()
142 .and_then(|activity| activity.get("startTimeLocal"))
143 .and_then(|v| v.as_str())
144 .and_then(|date_str| date_str.split(' ').next())
145 .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
146
147 let result = oldest_date.unwrap_or_else(|| {
148 Utc::now().date_naive() - Duration::days(365)
150 });
151
152 println!(" {}", result);
153 Ok(result)
154 }
155
156 pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
158 if opts.fancy_ui {
161 self.run_with_progress(&opts).await
162 } else {
163 self.run_sequential(&opts).await
164 }
165 }
166
167 async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
169 let progress = Arc::new(SyncProgress::new());
170
171 let display_name = self.get_display_name().await?;
173 progress.set_profile(&display_name);
174
175 let _recovered = self.queue.recover_in_progress()?;
177
178 if self.queue.pending_count()? == 0 {
180 self.plan_sync(opts).await?;
181 }
182
183 self.count_tasks_for_progress(&progress)?;
185
186 let from_date = opts
188 .from_date
189 .unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
190 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
191 progress.set_date_range(&from_date.to_string(), &to_date.to_string());
192
193 let ui_progress = progress.clone();
195 let ui_handle = tokio::spawn(async move {
196 if let Err(e) = ui::run_tui(ui_progress).await {
197 eprintln!("TUI error: {}", e);
198 }
199 });
200
201 let stats = self
203 .run_with_progress_tracking(opts, progress.clone())
204 .await?;
205
206 ui_handle.abort();
208
209 println!("\nSync complete: {}", stats);
211
212 Ok(stats)
213 }
214
215 async fn run_with_progress_tracking(
217 &mut self,
218 opts: &SyncOptions,
219 progress: SharedProgress,
220 ) -> Result<SyncStats> {
221 let mut stats = SyncStats::default();
222
223 while let Some(task) = self.queue.pop()? {
225 if self.rate_limiter.should_pause() {
226 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
227 }
228
229 let task_id = task.id.unwrap();
230 self.queue.mark_in_progress(task_id)?;
231
232 update_progress_for_task(&task, &progress);
234
235 self.rate_limiter.wait().await;
236 progress.record_request();
237
238 match self.execute_task(&task).await {
239 Ok(()) => {
240 self.queue.mark_completed(task_id)?;
241 self.rate_limiter.on_success();
242 stats.completed += 1;
243 complete_progress_for_task(&task, &progress);
244 }
245 Err(GarminError::RateLimited) => {
246 self.rate_limiter.on_rate_limit();
247 let backoff = self.rate_limiter.current_backoff();
248 self.queue.mark_failed(
249 task_id,
250 "Rate limited",
251 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
252 )?;
253 stats.rate_limited += 1;
254 }
255 Err(e) => {
256 let backoff = Duration::seconds(60);
257 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
258 stats.failed += 1;
259 fail_progress_for_task(&task, &progress);
260 }
261 }
262
263 if opts.dry_run {
264 break;
265 }
266 }
267
268 self.queue.cleanup(7)?;
270
271 Ok(stats)
272 }
273
274 async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
276 let mut stats = SyncStats::default();
277
278 print!("Fetching user profile...");
280 let _ = io::stdout().flush();
281 let display_name = self.get_display_name().await?;
282 println!(" {}", display_name);
283
284 let recovered = self.queue.recover_in_progress()?;
286 if recovered > 0 {
287 println!(" Recovered {} tasks from previous run", recovered);
288 stats.recovered = recovered;
289 }
290
291 if self.queue.pending_count()? == 0 {
293 println!("Planning sync tasks...");
294 self.plan_sync(opts).await?;
295 }
296
297 let total_tasks = self.queue.pending_count()?;
298 println!(" {} tasks queued\n", total_tasks);
299
300 while let Some(task) = self.queue.pop()? {
302 if self.rate_limiter.should_pause() {
303 println!(
304 " Rate limited, pausing for {} seconds...",
305 self.rate_limiter.pause_duration().as_secs()
306 );
307 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
308 }
309
310 let task_id = task.id.unwrap();
311 self.queue.mark_in_progress(task_id)?;
312
313 print_task_status(&task, &stats);
315
316 self.rate_limiter.wait().await;
317
318 match self.execute_task(&task).await {
319 Ok(()) => {
320 self.queue.mark_completed(task_id)?;
321 self.rate_limiter.on_success();
322 stats.completed += 1;
323 println!(" done");
324 }
325 Err(GarminError::RateLimited) => {
326 self.rate_limiter.on_rate_limit();
327 let backoff = self.rate_limiter.current_backoff();
328 self.queue.mark_failed(
329 task_id,
330 "Rate limited",
331 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
332 )?;
333 stats.rate_limited += 1;
334 println!(" rate limited (retry in {}s)", backoff.as_secs());
335 }
336 Err(e) => {
337 let backoff = Duration::seconds(60);
338 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
339 stats.failed += 1;
340 println!(" failed: {}", e);
341 }
342 }
343
344 if opts.dry_run {
345 break;
346 }
347 }
348
349 self.queue.cleanup(7)?;
351
352 Ok(stats)
353 }
354
355 fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
357 let conn = self.db.connection();
358 let conn = conn.lock().unwrap();
359
360 let act_count: i64 = conn
362 .query_row(
363 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
364 [],
365 |row| row.get(0),
366 )
367 .unwrap_or(0);
368 progress.activities.set_total(act_count as u32);
369
370 let gpx_count: i64 = conn
372 .query_row(
373 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
374 [],
375 |row| row.get(0),
376 )
377 .unwrap_or(0);
378 progress.gpx.set_total(gpx_count as u32);
379
380 let health_count: i64 = conn
382 .query_row(
383 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
384 [],
385 |row| row.get(0),
386 )
387 .unwrap_or(0);
388 progress.health.set_total(health_count as u32);
389
390 let perf_count: i64 = conn
392 .query_row(
393 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
394 [],
395 |row| row.get(0),
396 )
397 .unwrap_or(0);
398 progress.performance.set_total(perf_count as u32);
399
400 Ok(())
401 }
402
403 async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
405 let from_date = match opts.from_date {
407 Some(date) => date,
408 None => self.find_oldest_activity_date().await?,
409 };
410 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
411
412 let total_days = (to_date - from_date).num_days();
413 println!(
414 " Date range: {} to {} ({} days)",
415 from_date, to_date, total_days
416 );
417
418 if opts.sync_activities {
420 println!(" Planning activity sync...");
421 self.plan_activities_sync()?;
422 }
423
424 if opts.sync_health {
426 let health_tasks = self.plan_health_sync(from_date, to_date)?;
427 println!(" Planning health sync: {} days to fetch", health_tasks);
428 }
429
430 if opts.sync_performance {
432 let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
433 println!(" Planning performance sync: {} weeks to fetch", perf_tasks);
434 }
435
436 Ok(())
437 }
438
439 fn plan_activities_sync(&self) -> Result<()> {
441 let task = SyncTask::new(
443 self.profile_id,
444 SyncTaskType::Activities {
445 start: 0,
446 limit: 50,
447 },
448 );
449 self.queue.push(task)?;
450 Ok(())
451 }
452
453 fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
455 let mut count = 0;
456 let mut date = from;
457 while date <= to {
458 if !self.has_health_data(date)? {
460 let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
461 self.queue.push(task)?;
462 count += 1;
463 }
464 date += Duration::days(1);
465 }
466 Ok(count)
467 }
468
469 fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
471 let mut count = 0;
473 let mut date = from;
474 while date <= to {
475 if !self.has_performance_data(date)? {
476 let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
477 self.queue.push(task)?;
478 count += 1;
479 }
480 date += Duration::days(7);
481 }
482 Ok(count)
483 }
484
485 fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
487 let conn = self.db.connection();
488 let conn = conn.lock().unwrap();
489
490 let count: i64 = conn
491 .query_row(
492 "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
493 duckdb::params![self.profile_id, date.to_string()],
494 |row| row.get(0),
495 )
496 .map_err(|e| GarminError::Database(e.to_string()))?;
497
498 Ok(count > 0)
499 }
500
501 fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
503 let conn = self.db.connection();
504 let conn = conn.lock().unwrap();
505
506 let count: i64 = conn
507 .query_row(
508 "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
509 duckdb::params![self.profile_id, date.to_string()],
510 |row| row.get(0),
511 )
512 .map_err(|e| GarminError::Database(e.to_string()))?;
513
514 Ok(count > 0)
515 }
516
517 async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
519 match &task.task_type {
520 SyncTaskType::Activities { start, limit } => self.sync_activities(*start, *limit).await,
521 SyncTaskType::ActivityDetail { activity_id } => {
522 self.sync_activity_detail(*activity_id).await
523 }
524 SyncTaskType::DownloadGpx { activity_id, .. } => self.download_gpx(*activity_id).await,
525 SyncTaskType::DailyHealth { date } => self.sync_daily_health(*date).await,
526 SyncTaskType::Performance { date } => self.sync_performance(*date).await,
527 SyncTaskType::Weight { from, to } => self.sync_weight(*from, *to).await,
528 SyncTaskType::GenerateEmbeddings { activity_ids } => {
529 self.generate_embeddings(activity_ids).await
530 }
531 }
532 }
533
534 async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
536 let path = format!(
537 "/activitylist-service/activities/search/activities?limit={}&start={}",
538 limit, start
539 );
540 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
541
542 for activity in &activities {
543 self.store_activity(activity)?;
545
546 if activity
548 .get("hasPolyline")
549 .and_then(|v| v.as_bool())
550 .unwrap_or(false)
551 {
552 if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
553 let activity_name = activity
554 .get("activityName")
555 .and_then(|v| v.as_str())
556 .map(|s| s.to_string());
557 let activity_date = activity
558 .get("startTimeLocal")
559 .and_then(|v| v.as_str())
560 .and_then(|s| s.split(' ').next())
561 .map(|s| s.to_string());
562
563 let task = SyncTask::new(
564 self.profile_id,
565 SyncTaskType::DownloadGpx {
566 activity_id: id,
567 activity_name,
568 activity_date,
569 },
570 );
571 self.queue.push(task)?;
572 }
573 }
574 }
575
576 if activities.len() == limit as usize {
578 let task = SyncTask::new(
579 self.profile_id,
580 SyncTaskType::Activities {
581 start: start + limit,
582 limit,
583 },
584 );
585 self.queue.push(task)?;
586 }
587
588 Ok(())
589 }
590
591 fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
593 let conn = self.db.connection();
594 let conn = conn.lock().unwrap();
595
596 let activity_id = activity
597 .get("activityId")
598 .and_then(|v| v.as_i64())
599 .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
600
601 conn.execute(
602 "INSERT INTO activities (
603 activity_id, profile_id, activity_name, activity_type,
604 start_time_local, duration_sec, distance_m, calories,
605 avg_hr, max_hr, avg_speed, max_speed,
606 elevation_gain, elevation_loss, raw_json
607 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
608 ON CONFLICT (activity_id) DO UPDATE SET
609 activity_name = EXCLUDED.activity_name,
610 activity_type = EXCLUDED.activity_type,
611 raw_json = EXCLUDED.raw_json",
612 duckdb::params![
613 activity_id,
614 self.profile_id,
615 activity.get("activityName").and_then(|v| v.as_str()),
616 activity
617 .get("activityType")
618 .and_then(|v| v.get("typeKey"))
619 .and_then(|v| v.as_str()),
620 activity.get("startTimeLocal").and_then(|v| v.as_str()),
621 activity.get("duration").and_then(|v| v.as_f64()),
622 activity.get("distance").and_then(|v| v.as_f64()),
623 activity
624 .get("calories")
625 .and_then(|v| v.as_i64())
626 .map(|v| v as i32),
627 activity
628 .get("averageHR")
629 .and_then(|v| v.as_i64())
630 .map(|v| v as i32),
631 activity
632 .get("maxHR")
633 .and_then(|v| v.as_i64())
634 .map(|v| v as i32),
635 activity.get("averageSpeed").and_then(|v| v.as_f64()),
636 activity.get("maxSpeed").and_then(|v| v.as_f64()),
637 activity.get("elevationGain").and_then(|v| v.as_f64()),
638 activity.get("elevationLoss").and_then(|v| v.as_f64()),
639 activity.to_string(),
640 ],
641 )
642 .map_err(|e| GarminError::Database(e.to_string()))?;
643
644 Ok(())
645 }
646
647 async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
649 Ok(())
651 }
652
653 async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
655 let path = format!("/download-service/export/gpx/activity/{}", activity_id);
656 let gpx_bytes = self.client.download(&self.token, &path).await?;
657 let gpx_data = String::from_utf8_lossy(&gpx_bytes);
658 self.parse_and_store_gpx(activity_id, &gpx_data)?;
659 Ok(())
660 }
661
662 fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
664 use gpx::read;
665 use std::io::BufReader;
666
667 let reader = BufReader::new(gpx_data.as_bytes());
668 let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
669
670 let conn = self.db.connection();
671 let conn = conn.lock().unwrap();
672
673 for track in gpx.tracks {
674 for segment in track.segments {
675 for point in segment.points {
676 let timestamp = point
677 .time
678 .map(|t| t.format().unwrap_or_default())
679 .unwrap_or_default();
680
681 conn.execute(
682 "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
683 VALUES (?, ?, ?, ?, ?)",
684 duckdb::params![
685 activity_id,
686 timestamp,
687 point.point().y(),
688 point.point().x(),
689 point.elevation,
690 ],
691 )
692 .map_err(|e| GarminError::Database(e.to_string()))?;
693 }
694 }
695 }
696
697 Ok(())
698 }
699
700 async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
702 let display_name = self.get_display_name().await?;
704
705 let path = format!(
706 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
707 display_name, date
708 );
709
710 let health_result: std::result::Result<serde_json::Value, _> =
712 self.client.get_json(&self.token, &path).await;
713
714 let health = match health_result {
715 Ok(data) => data,
716 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
717 serde_json::json!({})
720 }
721 Err(e) => return Err(e),
722 };
723
724 let conn = self.db.connection();
725 let conn = conn.lock().unwrap();
726
727 conn.execute(
728 "INSERT INTO daily_health (
729 profile_id, date, steps, step_goal, total_calories, active_calories,
730 resting_hr, sleep_seconds, avg_stress, max_stress,
731 body_battery_start, body_battery_end, raw_json
732 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
733 ON CONFLICT (profile_id, date) DO UPDATE SET
734 steps = EXCLUDED.steps,
735 step_goal = EXCLUDED.step_goal,
736 total_calories = EXCLUDED.total_calories,
737 active_calories = EXCLUDED.active_calories,
738 resting_hr = EXCLUDED.resting_hr,
739 sleep_seconds = EXCLUDED.sleep_seconds,
740 avg_stress = EXCLUDED.avg_stress,
741 max_stress = EXCLUDED.max_stress,
742 body_battery_start = EXCLUDED.body_battery_start,
743 body_battery_end = EXCLUDED.body_battery_end,
744 raw_json = EXCLUDED.raw_json",
745 duckdb::params![
746 self.profile_id,
747 date.to_string(),
748 health
749 .get("totalSteps")
750 .and_then(|v| v.as_i64())
751 .map(|v| v as i32),
752 health
753 .get("dailyStepGoal")
754 .and_then(|v| v.as_i64())
755 .map(|v| v as i32),
756 health
757 .get("totalKilocalories")
758 .and_then(|v| v.as_i64())
759 .map(|v| v as i32),
760 health
761 .get("activeKilocalories")
762 .and_then(|v| v.as_i64())
763 .map(|v| v as i32),
764 health
765 .get("restingHeartRate")
766 .and_then(|v| v.as_i64())
767 .map(|v| v as i32),
768 health
769 .get("sleepingSeconds")
770 .and_then(|v| v.as_i64())
771 .map(|v| v as i32),
772 health
773 .get("averageStressLevel")
774 .and_then(|v| v.as_i64())
775 .map(|v| v as i32),
776 health
777 .get("maxStressLevel")
778 .and_then(|v| v.as_i64())
779 .map(|v| v as i32),
780 health
781 .get("bodyBatteryChargedValue")
782 .and_then(|v| v.as_i64())
783 .map(|v| v as i32),
784 health
785 .get("bodyBatteryDrainedValue")
786 .and_then(|v| v.as_i64())
787 .map(|v| v as i32),
788 health.to_string(),
789 ],
790 )
791 .map_err(|e| GarminError::Database(e.to_string()))?;
792
793 Ok(())
794 }
795
796 async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
798 let vo2max: Option<serde_json::Value> = self
800 .client
801 .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
802 .await
803 .ok();
804
805 let race_predictions: Option<serde_json::Value> = self
807 .client
808 .get_json(
809 &self.token,
810 "/metrics-service/metrics/racepredictions/latest",
811 )
812 .await
813 .ok();
814
815 let conn = self.db.connection();
816 let conn = conn.lock().unwrap();
817
818 let vo2max_value = vo2max
819 .as_ref()
820 .and_then(|v| v.get("generic"))
821 .and_then(|v| v.get("vo2MaxValue"))
822 .and_then(|v| v.as_f64());
823
824 let fitness_age = vo2max
825 .as_ref()
826 .and_then(|v| v.get("generic"))
827 .and_then(|v| v.get("fitnessAge"))
828 .and_then(|v| v.as_i64())
829 .map(|v| v as i32);
830
831 let race_5k = race_predictions
832 .as_ref()
833 .and_then(|v| v.get("time5K"))
834 .and_then(|v| v.as_f64())
835 .map(|v| v as i32);
836
837 let race_10k = race_predictions
838 .as_ref()
839 .and_then(|v| v.get("time10K"))
840 .and_then(|v| v.as_f64())
841 .map(|v| v as i32);
842
843 let race_half = race_predictions
844 .as_ref()
845 .and_then(|v| v.get("timeHalfMarathon"))
846 .and_then(|v| v.as_f64())
847 .map(|v| v as i32);
848
849 let race_marathon = race_predictions
850 .as_ref()
851 .and_then(|v| v.get("timeMarathon"))
852 .and_then(|v| v.as_f64())
853 .map(|v| v as i32);
854
855 conn.execute(
856 "INSERT INTO performance_metrics (
857 profile_id, date, vo2max, fitness_age,
858 race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
859 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
860 ON CONFLICT (profile_id, date) DO UPDATE SET
861 vo2max = EXCLUDED.vo2max,
862 fitness_age = EXCLUDED.fitness_age,
863 race_5k_sec = EXCLUDED.race_5k_sec,
864 race_10k_sec = EXCLUDED.race_10k_sec,
865 race_half_sec = EXCLUDED.race_half_sec,
866 race_marathon_sec = EXCLUDED.race_marathon_sec",
867 duckdb::params![
868 self.profile_id,
869 date.to_string(),
870 vo2max_value,
871 fitness_age,
872 race_5k,
873 race_10k,
874 race_half,
875 race_marathon,
876 ],
877 )
878 .map_err(|e| GarminError::Database(e.to_string()))?;
879
880 Ok(())
881 }
882
883 async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
885 Ok(())
887 }
888
889 async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
891 Ok(())
893 }
894}
895
896#[derive(Default)]
898pub struct SyncOptions {
899 pub sync_activities: bool,
901 pub sync_health: bool,
903 pub sync_performance: bool,
905 pub from_date: Option<NaiveDate>,
907 pub to_date: Option<NaiveDate>,
909 pub dry_run: bool,
911 pub force: bool,
913 pub fancy_ui: bool,
915 pub concurrency: usize,
917}
918
919impl SyncOptions {
920 pub fn full() -> Self {
922 Self {
923 sync_activities: true,
924 sync_health: true,
925 sync_performance: true,
926 fancy_ui: true,
927 concurrency: 3,
928 ..Default::default()
929 }
930 }
931
932 pub fn simple() -> Self {
934 Self {
935 sync_activities: true,
936 sync_health: true,
937 sync_performance: true,
938 fancy_ui: false,
939 concurrency: 3,
940 ..Default::default()
941 }
942 }
943}
944
/// Counters describing the outcome of one sync run.
#[derive(Debug, Clone, Default)]
pub struct SyncStats {
    /// Tasks recovered from a previous run that were left in progress.
    pub recovered: u32,
    /// Tasks that executed successfully.
    pub completed: u32,
    /// Tasks that hit a rate limit and were re-queued.
    pub rate_limited: u32,
    /// Tasks that failed with any other error.
    pub failed: u32,
}

impl std::fmt::Display for SyncStats {
    /// Writes `Completed: N, Failed: N, Rate limited: N`, followed by
    /// `, Recovered: N` only when at least one task was recovered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            self.completed, self.failed, self.rate_limited
        )?;
        if self.recovered > 0 {
            write!(f, ", Recovered: {}", self.recovered)?;
        }
        Ok(())
    }
}
971
972fn print_task_status(task: &SyncTask, stats: &SyncStats) {
974 let desc = match &task.task_type {
975 SyncTaskType::Activities { start, limit } => {
976 format!("Fetching activities {}-{}", start, start + limit)
977 }
978 SyncTaskType::ActivityDetail { activity_id } => {
979 format!("Activity {} details", activity_id)
980 }
981 SyncTaskType::DownloadGpx {
982 activity_id,
983 activity_name,
984 activity_date,
985 } => {
986 let name = activity_name.as_deref().unwrap_or("Unknown");
987 let date = activity_date.as_deref().unwrap_or("");
988 if date.is_empty() {
989 format!("GPX: {} ({})", name, activity_id)
990 } else {
991 format!("GPX: {} {} ({})", date, name, activity_id)
992 }
993 }
994 SyncTaskType::DailyHealth { date } => {
995 format!("Health data for {}", date)
996 }
997 SyncTaskType::Performance { date } => {
998 format!("Performance metrics for {}", date)
999 }
1000 SyncTaskType::Weight { from, to } => {
1001 format!("Weight {} to {}", from, to)
1002 }
1003 SyncTaskType::GenerateEmbeddings { activity_ids } => {
1004 format!(
1005 "Generating embeddings for {} activities",
1006 activity_ids.len()
1007 )
1008 }
1009 };
1010
1011 print!("[{:>4}] {}...", stats.completed + 1, desc);
1012 let _ = io::stdout().flush();
1013}
1014
1015fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1017 let desc = match &task.task_type {
1018 SyncTaskType::Activities { start, limit } => {
1019 format!("Activities {}-{}", start, start + limit)
1020 }
1021 SyncTaskType::DownloadGpx {
1022 activity_name,
1023 activity_date,
1024 ..
1025 } => {
1026 let name = activity_name.as_deref().unwrap_or("Unknown");
1027 let date = activity_date.as_deref().unwrap_or("");
1028 if date.is_empty() {
1029 name.to_string()
1030 } else {
1031 format!("{} {}", date, name)
1032 }
1033 }
1034 SyncTaskType::DailyHealth { date } => date.to_string(),
1035 SyncTaskType::Performance { date } => date.to_string(),
1036 _ => String::new(),
1037 };
1038
1039 match &task.task_type {
1040 SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
1041 SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
1042 SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
1043 SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
1044 _ => {}
1045 }
1046}
1047
1048fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1050 match &task.task_type {
1051 SyncTaskType::Activities { .. } => {
1052 progress.activities.complete_one();
1053 }
1056 SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1057 SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1058 SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1059 _ => {}
1060 }
1061}
1062
1063fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1065 match &task.task_type {
1066 SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1067 SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1068 SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1069 SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1070 _ => {}
1071 }
1072}