1use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
9pub enum SyncMode {
10 #[default]
12 Latest,
13 Backfill,
15}
16
17impl std::fmt::Display for SyncMode {
18 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19 match self {
20 SyncMode::Latest => write!(f, "Latest"),
21 SyncMode::Backfill => write!(f, "Backfill"),
22 }
23 }
24}
25
26#[derive(Debug, Clone, PartialEq)]
28pub enum PlanningStep {
29 FetchingProfile,
30 FindingOldestActivity,
31 FindingFirstHealth,
32 FindingFirstPerformance,
33 PlanningActivities,
34 PlanningHealth { days: u32 },
35 PlanningPerformance { weeks: u32 },
36 Complete,
37}
38
39#[derive(Debug, Clone)]
41pub struct ErrorEntry {
42 pub stream: &'static str,
43 pub item: String,
44 pub error: String,
45}
46
47#[derive(Debug)]
49pub struct StreamProgress {
50 pub name: &'static str,
52 pub total: AtomicU32,
54 pub completed: AtomicU32,
56 pub failed: AtomicU32,
58 last_item: Mutex<String>,
60 current_item: Mutex<String>,
62 pub is_dynamic: AtomicBool,
64}
65
66impl StreamProgress {
67 pub fn new(name: &'static str) -> Self {
69 Self {
70 name,
71 total: AtomicU32::new(0),
72 completed: AtomicU32::new(0),
73 failed: AtomicU32::new(0),
74 last_item: Mutex::new(String::new()),
75 current_item: Mutex::new(String::new()),
76 is_dynamic: AtomicBool::new(false),
77 }
78 }
79
80 pub fn set_dynamic(&self, dynamic: bool) {
82 self.is_dynamic.store(dynamic, Ordering::Relaxed);
83 }
84
85 pub fn is_dynamic(&self) -> bool {
87 self.is_dynamic.load(Ordering::Relaxed)
88 }
89
90 pub fn set_total(&self, total: u32) {
92 self.total.store(total, Ordering::Relaxed);
93 }
94
95 pub fn add_total(&self, count: u32) {
97 self.total.fetch_add(count, Ordering::Relaxed);
98 }
99
100 pub fn complete_one(&self) {
102 self.completed.fetch_add(1, Ordering::Relaxed);
103 }
104
105 pub fn fail_one(&self) {
107 self.failed.fetch_add(1, Ordering::Relaxed);
108 }
109
110 pub fn set_last_item(&self, desc: String) {
112 let mut last = self.last_item.lock().unwrap();
113 *last = desc;
114 }
115
116 pub fn get_last_item(&self) -> String {
118 self.last_item.lock().unwrap().clone()
119 }
120
121 pub fn set_current_item(&self, desc: String) {
123 let mut current = self.current_item.lock().unwrap();
124 *current = desc;
125 }
126
127 pub fn get_current_item(&self) -> String {
129 self.current_item.lock().unwrap().clone()
130 }
131
132 pub fn clear_current_item(&self) {
134 let mut current = self.current_item.lock().unwrap();
135 current.clear();
136 }
137
138 pub fn percent(&self) -> u16 {
140 let total = self.total.load(Ordering::Relaxed);
141 if total == 0 {
142 return 0;
143 }
144 let completed = self.completed.load(Ordering::Relaxed);
145 ((completed as f64 / total as f64) * 100.0) as u16
146 }
147
148 pub fn is_complete(&self) -> bool {
150 let total = self.total.load(Ordering::Relaxed);
151 let completed = self.completed.load(Ordering::Relaxed);
152 let failed = self.failed.load(Ordering::Relaxed);
153 total > 0 && (completed + failed) >= total
154 }
155
156 pub fn get_total(&self) -> u32 {
158 self.total.load(Ordering::Relaxed)
159 }
160
161 pub fn get_completed(&self) -> u32 {
163 self.completed.load(Ordering::Relaxed)
164 }
165
166 pub fn get_failed(&self) -> u32 {
168 self.failed.load(Ordering::Relaxed)
169 }
170}
171
172pub struct SyncProgress {
174 pub activities: StreamProgress,
176 pub gpx: StreamProgress,
178 pub health: StreamProgress,
180 pub performance: StreamProgress,
182 pub start_time: Instant,
184 pub profile_name: Mutex<String>,
186 pub date_range: Mutex<String>,
188 sync_mode: AtomicU8,
190 pub latest_range: Mutex<Option<(String, String)>>,
192 pub backfill_range: Mutex<Option<(String, String)>>,
194 pub rate_history: Mutex<Vec<u32>>,
196 pub total_requests: AtomicU32,
198 pub errors: Mutex<Vec<ErrorEntry>>,
200 pub storage_path: Mutex<String>,
202 pub is_planning: AtomicBool,
204 pub shutdown: AtomicBool,
206 pub planning_step: Mutex<PlanningStep>,
208 pub oldest_activity_date: Mutex<Option<String>>,
210}
211
212impl SyncProgress {
213 pub fn new() -> Self {
215 Self {
216 activities: StreamProgress::new("Activities"),
217 gpx: StreamProgress::new("GPX"),
218 health: StreamProgress::new("Health"),
219 performance: StreamProgress::new("Performance"),
220 start_time: Instant::now(),
221 profile_name: Mutex::new(String::new()),
222 date_range: Mutex::new(String::new()),
223 sync_mode: AtomicU8::new(SyncMode::Latest as u8),
224 latest_range: Mutex::new(None),
225 backfill_range: Mutex::new(None),
226 rate_history: Mutex::new(vec![0; 60]),
227 total_requests: AtomicU32::new(0),
228 errors: Mutex::new(Vec::new()),
229 storage_path: Mutex::new(String::new()),
230 is_planning: AtomicBool::new(true),
231 shutdown: AtomicBool::new(false),
232 planning_step: Mutex::new(PlanningStep::FetchingProfile),
233 oldest_activity_date: Mutex::new(None),
234 }
235 }
236
237 pub fn set_planning_step(&self, step: PlanningStep) {
239 *self.planning_step.lock().unwrap() = step;
240 }
241
242 pub fn get_planning_step(&self) -> PlanningStep {
244 self.planning_step.lock().unwrap().clone()
245 }
246
247 pub fn finish_planning(&self) {
249 self.is_planning.store(false, Ordering::Relaxed);
250 *self.planning_step.lock().unwrap() = PlanningStep::Complete;
251 }
252
253 pub fn is_planning(&self) -> bool {
255 self.is_planning.load(Ordering::Relaxed)
256 }
257
258 pub fn request_shutdown(&self) {
260 self.shutdown.store(true, Ordering::Relaxed);
261 }
262
263 pub fn should_shutdown(&self) -> bool {
265 self.shutdown.load(Ordering::Relaxed)
266 }
267
268 pub fn set_oldest_activity_date(&self, date: &str) {
270 *self.oldest_activity_date.lock().unwrap() = Some(date.to_string());
271 }
272
273 pub fn get_oldest_activity_date(&self) -> Option<String> {
275 self.oldest_activity_date.lock().unwrap().clone()
276 }
277
278 pub fn set_profile(&self, name: &str) {
280 let mut profile = self.profile_name.lock().unwrap();
281 *profile = name.to_string();
282 }
283
284 pub fn get_profile(&self) -> String {
286 self.profile_name.lock().unwrap().clone()
287 }
288
289 pub fn set_date_range(&self, from: &str, to: &str) {
291 let mut range = self.date_range.lock().unwrap();
292 *range = format!("{} -> {}", from, to);
293 }
294
295 pub fn get_date_range(&self) -> String {
297 self.date_range.lock().unwrap().clone()
298 }
299
300 pub fn set_sync_mode(&self, mode: SyncMode) {
302 self.sync_mode.store(mode as u8, Ordering::Relaxed);
303 }
304
305 pub fn get_sync_mode(&self) -> SyncMode {
307 match self.sync_mode.load(Ordering::Relaxed) {
308 0 => SyncMode::Latest,
309 1 => SyncMode::Backfill,
310 _ => SyncMode::Latest,
311 }
312 }
313
314 pub fn set_latest_range(&self, from: &str, to: &str) {
316 *self.latest_range.lock().unwrap() = Some((from.to_string(), to.to_string()));
317 }
318
319 pub fn get_latest_range(&self) -> Option<(String, String)> {
321 self.latest_range.lock().unwrap().clone()
322 }
323
324 pub fn set_backfill_range(&self, frontier: &str, target: &str) {
326 *self.backfill_range.lock().unwrap() = Some((frontier.to_string(), target.to_string()));
327 }
328
329 pub fn get_backfill_range(&self) -> Option<(String, String)> {
331 self.backfill_range.lock().unwrap().clone()
332 }
333
334 pub fn set_storage_path(&self, path: &str) {
336 let mut storage = self.storage_path.lock().unwrap();
337 *storage = path.to_string();
338 }
339
340 pub fn get_storage_path(&self) -> String {
342 self.storage_path.lock().unwrap().clone()
343 }
344
345 pub fn add_error(&self, stream: &'static str, item: String, error: String) {
347 let mut errors = self.errors.lock().unwrap();
348 errors.push(ErrorEntry {
349 stream,
350 item,
351 error,
352 });
353 }
354
355 pub fn get_errors(&self) -> Vec<ErrorEntry> {
357 self.errors.lock().unwrap().clone()
358 }
359
360 pub fn get_current_task(&self) -> Option<String> {
362 let streams = [&self.activities, &self.gpx, &self.health, &self.performance];
364
365 for stream in streams {
366 let current = stream.get_current_item();
367 if !current.is_empty() {
368 return Some(format!("{}: {}", stream.name, current));
369 }
370 }
371
372 None
373 }
374
375 pub fn record_request(&self) {
377 self.total_requests.fetch_add(1, Ordering::Relaxed);
378 }
379
380 pub fn update_rate_history(&self) {
382 let current = self.total_requests.load(Ordering::Relaxed);
383 let mut history = self.rate_history.lock().unwrap();
384
385 if history.len() >= 60 {
387 history.remove(0);
388 }
389 history.push(current);
390 }
391
392 pub fn requests_per_minute(&self) -> u32 {
394 let history = self.rate_history.lock().unwrap();
395 if history.len() < 2 {
396 return 0;
397 }
398 let start = history.first().copied().unwrap_or(0);
399 let end = history.last().copied().unwrap_or(0);
400 end.saturating_sub(start)
401 }
402
403 pub fn elapsed_str(&self) -> String {
405 let elapsed = self.start_time.elapsed();
406 let secs = elapsed.as_secs();
407 let mins = secs / 60;
408 let remaining_secs = secs % 60;
409
410 if mins > 0 {
411 format!("{}m {}s", mins, remaining_secs)
412 } else {
413 format!("{}s", secs)
414 }
415 }
416
417 pub fn eta_str(&self) -> String {
419 let total = self.total_remaining();
420 let completed = self.total_completed();
421
422 if completed == 0 {
423 return "calculating...".to_string();
424 }
425
426 let elapsed = self.start_time.elapsed().as_secs_f64();
427 let rate = completed as f64 / elapsed;
428
429 if rate < 0.01 {
430 return "unknown".to_string();
431 }
432
433 let remaining = total.saturating_sub(completed);
434 let eta_secs = (remaining as f64 / rate) as u64;
435
436 if eta_secs > 3600 {
437 let hours = eta_secs / 3600;
438 let mins = (eta_secs % 3600) / 60;
439 format!("~{}h {}m", hours, mins)
440 } else if eta_secs > 60 {
441 let mins = eta_secs / 60;
442 format!("~{} minutes", mins)
443 } else {
444 format!("~{} seconds", eta_secs)
445 }
446 }
447
448 pub fn total_remaining(&self) -> u32 {
450 self.activities.get_total()
451 + self.gpx.get_total()
452 + self.health.get_total()
453 + self.performance.get_total()
454 }
455
456 pub fn total_completed(&self) -> u32 {
458 self.activities.get_completed()
459 + self.gpx.get_completed()
460 + self.health.get_completed()
461 + self.performance.get_completed()
462 }
463
464 pub fn total_failed(&self) -> u32 {
466 self.activities.get_failed()
467 + self.gpx.get_failed()
468 + self.health.get_failed()
469 + self.performance.get_failed()
470 }
471
472 pub fn is_complete(&self) -> bool {
474 (self.activities.get_total() == 0 || self.activities.is_complete())
475 && (self.gpx.get_total() == 0 || self.gpx.is_complete())
476 && (self.health.get_total() == 0 || self.health.is_complete())
477 && (self.performance.get_total() == 0 || self.performance.is_complete())
478 }
479
480 pub fn print_simple_status(&self) {
482 let act = &self.activities;
483 let gpx = &self.gpx;
484 let health = &self.health;
485 let perf = &self.performance;
486
487 print!(
488 "\rAct: {}/{} | GPX: {}/{} | Health: {}/{} | Perf: {}/{} | {} ",
489 act.get_completed(),
490 act.get_total(),
491 gpx.get_completed(),
492 gpx.get_total(),
493 health.get_completed(),
494 health.get_total(),
495 perf.get_completed(),
496 perf.get_total(),
497 self.elapsed_str(),
498 );
499 let _ = std::io::Write::flush(&mut std::io::stdout());
500 }
501}
502
503impl Default for SyncProgress {
504 fn default() -> Self {
505 Self::new()
506 }
507}
508
509impl std::fmt::Display for PlanningStep {
510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 match self {
512 PlanningStep::FetchingProfile => write!(f, "Fetching profile..."),
513 PlanningStep::FindingOldestActivity => write!(f, "Finding oldest activity..."),
514 PlanningStep::FindingFirstHealth => write!(f, "Finding first health data..."),
515 PlanningStep::FindingFirstPerformance => write!(f, "Finding first performance data..."),
516 PlanningStep::PlanningActivities => write!(f, "Planning activity sync..."),
517 PlanningStep::PlanningHealth { days } => {
518 write!(f, "Planning health sync ({} days)...", days)
519 }
520 PlanningStep::PlanningPerformance { weeks } => {
521 write!(f, "Planning performance sync ({} weeks)...", weeks)
522 }
523 PlanningStep::Complete => write!(f, "Planning complete"),
524 }
525 }
526}
527
528pub type SharedProgress = Arc<SyncProgress>;
530
531#[cfg(test)]
532mod tests {
533 use super::*;
534
535 #[test]
536 fn test_stream_progress() {
537 let progress = StreamProgress::new("Test");
538
539 progress.set_total(100);
540 assert_eq!(progress.get_total(), 100);
541 assert_eq!(progress.percent(), 0);
542
543 progress.complete_one();
544 assert_eq!(progress.get_completed(), 1);
545 assert_eq!(progress.percent(), 1);
546
547 for _ in 0..49 {
548 progress.complete_one();
549 }
550 assert_eq!(progress.percent(), 50);
551 }
552
553 #[test]
554 fn test_sync_progress() {
555 let progress = SyncProgress::new();
556
557 progress.activities.set_total(10);
558 progress.health.set_total(20);
559
560 assert_eq!(progress.total_remaining(), 30);
561 assert_eq!(progress.total_completed(), 0);
562
563 progress.activities.complete_one();
564 progress.health.complete_one();
565
566 assert_eq!(progress.total_completed(), 2);
567 }
568}