Skip to main content

garmin_cli/sync/
progress.rs

1//! Progress tracking for parallel sync with atomic counters
2
3use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7/// Sync mode - latest (recent data) vs backfill (historical)
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
9pub enum SyncMode {
10    /// Sync recent data (since last sync or last 7 days)
11    #[default]
12    Latest,
13    /// Backfill historical data
14    Backfill,
15}
16
17impl std::fmt::Display for SyncMode {
18    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
19        match self {
20            SyncMode::Latest => write!(f, "Latest"),
21            SyncMode::Backfill => write!(f, "Backfill"),
22        }
23    }
24}
25
26/// Planning phase steps
27#[derive(Debug, Clone, PartialEq)]
28pub enum PlanningStep {
29    FetchingProfile,
30    FindingOldestActivity,
31    FindingFirstHealth,
32    FindingFirstPerformance,
33    PlanningActivities,
34    PlanningHealth { days: u32 },
35    PlanningPerformance { weeks: u32 },
36    Complete,
37}
38
39/// Error entry with details
40#[derive(Debug, Clone)]
41pub struct ErrorEntry {
42    pub stream: &'static str,
43    pub item: String,
44    pub error: String,
45}
46
47/// Progress tracking for a single data stream
48#[derive(Debug)]
49pub struct StreamProgress {
50    /// Display name for this stream
51    pub name: &'static str,
52    /// Total items to process
53    pub total: AtomicU32,
54    /// Completed items
55    pub completed: AtomicU32,
56    /// Failed items
57    pub failed: AtomicU32,
58    /// Last processed item description
59    last_item: Mutex<String>,
60    /// Current item being processed
61    current_item: Mutex<String>,
62    /// Whether this stream's total is discovered dynamically (pagination, etc.)
63    pub is_dynamic: AtomicBool,
64}
65
66impl StreamProgress {
67    /// Create a new stream progress tracker
68    pub fn new(name: &'static str) -> Self {
69        Self {
70            name,
71            total: AtomicU32::new(0),
72            completed: AtomicU32::new(0),
73            failed: AtomicU32::new(0),
74            last_item: Mutex::new(String::new()),
75            current_item: Mutex::new(String::new()),
76            is_dynamic: AtomicBool::new(false),
77        }
78    }
79
80    /// Mark this stream as having a dynamically discovered total
81    pub fn set_dynamic(&self, dynamic: bool) {
82        self.is_dynamic.store(dynamic, Ordering::Relaxed);
83    }
84
85    /// Check if this stream has a dynamic total
86    pub fn is_dynamic(&self) -> bool {
87        self.is_dynamic.load(Ordering::Relaxed)
88    }
89
90    /// Set the total count
91    pub fn set_total(&self, total: u32) {
92        self.total.store(total, Ordering::Relaxed);
93    }
94
95    /// Add to total (for dynamic task generation like activities -> GPX)
96    pub fn add_total(&self, count: u32) {
97        self.total.fetch_add(count, Ordering::Relaxed);
98    }
99
100    /// Increment completed count
101    pub fn complete_one(&self) {
102        self.completed.fetch_add(1, Ordering::Relaxed);
103    }
104
105    /// Increment failed count
106    pub fn fail_one(&self) {
107        self.failed.fetch_add(1, Ordering::Relaxed);
108    }
109
110    /// Set the last processed item description
111    pub fn set_last_item(&self, desc: String) {
112        let mut last = self.last_item.lock().unwrap();
113        *last = desc;
114    }
115
116    /// Get the last processed item description
117    pub fn get_last_item(&self) -> String {
118        self.last_item.lock().unwrap().clone()
119    }
120
121    /// Set the current item being processed
122    pub fn set_current_item(&self, desc: String) {
123        let mut current = self.current_item.lock().unwrap();
124        *current = desc;
125    }
126
127    /// Get the current item being processed
128    pub fn get_current_item(&self) -> String {
129        self.current_item.lock().unwrap().clone()
130    }
131
132    /// Clear the current item (when done processing)
133    pub fn clear_current_item(&self) {
134        let mut current = self.current_item.lock().unwrap();
135        current.clear();
136    }
137
138    /// Get completion percentage (0-100)
139    pub fn percent(&self) -> u16 {
140        let total = self.total.load(Ordering::Relaxed);
141        if total == 0 {
142            return 0;
143        }
144        let completed = self.completed.load(Ordering::Relaxed);
145        ((completed as f64 / total as f64) * 100.0) as u16
146    }
147
148    /// Check if this stream is complete
149    pub fn is_complete(&self) -> bool {
150        let total = self.total.load(Ordering::Relaxed);
151        let completed = self.completed.load(Ordering::Relaxed);
152        let failed = self.failed.load(Ordering::Relaxed);
153        total > 0 && (completed + failed) >= total
154    }
155
156    /// Get total count
157    pub fn get_total(&self) -> u32 {
158        self.total.load(Ordering::Relaxed)
159    }
160
161    /// Get completed count
162    pub fn get_completed(&self) -> u32 {
163        self.completed.load(Ordering::Relaxed)
164    }
165
166    /// Get failed count
167    pub fn get_failed(&self) -> u32 {
168        self.failed.load(Ordering::Relaxed)
169    }
170}
171
172/// Overall sync progress across all streams
173pub struct SyncProgress {
174    /// Activities stream progress
175    pub activities: StreamProgress,
176    /// GPX downloads progress
177    pub gpx: StreamProgress,
178    /// Health data progress
179    pub health: StreamProgress,
180    /// Performance metrics progress
181    pub performance: StreamProgress,
182    /// Start time for ETA calculation
183    pub start_time: Instant,
184    /// User profile name
185    pub profile_name: Mutex<String>,
186    /// Date range being synced (legacy, kept for compatibility)
187    pub date_range: Mutex<String>,
188    /// Current sync mode
189    sync_mode: AtomicU8,
190    /// Latest sync date range (from -> to)
191    pub latest_range: Mutex<Option<(String, String)>>,
192    /// Backfill date range (frontier -> target, syncing backwards)
193    pub backfill_range: Mutex<Option<(String, String)>>,
194    /// Request rate history (last 60 seconds)
195    pub rate_history: Mutex<Vec<u32>>,
196    /// Total requests made
197    pub total_requests: AtomicU32,
198    /// Error details for display
199    pub errors: Mutex<Vec<ErrorEntry>>,
200    /// Storage path for display
201    pub storage_path: Mutex<String>,
202    /// Whether we're in planning phase
203    pub is_planning: AtomicBool,
204    /// Whether shutdown has been requested
205    pub shutdown: AtomicBool,
206    /// Current planning step
207    pub planning_step: Mutex<PlanningStep>,
208    /// Oldest activity date found during planning
209    pub oldest_activity_date: Mutex<Option<String>>,
210}
211
212impl SyncProgress {
213    /// Create new sync progress tracker
214    pub fn new() -> Self {
215        Self {
216            activities: StreamProgress::new("Activities"),
217            gpx: StreamProgress::new("GPX"),
218            health: StreamProgress::new("Health"),
219            performance: StreamProgress::new("Performance"),
220            start_time: Instant::now(),
221            profile_name: Mutex::new(String::new()),
222            date_range: Mutex::new(String::new()),
223            sync_mode: AtomicU8::new(SyncMode::Latest as u8),
224            latest_range: Mutex::new(None),
225            backfill_range: Mutex::new(None),
226            rate_history: Mutex::new(vec![0; 60]),
227            total_requests: AtomicU32::new(0),
228            errors: Mutex::new(Vec::new()),
229            storage_path: Mutex::new(String::new()),
230            is_planning: AtomicBool::new(true),
231            shutdown: AtomicBool::new(false),
232            planning_step: Mutex::new(PlanningStep::FetchingProfile),
233            oldest_activity_date: Mutex::new(None),
234        }
235    }
236
237    /// Set planning step
238    pub fn set_planning_step(&self, step: PlanningStep) {
239        *self.planning_step.lock().unwrap() = step;
240    }
241
242    /// Get current planning step
243    pub fn get_planning_step(&self) -> PlanningStep {
244        self.planning_step.lock().unwrap().clone()
245    }
246
247    /// Mark planning as complete
248    pub fn finish_planning(&self) {
249        self.is_planning.store(false, Ordering::Relaxed);
250        *self.planning_step.lock().unwrap() = PlanningStep::Complete;
251    }
252
253    /// Check if still planning
254    pub fn is_planning(&self) -> bool {
255        self.is_planning.load(Ordering::Relaxed)
256    }
257
258    /// Request that progress reporting shuts down
259    pub fn request_shutdown(&self) {
260        self.shutdown.store(true, Ordering::Relaxed);
261    }
262
263    /// Check if shutdown has been requested
264    pub fn should_shutdown(&self) -> bool {
265        self.shutdown.load(Ordering::Relaxed)
266    }
267
268    /// Set oldest activity date found
269    pub fn set_oldest_activity_date(&self, date: &str) {
270        *self.oldest_activity_date.lock().unwrap() = Some(date.to_string());
271    }
272
273    /// Get oldest activity date
274    pub fn get_oldest_activity_date(&self) -> Option<String> {
275        self.oldest_activity_date.lock().unwrap().clone()
276    }
277
278    /// Set profile name
279    pub fn set_profile(&self, name: &str) {
280        let mut profile = self.profile_name.lock().unwrap();
281        *profile = name.to_string();
282    }
283
284    /// Get profile name
285    pub fn get_profile(&self) -> String {
286        self.profile_name.lock().unwrap().clone()
287    }
288
289    /// Set date range
290    pub fn set_date_range(&self, from: &str, to: &str) {
291        let mut range = self.date_range.lock().unwrap();
292        *range = format!("{} -> {}", from, to);
293    }
294
295    /// Get date range
296    pub fn get_date_range(&self) -> String {
297        self.date_range.lock().unwrap().clone()
298    }
299
300    /// Set sync mode
301    pub fn set_sync_mode(&self, mode: SyncMode) {
302        self.sync_mode.store(mode as u8, Ordering::Relaxed);
303    }
304
305    /// Get sync mode
306    pub fn get_sync_mode(&self) -> SyncMode {
307        match self.sync_mode.load(Ordering::Relaxed) {
308            0 => SyncMode::Latest,
309            1 => SyncMode::Backfill,
310            _ => SyncMode::Latest,
311        }
312    }
313
314    /// Set latest sync date range
315    pub fn set_latest_range(&self, from: &str, to: &str) {
316        *self.latest_range.lock().unwrap() = Some((from.to_string(), to.to_string()));
317    }
318
319    /// Get latest sync date range
320    pub fn get_latest_range(&self) -> Option<(String, String)> {
321        self.latest_range.lock().unwrap().clone()
322    }
323
324    /// Set backfill date range (frontier -> target, syncing backwards)
325    pub fn set_backfill_range(&self, frontier: &str, target: &str) {
326        *self.backfill_range.lock().unwrap() = Some((frontier.to_string(), target.to_string()));
327    }
328
329    /// Get backfill date range
330    pub fn get_backfill_range(&self) -> Option<(String, String)> {
331        self.backfill_range.lock().unwrap().clone()
332    }
333
334    /// Set storage path
335    pub fn set_storage_path(&self, path: &str) {
336        let mut storage = self.storage_path.lock().unwrap();
337        *storage = path.to_string();
338    }
339
340    /// Get storage path
341    pub fn get_storage_path(&self) -> String {
342        self.storage_path.lock().unwrap().clone()
343    }
344
345    /// Add an error entry
346    pub fn add_error(&self, stream: &'static str, item: String, error: String) {
347        let mut errors = self.errors.lock().unwrap();
348        errors.push(ErrorEntry {
349            stream,
350            item,
351            error,
352        });
353    }
354
355    /// Get all errors
356    pub fn get_errors(&self) -> Vec<ErrorEntry> {
357        self.errors.lock().unwrap().clone()
358    }
359
360    /// Get current task description (finds first active stream)
361    pub fn get_current_task(&self) -> Option<String> {
362        // Check streams in order of typical processing
363        let streams = [&self.activities, &self.gpx, &self.health, &self.performance];
364
365        for stream in streams {
366            let current = stream.get_current_item();
367            if !current.is_empty() {
368                return Some(format!("{}: {}", stream.name, current));
369            }
370        }
371
372        None
373    }
374
375    /// Record a request for rate tracking
376    pub fn record_request(&self) {
377        self.total_requests.fetch_add(1, Ordering::Relaxed);
378    }
379
380    /// Update rate history (call once per second)
381    pub fn update_rate_history(&self) {
382        let current = self.total_requests.load(Ordering::Relaxed);
383        let mut history = self.rate_history.lock().unwrap();
384
385        // Shift history left, add current rate
386        if history.len() >= 60 {
387            history.remove(0);
388        }
389        history.push(current);
390    }
391
392    /// Get rate per minute (average over last minute)
393    pub fn requests_per_minute(&self) -> u32 {
394        let history = self.rate_history.lock().unwrap();
395        if history.len() < 2 {
396            return 0;
397        }
398        let start = history.first().copied().unwrap_or(0);
399        let end = history.last().copied().unwrap_or(0);
400        end.saturating_sub(start)
401    }
402
403    /// Get elapsed time as formatted string
404    pub fn elapsed_str(&self) -> String {
405        let elapsed = self.start_time.elapsed();
406        let secs = elapsed.as_secs();
407        let mins = secs / 60;
408        let remaining_secs = secs % 60;
409
410        if mins > 0 {
411            format!("{}m {}s", mins, remaining_secs)
412        } else {
413            format!("{}s", secs)
414        }
415    }
416
417    /// Estimate time remaining
418    pub fn eta_str(&self) -> String {
419        let total = self.total_remaining();
420        let completed = self.total_completed();
421
422        if completed == 0 {
423            return "calculating...".to_string();
424        }
425
426        let elapsed = self.start_time.elapsed().as_secs_f64();
427        let rate = completed as f64 / elapsed;
428
429        if rate < 0.01 {
430            return "unknown".to_string();
431        }
432
433        let remaining = total.saturating_sub(completed);
434        let eta_secs = (remaining as f64 / rate) as u64;
435
436        if eta_secs > 3600 {
437            let hours = eta_secs / 3600;
438            let mins = (eta_secs % 3600) / 60;
439            format!("~{}h {}m", hours, mins)
440        } else if eta_secs > 60 {
441            let mins = eta_secs / 60;
442            format!("~{} minutes", mins)
443        } else {
444            format!("~{} seconds", eta_secs)
445        }
446    }
447
448    /// Get total items remaining across all streams
449    pub fn total_remaining(&self) -> u32 {
450        self.activities.get_total()
451            + self.gpx.get_total()
452            + self.health.get_total()
453            + self.performance.get_total()
454    }
455
456    /// Get total completed across all streams
457    pub fn total_completed(&self) -> u32 {
458        self.activities.get_completed()
459            + self.gpx.get_completed()
460            + self.health.get_completed()
461            + self.performance.get_completed()
462    }
463
464    /// Get total failed across all streams
465    pub fn total_failed(&self) -> u32 {
466        self.activities.get_failed()
467            + self.gpx.get_failed()
468            + self.health.get_failed()
469            + self.performance.get_failed()
470    }
471
472    /// Check if all streams are complete
473    pub fn is_complete(&self) -> bool {
474        (self.activities.get_total() == 0 || self.activities.is_complete())
475            && (self.gpx.get_total() == 0 || self.gpx.is_complete())
476            && (self.health.get_total() == 0 || self.health.is_complete())
477            && (self.performance.get_total() == 0 || self.performance.is_complete())
478    }
479
480    /// Print a status line for terminal progress reporting.
481    pub fn print_simple_status(&self) {
482        let act = &self.activities;
483        let gpx = &self.gpx;
484        let health = &self.health;
485        let perf = &self.performance;
486
487        print!(
488            "\rAct: {}/{} | GPX: {}/{} | Health: {}/{} | Perf: {}/{} | {} ",
489            act.get_completed(),
490            act.get_total(),
491            gpx.get_completed(),
492            gpx.get_total(),
493            health.get_completed(),
494            health.get_total(),
495            perf.get_completed(),
496            perf.get_total(),
497            self.elapsed_str(),
498        );
499        let _ = std::io::Write::flush(&mut std::io::stdout());
500    }
501}
502
503impl Default for SyncProgress {
504    fn default() -> Self {
505        Self::new()
506    }
507}
508
509impl std::fmt::Display for PlanningStep {
510    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511        match self {
512            PlanningStep::FetchingProfile => write!(f, "Fetching profile..."),
513            PlanningStep::FindingOldestActivity => write!(f, "Finding oldest activity..."),
514            PlanningStep::FindingFirstHealth => write!(f, "Finding first health data..."),
515            PlanningStep::FindingFirstPerformance => write!(f, "Finding first performance data..."),
516            PlanningStep::PlanningActivities => write!(f, "Planning activity sync..."),
517            PlanningStep::PlanningHealth { days } => {
518                write!(f, "Planning health sync ({} days)...", days)
519            }
520            PlanningStep::PlanningPerformance { weeks } => {
521                write!(f, "Planning performance sync ({} weeks)...", weeks)
522            }
523            PlanningStep::Complete => write!(f, "Planning complete"),
524        }
525    }
526}
527
528/// Shared progress wrapped in Arc for parallel access
529pub type SharedProgress = Arc<SyncProgress>;
530
531#[cfg(test)]
532mod tests {
533    use super::*;
534
535    #[test]
536    fn test_stream_progress() {
537        let progress = StreamProgress::new("Test");
538
539        progress.set_total(100);
540        assert_eq!(progress.get_total(), 100);
541        assert_eq!(progress.percent(), 0);
542
543        progress.complete_one();
544        assert_eq!(progress.get_completed(), 1);
545        assert_eq!(progress.percent(), 1);
546
547        for _ in 0..49 {
548            progress.complete_one();
549        }
550        assert_eq!(progress.percent(), 50);
551    }
552
553    #[test]
554    fn test_sync_progress() {
555        let progress = SyncProgress::new();
556
557        progress.activities.set_total(10);
558        progress.health.set_total(20);
559
560        assert_eq!(progress.total_remaining(), 30);
561        assert_eq!(progress.total_completed(), 0);
562
563        progress.activities.complete_one();
564        progress.health.complete_one();
565
566        assert_eq!(progress.total_completed(), 2);
567    }
568}