1pub mod progress;
11pub mod rate_limiter;
12pub mod task_queue;
13pub mod ui;
14
15use std::sync::Arc;
16
17use chrono::{Duration, NaiveDate, Utc};
18
19use crate::client::{GarminClient, OAuth2Token};
20use crate::db::models::{SyncTask, SyncTaskType};
21use std::io::{self, Write};
22use crate::{Database, GarminError, Result};
23
24pub use progress::{SharedProgress, SyncProgress};
25pub use rate_limiter::{RateLimiter, SharedRateLimiter};
26pub use task_queue::TaskQueue;
27
/// Sync driver for a single profile.
///
/// Bundles the database handle, API client, OAuth2 token, rate limiter and
/// task queue that the methods of this type use to plan and execute sync tasks.
pub struct SyncEngine {
    /// Database handle; `new` also hands a clone of it to the task queue.
    db: Database,
    /// Client used for every `get_json` / `download` request.
    client: GarminClient,
    /// Token passed along with each client request.
    token: OAuth2Token,
    /// Waited on before requests and told about successes / rate-limit errors.
    rate_limiter: RateLimiter,
    /// Queue the run loops pop tasks from and report task outcomes to.
    queue: TaskQueue,
    /// Profile id attached to queued tasks and to stored rows.
    profile_id: i32,
    /// Cached display name; `None` until `get_display_name` has fetched it.
    display_name: Option<String>,
}
38
39impl SyncEngine {
40 pub fn new(db: Database, client: GarminClient, token: OAuth2Token, profile_id: i32) -> Self {
42 let queue = TaskQueue::new(db.clone());
43 Self {
44 db,
45 client,
46 token,
47 rate_limiter: RateLimiter::new(),
48 queue,
49 profile_id,
50 display_name: None,
51 }
52 }
53
54 async fn get_display_name(&mut self) -> Result<String> {
56 if let Some(ref name) = self.display_name {
57 return Ok(name.clone());
58 }
59
60 let profile: serde_json::Value = self
61 .client
62 .get_json(&self.token, "/userprofile-service/socialProfile")
63 .await?;
64
65 let name = profile
66 .get("displayName")
67 .and_then(|v| v.as_str())
68 .map(|s| s.to_string())
69 .ok_or_else(|| GarminError::invalid_response("Could not get display name"))?;
70
71 self.display_name = Some(name.clone());
72 Ok(name)
73 }
74
75 async fn find_oldest_activity_date(&mut self) -> Result<NaiveDate> {
77 print!("Finding oldest activity date...");
78 let _ = io::stdout().flush();
79
80 self.rate_limiter.wait().await;
84
85 let limit: u32 = 100;
87 let mut jump: u32 = 100;
88 let mut last_non_empty: u32 = 0;
89
90 while jump < 10000 {
92 let path = format!(
93 "/activitylist-service/activities/search/activities?limit=1&start={}",
94 jump
95 );
96
97 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
98
99 if activities.is_empty() {
100 break;
101 }
102
103 last_non_empty = jump;
104 jump *= 2;
105 self.rate_limiter.wait().await;
106 }
107
108 let mut low = last_non_empty;
110 let mut high = jump;
111
112 while high - low > limit {
113 let mid = (low + high) / 2;
114 let path = format!(
115 "/activitylist-service/activities/search/activities?limit=1&start={}",
116 mid
117 );
118
119 self.rate_limiter.wait().await;
120 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
121
122 if activities.is_empty() {
123 high = mid;
124 } else {
125 low = mid;
126 }
127 }
128
129 let path = format!(
131 "/activitylist-service/activities/search/activities?limit={}&start={}",
132 limit, low
133 );
134
135 self.rate_limiter.wait().await;
136 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
137
138 let oldest_date = activities
139 .last()
140 .and_then(|activity| activity.get("startTimeLocal"))
141 .and_then(|v| v.as_str())
142 .and_then(|date_str| date_str.split(' ').next())
143 .and_then(|date_part| NaiveDate::parse_from_str(date_part, "%Y-%m-%d").ok());
144
145 let result = oldest_date.unwrap_or_else(|| {
146 Utc::now().date_naive() - Duration::days(365)
148 });
149
150 println!(" {}", result);
151 Ok(result)
152 }
153
154 pub async fn run(&mut self, opts: SyncOptions) -> Result<SyncStats> {
156 if opts.fancy_ui {
159 self.run_with_progress(&opts).await
160 } else {
161 self.run_sequential(&opts).await
162 }
163 }
164
165 async fn run_with_progress(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
167 let progress = Arc::new(SyncProgress::new());
168
169 let display_name = self.get_display_name().await?;
171 progress.set_profile(&display_name);
172
173 let _recovered = self.queue.recover_in_progress()?;
175
176 if self.queue.pending_count()? == 0 {
178 self.plan_sync(opts).await?;
179 }
180
181 self.count_tasks_for_progress(&progress)?;
183
184 let from_date = opts.from_date.unwrap_or_else(|| Utc::now().date_naive() - Duration::days(365));
186 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
187 progress.set_date_range(&from_date.to_string(), &to_date.to_string());
188
189 let ui_progress = progress.clone();
191 let ui_handle = tokio::spawn(async move {
192 if let Err(e) = ui::run_tui(ui_progress).await {
193 eprintln!("TUI error: {}", e);
194 }
195 });
196
197 let stats = self.run_with_progress_tracking(opts, progress.clone()).await?;
199
200 ui_handle.abort();
202
203 println!("\nSync complete: {}", stats);
205
206 Ok(stats)
207 }
208
209 async fn run_with_progress_tracking(
211 &mut self,
212 opts: &SyncOptions,
213 progress: SharedProgress,
214 ) -> Result<SyncStats> {
215 let mut stats = SyncStats::default();
216
217 while let Some(task) = self.queue.pop()? {
219 if self.rate_limiter.should_pause() {
220 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
221 }
222
223 let task_id = task.id.unwrap();
224 self.queue.mark_in_progress(task_id)?;
225
226 update_progress_for_task(&task, &progress);
228
229 self.rate_limiter.wait().await;
230 progress.record_request();
231
232 match self.execute_task(&task).await {
233 Ok(()) => {
234 self.queue.mark_completed(task_id)?;
235 self.rate_limiter.on_success();
236 stats.completed += 1;
237 complete_progress_for_task(&task, &progress);
238 }
239 Err(GarminError::RateLimited) => {
240 self.rate_limiter.on_rate_limit();
241 let backoff = self.rate_limiter.current_backoff();
242 self.queue.mark_failed(
243 task_id,
244 "Rate limited",
245 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
246 )?;
247 stats.rate_limited += 1;
248 }
249 Err(e) => {
250 let backoff = Duration::seconds(60);
251 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
252 stats.failed += 1;
253 fail_progress_for_task(&task, &progress);
254 }
255 }
256
257 if opts.dry_run {
258 break;
259 }
260 }
261
262 self.queue.cleanup(7)?;
264
265 Ok(stats)
266 }
267
268 async fn run_sequential(&mut self, opts: &SyncOptions) -> Result<SyncStats> {
270 let mut stats = SyncStats::default();
271
272 print!("Fetching user profile...");
274 let _ = io::stdout().flush();
275 let display_name = self.get_display_name().await?;
276 println!(" {}", display_name);
277
278 let recovered = self.queue.recover_in_progress()?;
280 if recovered > 0 {
281 println!(" Recovered {} tasks from previous run", recovered);
282 stats.recovered = recovered;
283 }
284
285 if self.queue.pending_count()? == 0 {
287 println!("Planning sync tasks...");
288 self.plan_sync(opts).await?;
289 }
290
291 let total_tasks = self.queue.pending_count()?;
292 println!(" {} tasks queued\n", total_tasks);
293
294 while let Some(task) = self.queue.pop()? {
296 if self.rate_limiter.should_pause() {
297 println!(
298 " Rate limited, pausing for {} seconds...",
299 self.rate_limiter.pause_duration().as_secs()
300 );
301 tokio::time::sleep(self.rate_limiter.pause_duration()).await;
302 }
303
304 let task_id = task.id.unwrap();
305 self.queue.mark_in_progress(task_id)?;
306
307 print_task_status(&task, &stats);
309
310 self.rate_limiter.wait().await;
311
312 match self.execute_task(&task).await {
313 Ok(()) => {
314 self.queue.mark_completed(task_id)?;
315 self.rate_limiter.on_success();
316 stats.completed += 1;
317 println!(" done");
318 }
319 Err(GarminError::RateLimited) => {
320 self.rate_limiter.on_rate_limit();
321 let backoff = self.rate_limiter.current_backoff();
322 self.queue.mark_failed(
323 task_id,
324 "Rate limited",
325 Duration::from_std(backoff).unwrap_or(Duration::seconds(60)),
326 )?;
327 stats.rate_limited += 1;
328 println!(" rate limited (retry in {}s)", backoff.as_secs());
329 }
330 Err(e) => {
331 let backoff = Duration::seconds(60);
332 self.queue.mark_failed(task_id, &e.to_string(), backoff)?;
333 stats.failed += 1;
334 println!(" failed: {}", e);
335 }
336 }
337
338 if opts.dry_run {
339 break;
340 }
341 }
342
343 self.queue.cleanup(7)?;
345
346 Ok(stats)
347 }
348
349 fn count_tasks_for_progress(&self, progress: &SyncProgress) -> Result<()> {
351 let conn = self.db.connection();
352 let conn = conn.lock().unwrap();
353
354 let act_count: i64 = conn
356 .query_row(
357 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'activities'",
358 [],
359 |row| row.get(0),
360 )
361 .unwrap_or(0);
362 progress.activities.set_total(act_count as u32);
363
364 let gpx_count: i64 = conn
366 .query_row(
367 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'download_gpx'",
368 [],
369 |row| row.get(0),
370 )
371 .unwrap_or(0);
372 progress.gpx.set_total(gpx_count as u32);
373
374 let health_count: i64 = conn
376 .query_row(
377 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'daily_health'",
378 [],
379 |row| row.get(0),
380 )
381 .unwrap_or(0);
382 progress.health.set_total(health_count as u32);
383
384 let perf_count: i64 = conn
386 .query_row(
387 "SELECT COUNT(*) FROM sync_tasks WHERE status IN ('pending', 'failed') AND task_type = 'performance'",
388 [],
389 |row| row.get(0),
390 )
391 .unwrap_or(0);
392 progress.performance.set_total(perf_count as u32);
393
394 Ok(())
395 }
396
397 async fn plan_sync(&mut self, opts: &SyncOptions) -> Result<()> {
399 let from_date = match opts.from_date {
401 Some(date) => date,
402 None => self.find_oldest_activity_date().await?,
403 };
404 let to_date = opts.to_date.unwrap_or_else(|| Utc::now().date_naive());
405
406 let total_days = (to_date - from_date).num_days();
407 println!(" Date range: {} to {} ({} days)", from_date, to_date, total_days);
408
409 if opts.sync_activities {
411 println!(" Planning activity sync...");
412 self.plan_activities_sync()?;
413 }
414
415 if opts.sync_health {
417 let health_tasks = self.plan_health_sync(from_date, to_date)?;
418 println!(" Planning health sync: {} days to fetch", health_tasks);
419 }
420
421 if opts.sync_performance {
423 let perf_tasks = self.plan_performance_sync(from_date, to_date)?;
424 println!(" Planning performance sync: {} weeks to fetch", perf_tasks);
425 }
426
427 Ok(())
428 }
429
430 fn plan_activities_sync(&self) -> Result<()> {
432 let task = SyncTask::new(
434 self.profile_id,
435 SyncTaskType::Activities { start: 0, limit: 50 },
436 );
437 self.queue.push(task)?;
438 Ok(())
439 }
440
441 fn plan_health_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
443 let mut count = 0;
444 let mut date = from;
445 while date <= to {
446 if !self.has_health_data(date)? {
448 let task = SyncTask::new(self.profile_id, SyncTaskType::DailyHealth { date });
449 self.queue.push(task)?;
450 count += 1;
451 }
452 date += Duration::days(1);
453 }
454 Ok(count)
455 }
456
457 fn plan_performance_sync(&self, from: NaiveDate, to: NaiveDate) -> Result<u32> {
459 let mut count = 0;
461 let mut date = from;
462 while date <= to {
463 if !self.has_performance_data(date)? {
464 let task = SyncTask::new(self.profile_id, SyncTaskType::Performance { date });
465 self.queue.push(task)?;
466 count += 1;
467 }
468 date += Duration::days(7);
469 }
470 Ok(count)
471 }
472
473 fn has_health_data(&self, date: NaiveDate) -> Result<bool> {
475 let conn = self.db.connection();
476 let conn = conn.lock().unwrap();
477
478 let count: i64 = conn
479 .query_row(
480 "SELECT COUNT(*) FROM daily_health WHERE profile_id = ? AND date = ?",
481 duckdb::params![self.profile_id, date.to_string()],
482 |row| row.get(0),
483 )
484 .map_err(|e| GarminError::Database(e.to_string()))?;
485
486 Ok(count > 0)
487 }
488
489 fn has_performance_data(&self, date: NaiveDate) -> Result<bool> {
491 let conn = self.db.connection();
492 let conn = conn.lock().unwrap();
493
494 let count: i64 = conn
495 .query_row(
496 "SELECT COUNT(*) FROM performance_metrics WHERE profile_id = ? AND date = ?",
497 duckdb::params![self.profile_id, date.to_string()],
498 |row| row.get(0),
499 )
500 .map_err(|e| GarminError::Database(e.to_string()))?;
501
502 Ok(count > 0)
503 }
504
505 async fn execute_task(&mut self, task: &SyncTask) -> Result<()> {
507 match &task.task_type {
508 SyncTaskType::Activities { start, limit } => {
509 self.sync_activities(*start, *limit).await
510 }
511 SyncTaskType::ActivityDetail { activity_id } => {
512 self.sync_activity_detail(*activity_id).await
513 }
514 SyncTaskType::DownloadGpx { activity_id, .. } => {
515 self.download_gpx(*activity_id).await
516 }
517 SyncTaskType::DailyHealth { date } => {
518 self.sync_daily_health(*date).await
519 }
520 SyncTaskType::Performance { date } => {
521 self.sync_performance(*date).await
522 }
523 SyncTaskType::Weight { from, to } => {
524 self.sync_weight(*from, *to).await
525 }
526 SyncTaskType::GenerateEmbeddings { activity_ids } => {
527 self.generate_embeddings(activity_ids).await
528 }
529 }
530 }
531
532 async fn sync_activities(&mut self, start: u32, limit: u32) -> Result<()> {
534 let path = format!(
535 "/activitylist-service/activities/search/activities?limit={}&start={}",
536 limit, start
537 );
538 let activities: Vec<serde_json::Value> = self.client.get_json(&self.token, &path).await?;
539
540 for activity in &activities {
541 self.store_activity(activity)?;
543
544 if activity.get("hasPolyline").and_then(|v| v.as_bool()).unwrap_or(false) {
546 if let Some(id) = activity.get("activityId").and_then(|v| v.as_i64()) {
547 let activity_name = activity
548 .get("activityName")
549 .and_then(|v| v.as_str())
550 .map(|s| s.to_string());
551 let activity_date = activity
552 .get("startTimeLocal")
553 .and_then(|v| v.as_str())
554 .and_then(|s| s.split(' ').next())
555 .map(|s| s.to_string());
556
557 let task = SyncTask::new(
558 self.profile_id,
559 SyncTaskType::DownloadGpx {
560 activity_id: id,
561 activity_name,
562 activity_date,
563 },
564 );
565 self.queue.push(task)?;
566 }
567 }
568 }
569
570 if activities.len() == limit as usize {
572 let task = SyncTask::new(
573 self.profile_id,
574 SyncTaskType::Activities {
575 start: start + limit,
576 limit,
577 },
578 );
579 self.queue.push(task)?;
580 }
581
582 Ok(())
583 }
584
585 fn store_activity(&self, activity: &serde_json::Value) -> Result<()> {
587 let conn = self.db.connection();
588 let conn = conn.lock().unwrap();
589
590 let activity_id = activity
591 .get("activityId")
592 .and_then(|v| v.as_i64())
593 .ok_or_else(|| GarminError::invalid_response("Missing activityId"))?;
594
595 conn.execute(
596 "INSERT INTO activities (
597 activity_id, profile_id, activity_name, activity_type,
598 start_time_local, duration_sec, distance_m, calories,
599 avg_hr, max_hr, avg_speed, max_speed,
600 elevation_gain, elevation_loss, raw_json
601 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
602 ON CONFLICT (activity_id) DO UPDATE SET
603 activity_name = EXCLUDED.activity_name,
604 activity_type = EXCLUDED.activity_type,
605 raw_json = EXCLUDED.raw_json",
606 duckdb::params![
607 activity_id,
608 self.profile_id,
609 activity.get("activityName").and_then(|v| v.as_str()),
610 activity.get("activityType").and_then(|v| v.get("typeKey")).and_then(|v| v.as_str()),
611 activity.get("startTimeLocal").and_then(|v| v.as_str()),
612 activity.get("duration").and_then(|v| v.as_f64()),
613 activity.get("distance").and_then(|v| v.as_f64()),
614 activity.get("calories").and_then(|v| v.as_i64()).map(|v| v as i32),
615 activity.get("averageHR").and_then(|v| v.as_i64()).map(|v| v as i32),
616 activity.get("maxHR").and_then(|v| v.as_i64()).map(|v| v as i32),
617 activity.get("averageSpeed").and_then(|v| v.as_f64()),
618 activity.get("maxSpeed").and_then(|v| v.as_f64()),
619 activity.get("elevationGain").and_then(|v| v.as_f64()),
620 activity.get("elevationLoss").and_then(|v| v.as_f64()),
621 activity.to_string(),
622 ],
623 )
624 .map_err(|e| GarminError::Database(e.to_string()))?;
625
626 Ok(())
627 }
628
629 async fn sync_activity_detail(&mut self, _activity_id: i64) -> Result<()> {
631 Ok(())
633 }
634
635 async fn download_gpx(&mut self, activity_id: i64) -> Result<()> {
637 let path = format!("/download-service/export/gpx/activity/{}", activity_id);
638 let gpx_bytes = self.client.download(&self.token, &path).await?;
639 let gpx_data = String::from_utf8_lossy(&gpx_bytes);
640 self.parse_and_store_gpx(activity_id, &gpx_data)?;
641 Ok(())
642 }
643
644 fn parse_and_store_gpx(&self, activity_id: i64, gpx_data: &str) -> Result<()> {
646 use gpx::read;
647 use std::io::BufReader;
648
649 let reader = BufReader::new(gpx_data.as_bytes());
650 let gpx = read(reader).map_err(|e| GarminError::invalid_response(e.to_string()))?;
651
652 let conn = self.db.connection();
653 let conn = conn.lock().unwrap();
654
655 for track in gpx.tracks {
656 for segment in track.segments {
657 for point in segment.points {
658 let timestamp = point
659 .time
660 .map(|t| t.format().unwrap_or_default())
661 .unwrap_or_default();
662
663 conn.execute(
664 "INSERT INTO track_points (activity_id, timestamp, lat, lon, elevation)
665 VALUES (?, ?, ?, ?, ?)",
666 duckdb::params![
667 activity_id,
668 timestamp,
669 point.point().y(),
670 point.point().x(),
671 point.elevation,
672 ],
673 )
674 .map_err(|e| GarminError::Database(e.to_string()))?;
675 }
676 }
677 }
678
679 Ok(())
680 }
681
682 async fn sync_daily_health(&mut self, date: NaiveDate) -> Result<()> {
684 let display_name = self.get_display_name().await?;
686
687 let path = format!(
688 "/usersummary-service/usersummary/daily/{}?calendarDate={}",
689 display_name, date
690 );
691
692 let health_result: std::result::Result<serde_json::Value, _> =
694 self.client.get_json(&self.token, &path).await;
695
696 let health = match health_result {
697 Ok(data) => data,
698 Err(GarminError::NotFound(_)) | Err(GarminError::Api { .. }) => {
699 serde_json::json!({})
702 }
703 Err(e) => return Err(e),
704 };
705
706 let conn = self.db.connection();
707 let conn = conn.lock().unwrap();
708
709 conn.execute(
710 "INSERT INTO daily_health (
711 profile_id, date, steps, step_goal, total_calories, active_calories,
712 resting_hr, sleep_seconds, avg_stress, max_stress,
713 body_battery_start, body_battery_end, raw_json
714 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
715 ON CONFLICT (profile_id, date) DO UPDATE SET
716 steps = EXCLUDED.steps,
717 step_goal = EXCLUDED.step_goal,
718 total_calories = EXCLUDED.total_calories,
719 active_calories = EXCLUDED.active_calories,
720 resting_hr = EXCLUDED.resting_hr,
721 sleep_seconds = EXCLUDED.sleep_seconds,
722 avg_stress = EXCLUDED.avg_stress,
723 max_stress = EXCLUDED.max_stress,
724 body_battery_start = EXCLUDED.body_battery_start,
725 body_battery_end = EXCLUDED.body_battery_end,
726 raw_json = EXCLUDED.raw_json",
727 duckdb::params![
728 self.profile_id,
729 date.to_string(),
730 health.get("totalSteps").and_then(|v| v.as_i64()).map(|v| v as i32),
731 health.get("dailyStepGoal").and_then(|v| v.as_i64()).map(|v| v as i32),
732 health.get("totalKilocalories").and_then(|v| v.as_i64()).map(|v| v as i32),
733 health.get("activeKilocalories").and_then(|v| v.as_i64()).map(|v| v as i32),
734 health.get("restingHeartRate").and_then(|v| v.as_i64()).map(|v| v as i32),
735 health.get("sleepingSeconds").and_then(|v| v.as_i64()).map(|v| v as i32),
736 health.get("averageStressLevel").and_then(|v| v.as_i64()).map(|v| v as i32),
737 health.get("maxStressLevel").and_then(|v| v.as_i64()).map(|v| v as i32),
738 health.get("bodyBatteryChargedValue").and_then(|v| v.as_i64()).map(|v| v as i32),
739 health.get("bodyBatteryDrainedValue").and_then(|v| v.as_i64()).map(|v| v as i32),
740 health.to_string(),
741 ],
742 )
743 .map_err(|e| GarminError::Database(e.to_string()))?;
744
745 Ok(())
746 }
747
748 async fn sync_performance(&mut self, date: NaiveDate) -> Result<()> {
750 let vo2max: Option<serde_json::Value> = self
752 .client
753 .get_json(&self.token, "/metrics-service/metrics/maxmet/latest")
754 .await
755 .ok();
756
757 let race_predictions: Option<serde_json::Value> = self
759 .client
760 .get_json(&self.token, "/metrics-service/metrics/racepredictions/latest")
761 .await
762 .ok();
763
764 let conn = self.db.connection();
765 let conn = conn.lock().unwrap();
766
767 let vo2max_value = vo2max
768 .as_ref()
769 .and_then(|v| v.get("generic"))
770 .and_then(|v| v.get("vo2MaxValue"))
771 .and_then(|v| v.as_f64());
772
773 let fitness_age = vo2max
774 .as_ref()
775 .and_then(|v| v.get("generic"))
776 .and_then(|v| v.get("fitnessAge"))
777 .and_then(|v| v.as_i64())
778 .map(|v| v as i32);
779
780 let race_5k = race_predictions
781 .as_ref()
782 .and_then(|v| v.get("time5K"))
783 .and_then(|v| v.as_f64())
784 .map(|v| v as i32);
785
786 let race_10k = race_predictions
787 .as_ref()
788 .and_then(|v| v.get("time10K"))
789 .and_then(|v| v.as_f64())
790 .map(|v| v as i32);
791
792 let race_half = race_predictions
793 .as_ref()
794 .and_then(|v| v.get("timeHalfMarathon"))
795 .and_then(|v| v.as_f64())
796 .map(|v| v as i32);
797
798 let race_marathon = race_predictions
799 .as_ref()
800 .and_then(|v| v.get("timeMarathon"))
801 .and_then(|v| v.as_f64())
802 .map(|v| v as i32);
803
804 conn.execute(
805 "INSERT INTO performance_metrics (
806 profile_id, date, vo2max, fitness_age,
807 race_5k_sec, race_10k_sec, race_half_sec, race_marathon_sec
808 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
809 ON CONFLICT (profile_id, date) DO UPDATE SET
810 vo2max = EXCLUDED.vo2max,
811 fitness_age = EXCLUDED.fitness_age,
812 race_5k_sec = EXCLUDED.race_5k_sec,
813 race_10k_sec = EXCLUDED.race_10k_sec,
814 race_half_sec = EXCLUDED.race_half_sec,
815 race_marathon_sec = EXCLUDED.race_marathon_sec",
816 duckdb::params![
817 self.profile_id,
818 date.to_string(),
819 vo2max_value,
820 fitness_age,
821 race_5k,
822 race_10k,
823 race_half,
824 race_marathon,
825 ],
826 )
827 .map_err(|e| GarminError::Database(e.to_string()))?;
828
829 Ok(())
830 }
831
832 async fn sync_weight(&mut self, _from: NaiveDate, _to: NaiveDate) -> Result<()> {
834 Ok(())
836 }
837
838 async fn generate_embeddings(&mut self, _activity_ids: &[i64]) -> Result<()> {
840 Ok(())
842 }
843}
844
845#[derive(Default)]
847pub struct SyncOptions {
848 pub sync_activities: bool,
850 pub sync_health: bool,
852 pub sync_performance: bool,
854 pub from_date: Option<NaiveDate>,
856 pub to_date: Option<NaiveDate>,
858 pub dry_run: bool,
860 pub force: bool,
862 pub fancy_ui: bool,
864 pub concurrency: usize,
866}
867
868impl SyncOptions {
869 pub fn full() -> Self {
871 Self {
872 sync_activities: true,
873 sync_health: true,
874 sync_performance: true,
875 fancy_ui: true,
876 concurrency: 3,
877 ..Default::default()
878 }
879 }
880
881 pub fn simple() -> Self {
883 Self {
884 sync_activities: true,
885 sync_health: true,
886 sync_performance: true,
887 fancy_ui: false,
888 concurrency: 3,
889 ..Default::default()
890 }
891 }
892}
893
/// Counters describing the outcome of a sync run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SyncStats {
    /// Tasks recovered from a previous run's in-progress state.
    pub recovered: u32,
    /// Tasks that executed successfully.
    pub completed: u32,
    /// Tasks that hit a rate limit and were rescheduled.
    pub rate_limited: u32,
    /// Tasks that failed with any other error.
    pub failed: u32,
}

impl std::fmt::Display for SyncStats {
    /// Formats as `Completed: C, Failed: F, Rate limited: R`, appending
    /// `, Recovered: N` only when at least one task was recovered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "Completed: {}, Failed: {}, Rate limited: {}",
            self.completed, self.failed, self.rate_limited
        )?;
        if self.recovered > 0 {
            write!(f, ", Recovered: {}", self.recovered)?;
        }
        Ok(())
    }
}
920
921fn print_task_status(task: &SyncTask, stats: &SyncStats) {
923 let desc = match &task.task_type {
924 SyncTaskType::Activities { start, limit } => {
925 format!("Fetching activities {}-{}", start, start + limit)
926 }
927 SyncTaskType::ActivityDetail { activity_id } => {
928 format!("Activity {} details", activity_id)
929 }
930 SyncTaskType::DownloadGpx {
931 activity_id,
932 activity_name,
933 activity_date,
934 } => {
935 let name = activity_name.as_deref().unwrap_or("Unknown");
936 let date = activity_date.as_deref().unwrap_or("");
937 if date.is_empty() {
938 format!("GPX: {} ({})", name, activity_id)
939 } else {
940 format!("GPX: {} {} ({})", date, name, activity_id)
941 }
942 }
943 SyncTaskType::DailyHealth { date } => {
944 format!("Health data for {}", date)
945 }
946 SyncTaskType::Performance { date } => {
947 format!("Performance metrics for {}", date)
948 }
949 SyncTaskType::Weight { from, to } => {
950 format!("Weight {} to {}", from, to)
951 }
952 SyncTaskType::GenerateEmbeddings { activity_ids } => {
953 format!("Generating embeddings for {} activities", activity_ids.len())
954 }
955 };
956
957 print!("[{:>4}] {}...", stats.completed + 1, desc);
958 let _ = io::stdout().flush();
959}
960
961fn update_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
963 let desc = match &task.task_type {
964 SyncTaskType::Activities { start, limit } => {
965 format!("Activities {}-{}", start, start + limit)
966 }
967 SyncTaskType::DownloadGpx {
968 activity_name,
969 activity_date,
970 ..
971 } => {
972 let name = activity_name.as_deref().unwrap_or("Unknown");
973 let date = activity_date.as_deref().unwrap_or("");
974 if date.is_empty() {
975 name.to_string()
976 } else {
977 format!("{} {}", date, name)
978 }
979 }
980 SyncTaskType::DailyHealth { date } => date.to_string(),
981 SyncTaskType::Performance { date } => date.to_string(),
982 _ => String::new(),
983 };
984
985 match &task.task_type {
986 SyncTaskType::Activities { .. } => progress.activities.set_last_item(desc),
987 SyncTaskType::DownloadGpx { .. } => progress.gpx.set_last_item(desc),
988 SyncTaskType::DailyHealth { .. } => progress.health.set_last_item(desc),
989 SyncTaskType::Performance { .. } => progress.performance.set_last_item(desc),
990 _ => {}
991 }
992}
993
994fn complete_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
996 match &task.task_type {
997 SyncTaskType::Activities { .. } => {
998 progress.activities.complete_one();
999 }
1002 SyncTaskType::DownloadGpx { .. } => progress.gpx.complete_one(),
1003 SyncTaskType::DailyHealth { .. } => progress.health.complete_one(),
1004 SyncTaskType::Performance { .. } => progress.performance.complete_one(),
1005 _ => {}
1006 }
1007}
1008
1009fn fail_progress_for_task(task: &SyncTask, progress: &SyncProgress) {
1011 match &task.task_type {
1012 SyncTaskType::Activities { .. } => progress.activities.fail_one(),
1013 SyncTaskType::DownloadGpx { .. } => progress.gpx.fail_one(),
1014 SyncTaskType::DailyHealth { .. } => progress.health.fail_one(),
1015 SyncTaskType::Performance { .. } => progress.performance.fail_one(),
1016 _ => {}
1017 }
1018}