garmin_cli/sync/
progress.rs

1//! Progress tracking for parallel sync with atomic counters
2
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Instant;
6
7/// Progress tracking for a single data stream
8#[derive(Debug)]
9pub struct StreamProgress {
10    /// Display name for this stream
11    pub name: &'static str,
12    /// Total items to process
13    pub total: AtomicU32,
14    /// Completed items
15    pub completed: AtomicU32,
16    /// Failed items
17    pub failed: AtomicU32,
18    /// Last processed item description
19    last_item: Mutex<String>,
20}
21
22impl StreamProgress {
23    /// Create a new stream progress tracker
24    pub fn new(name: &'static str) -> Self {
25        Self {
26            name,
27            total: AtomicU32::new(0),
28            completed: AtomicU32::new(0),
29            failed: AtomicU32::new(0),
30            last_item: Mutex::new(String::new()),
31        }
32    }
33
34    /// Set the total count
35    pub fn set_total(&self, total: u32) {
36        self.total.store(total, Ordering::Relaxed);
37    }
38
39    /// Add to total (for dynamic task generation like activities -> GPX)
40    pub fn add_total(&self, count: u32) {
41        self.total.fetch_add(count, Ordering::Relaxed);
42    }
43
44    /// Increment completed count
45    pub fn complete_one(&self) {
46        self.completed.fetch_add(1, Ordering::Relaxed);
47    }
48
49    /// Increment failed count
50    pub fn fail_one(&self) {
51        self.failed.fetch_add(1, Ordering::Relaxed);
52    }
53
54    /// Set the last processed item description
55    pub fn set_last_item(&self, desc: String) {
56        let mut last = self.last_item.lock().unwrap();
57        *last = desc;
58    }
59
60    /// Get the last processed item description
61    pub fn get_last_item(&self) -> String {
62        self.last_item.lock().unwrap().clone()
63    }
64
65    /// Get completion percentage (0-100)
66    pub fn percent(&self) -> u16 {
67        let total = self.total.load(Ordering::Relaxed);
68        if total == 0 {
69            return 0;
70        }
71        let completed = self.completed.load(Ordering::Relaxed);
72        ((completed as f64 / total as f64) * 100.0) as u16
73    }
74
75    /// Check if this stream is complete
76    pub fn is_complete(&self) -> bool {
77        let total = self.total.load(Ordering::Relaxed);
78        let completed = self.completed.load(Ordering::Relaxed);
79        let failed = self.failed.load(Ordering::Relaxed);
80        total > 0 && (completed + failed) >= total
81    }
82
83    /// Get total count
84    pub fn get_total(&self) -> u32 {
85        self.total.load(Ordering::Relaxed)
86    }
87
88    /// Get completed count
89    pub fn get_completed(&self) -> u32 {
90        self.completed.load(Ordering::Relaxed)
91    }
92
93    /// Get failed count
94    pub fn get_failed(&self) -> u32 {
95        self.failed.load(Ordering::Relaxed)
96    }
97}
98
99/// Overall sync progress across all streams
100pub struct SyncProgress {
101    /// Activities stream progress
102    pub activities: StreamProgress,
103    /// GPX downloads progress
104    pub gpx: StreamProgress,
105    /// Health data progress
106    pub health: StreamProgress,
107    /// Performance metrics progress
108    pub performance: StreamProgress,
109    /// Start time for ETA calculation
110    pub start_time: Instant,
111    /// User profile name
112    pub profile_name: Mutex<String>,
113    /// Date range being synced
114    pub date_range: Mutex<String>,
115    /// Request rate history (last 60 seconds)
116    pub rate_history: Mutex<Vec<u32>>,
117    /// Total requests made
118    pub total_requests: AtomicU32,
119}
120
121impl SyncProgress {
122    /// Create new sync progress tracker
123    pub fn new() -> Self {
124        Self {
125            activities: StreamProgress::new("Activities"),
126            gpx: StreamProgress::new("GPX Downloads"),
127            health: StreamProgress::new("Health"),
128            performance: StreamProgress::new("Performance"),
129            start_time: Instant::now(),
130            profile_name: Mutex::new(String::new()),
131            date_range: Mutex::new(String::new()),
132            rate_history: Mutex::new(vec![0; 60]),
133            total_requests: AtomicU32::new(0),
134        }
135    }
136
137    /// Set profile name
138    pub fn set_profile(&self, name: &str) {
139        let mut profile = self.profile_name.lock().unwrap();
140        *profile = name.to_string();
141    }
142
143    /// Get profile name
144    pub fn get_profile(&self) -> String {
145        self.profile_name.lock().unwrap().clone()
146    }
147
148    /// Set date range
149    pub fn set_date_range(&self, from: &str, to: &str) {
150        let mut range = self.date_range.lock().unwrap();
151        *range = format!("{} -> {}", from, to);
152    }
153
154    /// Get date range
155    pub fn get_date_range(&self) -> String {
156        self.date_range.lock().unwrap().clone()
157    }
158
159    /// Record a request for rate tracking
160    pub fn record_request(&self) {
161        self.total_requests.fetch_add(1, Ordering::Relaxed);
162    }
163
164    /// Update rate history (call once per second)
165    pub fn update_rate_history(&self) {
166        let current = self.total_requests.load(Ordering::Relaxed);
167        let mut history = self.rate_history.lock().unwrap();
168
169        // Shift history left, add current rate
170        if history.len() >= 60 {
171            history.remove(0);
172        }
173        history.push(current);
174    }
175
176    /// Get rate per minute (average over last minute)
177    pub fn requests_per_minute(&self) -> u32 {
178        let history = self.rate_history.lock().unwrap();
179        if history.len() < 2 {
180            return 0;
181        }
182        let start = history.first().copied().unwrap_or(0);
183        let end = history.last().copied().unwrap_or(0);
184        end.saturating_sub(start)
185    }
186
187    /// Get elapsed time as formatted string
188    pub fn elapsed_str(&self) -> String {
189        let elapsed = self.start_time.elapsed();
190        let secs = elapsed.as_secs();
191        let mins = secs / 60;
192        let remaining_secs = secs % 60;
193
194        if mins > 0 {
195            format!("{}m {}s", mins, remaining_secs)
196        } else {
197            format!("{}s", secs)
198        }
199    }
200
201    /// Estimate time remaining
202    pub fn eta_str(&self) -> String {
203        let total = self.total_remaining();
204        let completed = self.total_completed();
205
206        if completed == 0 {
207            return "calculating...".to_string();
208        }
209
210        let elapsed = self.start_time.elapsed().as_secs_f64();
211        let rate = completed as f64 / elapsed;
212
213        if rate < 0.01 {
214            return "unknown".to_string();
215        }
216
217        let remaining = total.saturating_sub(completed);
218        let eta_secs = (remaining as f64 / rate) as u64;
219
220        if eta_secs > 3600 {
221            let hours = eta_secs / 3600;
222            let mins = (eta_secs % 3600) / 60;
223            format!("~{}h {}m", hours, mins)
224        } else if eta_secs > 60 {
225            let mins = eta_secs / 60;
226            format!("~{} minutes", mins)
227        } else {
228            format!("~{} seconds", eta_secs)
229        }
230    }
231
232    /// Get total items remaining across all streams
233    pub fn total_remaining(&self) -> u32 {
234        self.activities.get_total()
235            + self.gpx.get_total()
236            + self.health.get_total()
237            + self.performance.get_total()
238    }
239
240    /// Get total completed across all streams
241    pub fn total_completed(&self) -> u32 {
242        self.activities.get_completed()
243            + self.gpx.get_completed()
244            + self.health.get_completed()
245            + self.performance.get_completed()
246    }
247
248    /// Get total failed across all streams
249    pub fn total_failed(&self) -> u32 {
250        self.activities.get_failed()
251            + self.gpx.get_failed()
252            + self.health.get_failed()
253            + self.performance.get_failed()
254    }
255
256    /// Check if all streams are complete
257    pub fn is_complete(&self) -> bool {
258        (self.activities.get_total() == 0 || self.activities.is_complete())
259            && (self.gpx.get_total() == 0 || self.gpx.is_complete())
260            && (self.health.get_total() == 0 || self.health.is_complete())
261            && (self.performance.get_total() == 0 || self.performance.is_complete())
262    }
263
264    /// Print simple status line (for --simple mode)
265    pub fn print_simple_status(&self) {
266        let act = &self.activities;
267        let gpx = &self.gpx;
268        let health = &self.health;
269        let perf = &self.performance;
270
271        print!(
272            "\rAct: {}/{} | GPX: {}/{} | Health: {}/{} | Perf: {}/{} | {} ",
273            act.get_completed(),
274            act.get_total(),
275            gpx.get_completed(),
276            gpx.get_total(),
277            health.get_completed(),
278            health.get_total(),
279            perf.get_completed(),
280            perf.get_total(),
281            self.elapsed_str(),
282        );
283        let _ = std::io::Write::flush(&mut std::io::stdout());
284    }
285}
286
287impl Default for SyncProgress {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
293/// Shared progress wrapped in Arc for parallel access
294pub type SharedProgress = Arc<SyncProgress>;
295
296#[cfg(test)]
297mod tests {
298    use super::*;
299
300    #[test]
301    fn test_stream_progress() {
302        let progress = StreamProgress::new("Test");
303
304        progress.set_total(100);
305        assert_eq!(progress.get_total(), 100);
306        assert_eq!(progress.percent(), 0);
307
308        progress.complete_one();
309        assert_eq!(progress.get_completed(), 1);
310        assert_eq!(progress.percent(), 1);
311
312        for _ in 0..49 {
313            progress.complete_one();
314        }
315        assert_eq!(progress.percent(), 50);
316    }
317
318    #[test]
319    fn test_sync_progress() {
320        let progress = SyncProgress::new();
321
322        progress.activities.set_total(10);
323        progress.health.set_total(20);
324
325        assert_eq!(progress.total_remaining(), 30);
326        assert_eq!(progress.total_completed(), 0);
327
328        progress.activities.complete_one();
329        progress.health.complete_one();
330
331        assert_eq!(progress.total_completed(), 2);
332    }
333}