Skip to main content

common/
progress.rs

1use tracing::instrument;
2
/// Number of shards for the counter. More shards reduce contention but increase memory.
/// 64 shards × 128 bytes = 8KB per counter, which virtually eliminates contention.
///
/// Threads are mapped onto shards modulo this value, so once more than
/// `NUM_SHARDS` threads have touched a counter some of them share a shard.
const NUM_SHARDS: usize = 64;
6
/// Atomic counter padded to cache line size to prevent false sharing.
/// Each shard lives on its own cache line so concurrent updates from different
/// threads don't cause cache invalidation.
/// Uses 128B alignment to support both x86-64 (64B) and ARM (128B) cache lines.
///
/// Because a type's size is always a multiple of its alignment, consecutive
/// elements of an array of this type start 128 bytes apart.
#[repr(align(128))]
struct PaddedAtomicU64(std::sync::atomic::AtomicU64);
13
/// Global counter for assigning shard indices to threads.
/// Each thread gets a unique index (mod NUM_SHARDS) on first access.
///
/// The value only ever grows; it is reduced modulo `NUM_SHARDS` at the point
/// where a thread initialises its `MY_SHARD` slot.
static NEXT_SHARD_INDEX: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
17
thread_local! {
    /// Per-thread shard index, assigned once on first access.
    /// Uses modulo to wrap around when more threads than shards.
    ///
    /// The index is shared by every `TlsCounter`: a given thread always
    /// updates the same shard position regardless of which counter it touches.
    static MY_SHARD: usize =
        NEXT_SHARD_INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % NUM_SHARDS;
}
24
/// Sharded atomic counter optimized for concurrent access from multiple threads.
///
/// Uses cache-line-padded shards to prevent false sharing. Each thread is assigned
/// a shard index, so updates from different threads typically hit different cache lines.
///
/// This design handles interleaved access to multiple counters efficiently - unlike
/// a single-slot cache approach, there's no "cache thrashing" when alternating between
/// counters.
///
/// The logical value of the counter is the sum of all shards; see `get`.
///
/// # Memory
///
/// Each counter uses NUM_SHARDS × 128 bytes = 8KB (with 64 shards).
/// This is larger than a simple AtomicU64 but virtually eliminates contention.
pub struct TlsCounter {
    /// One padded atomic per shard, indexed by the calling thread's `MY_SHARD`.
    shards: [PaddedAtomicU64; NUM_SHARDS],
}
41
42impl TlsCounter {
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            shards: std::array::from_fn(|_| PaddedAtomicU64(std::sync::atomic::AtomicU64::new(0))),
47        }
48    }
49
50    pub fn add(&self, value: u64) {
51        let shard = MY_SHARD.with(|&s| s);
52        self.shards[shard]
53            .0
54            .fetch_add(value, std::sync::atomic::Ordering::Relaxed);
55    }
56
57    pub fn inc(&self) {
58        self.add(1);
59    }
60
61    pub fn get(&self) -> u64 {
62        self.shards
63            .iter()
64            .map(|s| s.0.load(std::sync::atomic::Ordering::Relaxed))
65            .sum()
66    }
67}
68
69impl std::fmt::Debug for TlsCounter {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("TlsCounter")
72            .field("value", &self.get())
73            .finish()
74    }
75}
76
77impl Default for TlsCounter {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
/// Pair of sharded counters tracking how many operations were started and how
/// many have finished.
///
/// `started` is incremented when a [`ProgressGuard`] is created and `finished`
/// when that guard is dropped.
#[derive(Debug)]
pub struct ProgressCounter {
    /// Number of guards created so far.
    started: TlsCounter,
    /// Number of guards dropped so far.
    finished: TlsCounter,
}
88
89impl Default for ProgressCounter {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95pub struct ProgressGuard<'a> {
96    progress: &'a ProgressCounter,
97}
98
99impl<'a> ProgressGuard<'a> {
100    pub fn new(progress: &'a ProgressCounter) -> Self {
101        progress.started.inc();
102        Self { progress }
103    }
104}
105
106impl Drop for ProgressGuard<'_> {
107    fn drop(&mut self) {
108        self.progress.finished.inc();
109    }
110}
111
/// Point-in-time reading of a [`ProgressCounter`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Status {
    /// Number of operations started so far.
    pub started: u64,
    /// Number of operations finished so far.
    pub finished: u64,
}
116
117impl ProgressCounter {
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            started: TlsCounter::new(),
122            finished: TlsCounter::new(),
123        }
124    }
125
126    pub fn guard(&self) -> ProgressGuard<'_> {
127        ProgressGuard::new(self)
128    }
129
130    #[instrument]
131    pub fn get(&self) -> Status {
132        let mut status = Status {
133            started: self.started.get(),
134            finished: self.finished.get(),
135        };
136        if status.finished > status.started {
137            tracing::debug!(
138                "Progress inversion - started: {}, finished {}",
139                status.started,
140                status.finished
141            );
142            status.started = status.finished;
143        }
144        status
145    }
146}
147
/// Collection of counters read by the progress printers in this file, plus the
/// instant at which the collection was created.
pub struct Progress {
    /// Started/finished operation counts; drives the OPS section of the printers.
    pub ops: ProgressCounter,
    // "Created" counters: printed under COPIED / LINKED / GENERATED.
    pub bytes_copied: TlsCounter,
    pub hard_links_created: TlsCounter,
    pub files_copied: TlsCounter,
    pub symlinks_created: TlsCounter,
    pub directories_created: TlsCounter,
    // Printed under UNCHANGED.
    pub files_unchanged: TlsCounter,
    pub symlinks_unchanged: TlsCounter,
    pub directories_unchanged: TlsCounter,
    pub hard_links_unchanged: TlsCounter,
    // Printed under REMOVED.
    pub files_removed: TlsCounter,
    pub symlinks_removed: TlsCounter,
    pub directories_removed: TlsCounter,
    pub bytes_removed: TlsCounter,
    // Printed under SKIPPED.
    pub files_skipped: TlsCounter,
    pub symlinks_skipped: TlsCounter,
    pub directories_skipped: TlsCounter,
    pub specials_skipped: TlsCounter,
    /// Captured in `new`; `get_duration` reports the time elapsed since then.
    start_time: std::time::Instant,
}
169
170impl Progress {
171    #[must_use]
172    pub fn new() -> Self {
173        Self {
174            ops: Default::default(),
175            bytes_copied: Default::default(),
176            hard_links_created: Default::default(),
177            files_copied: Default::default(),
178            symlinks_created: Default::default(),
179            directories_created: Default::default(),
180            files_unchanged: Default::default(),
181            symlinks_unchanged: Default::default(),
182            directories_unchanged: Default::default(),
183            hard_links_unchanged: Default::default(),
184            files_removed: Default::default(),
185            symlinks_removed: Default::default(),
186            directories_removed: Default::default(),
187            bytes_removed: Default::default(),
188            files_skipped: Default::default(),
189            symlinks_skipped: Default::default(),
190            directories_skipped: Default::default(),
191            specials_skipped: Default::default(),
192            start_time: std::time::Instant::now(),
193        }
194    }
195
196    pub fn get_duration(&self) -> std::time::Duration {
197        self.start_time.elapsed()
198    }
199}
200
201impl Default for Progress {
202    fn default() -> Self {
203        Self::new()
204    }
205}
206
/// Plain-value snapshot of a [`Progress`] that can be serialized and
/// deserialized with serde.
///
/// Every counter of `Progress` appears here as a `u64` under the same name; the
/// `ops` counter is split into `ops_started` and `ops_finished`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SerializableProgress {
    pub ops_started: u64,
    pub ops_finished: u64,
    pub bytes_copied: u64,
    pub hard_links_created: u64,
    pub files_copied: u64,
    pub symlinks_created: u64,
    pub directories_created: u64,
    pub files_unchanged: u64,
    pub symlinks_unchanged: u64,
    pub directories_unchanged: u64,
    pub hard_links_unchanged: u64,
    pub files_removed: u64,
    pub symlinks_removed: u64,
    pub directories_removed: u64,
    pub bytes_removed: u64,
    pub files_skipped: u64,
    pub symlinks_skipped: u64,
    pub directories_skipped: u64,
    pub specials_skipped: u64,
    /// Wall-clock time at which this value was constructed (both `Default` and
    /// `From<&Progress>` fill it with `SystemTime::now()`).
    pub current_time: std::time::SystemTime,
}
230
231impl Default for SerializableProgress {
232    fn default() -> Self {
233        Self {
234            ops_started: 0,
235            ops_finished: 0,
236            bytes_copied: 0,
237            hard_links_created: 0,
238            files_copied: 0,
239            symlinks_created: 0,
240            directories_created: 0,
241            files_unchanged: 0,
242            symlinks_unchanged: 0,
243            directories_unchanged: 0,
244            hard_links_unchanged: 0,
245            files_removed: 0,
246            symlinks_removed: 0,
247            directories_removed: 0,
248            bytes_removed: 0,
249            files_skipped: 0,
250            symlinks_skipped: 0,
251            directories_skipped: 0,
252            specials_skipped: 0,
253            current_time: std::time::SystemTime::now(),
254        }
255    }
256}
257
258impl From<&Progress> for SerializableProgress {
259    /// Creates a `SerializableProgress` from a Progress, capturing the current time at the moment of conversion
260    fn from(progress: &Progress) -> Self {
261        Self {
262            ops_started: progress.ops.started.get(),
263            ops_finished: progress.ops.finished.get(),
264            bytes_copied: progress.bytes_copied.get(),
265            hard_links_created: progress.hard_links_created.get(),
266            files_copied: progress.files_copied.get(),
267            symlinks_created: progress.symlinks_created.get(),
268            directories_created: progress.directories_created.get(),
269            files_unchanged: progress.files_unchanged.get(),
270            symlinks_unchanged: progress.symlinks_unchanged.get(),
271            directories_unchanged: progress.directories_unchanged.get(),
272            hard_links_unchanged: progress.hard_links_unchanged.get(),
273            files_removed: progress.files_removed.get(),
274            symlinks_removed: progress.symlinks_removed.get(),
275            directories_removed: progress.directories_removed.get(),
276            bytes_removed: progress.bytes_removed.get(),
277            files_skipped: progress.files_skipped.get(),
278            symlinks_skipped: progress.symlinks_skipped.get(),
279            directories_skipped: progress.directories_skipped.get(),
280            specials_skipped: progress.specials_skipped.get(),
281            current_time: std::time::SystemTime::now(),
282        }
283    }
284}
285
/// Trait implemented by per-tool progress printers.
///
/// Each tool has its own printer that only renders sections relevant to what
/// the tool actually does (e.g. `rrm` has no COPIED section; `rcmp` only shows
/// OPS). Prefer adding a new specialized printer over extending a shared one.
pub trait LocalProgressReport: Send {
    /// Renders the current progress as a multi-line text report.
    ///
    /// Takes `&mut self` because the printers in this file remember the
    /// counter values and timestamp of the previous call in order to compute
    /// the "current" rates for the interval since then.
    fn print(&mut self) -> anyhow::Result<String>;
}
294
/// Selects which tool-specific printer to instantiate for a local progress run.
///
/// See `make_local_printer` for the mapping from variant to printer type.
#[derive(Copy, Clone, Debug)]
pub enum LocalProgressKind {
    /// Selects `CopyProgressPrinter` (`rcp`).
    Copy,
    /// Selects `RemoveProgressPrinter` (`rrm`).
    Remove,
    /// Selects `LinkProgressPrinter` (`rlink`).
    Link,
    /// Selects `CompareProgressPrinter` (`rcmp`).
    Compare,
    /// Selects `FilegenProgressPrinter` (`filegen`).
    Filegen,
}
304
305fn ops_rates(
306    progress: &Progress,
307    last_ops: u64,
308    last_update: std::time::Instant,
309    now: std::time::Instant,
310) -> (Status, f64, f64) {
311    let ops = progress.ops.get();
312    let total_secs = progress.get_duration().as_secs_f64();
313    let curr_secs = (now - last_update).as_secs_f64();
314    let average = if total_secs > 0.0 {
315        ops.finished as f64 / total_secs
316    } else {
317        0.0
318    };
319    let current = if curr_secs > 0.0 {
320        (ops.finished - last_ops) as f64 / curr_secs
321    } else {
322        0.0
323    };
324    (ops, average, current)
325}
326
327fn bytes_rates(
328    total: u64,
329    last: u64,
330    total_secs: f64,
331    curr_secs: f64,
332) -> (bytesize::ByteSize, bytesize::ByteSize) {
333    let average = if total_secs > 0.0 {
334        total as f64 / total_secs
335    } else {
336        0.0
337    };
338    let current = if curr_secs > 0.0 {
339        (total - last) as f64 / curr_secs
340    } else {
341        0.0
342    };
343    (
344        bytesize::ByteSize(average as u64),
345        bytesize::ByteSize(current as u64),
346    )
347}
348
/// Progress printer for `rcp` (local copy). Shows COPIED/UNCHANGED/REMOVED/SKIPPED
/// for files, symlinks, and directories. Hard-link counters are omitted because
/// `rcp` never creates hard links.
pub struct CopyProgressPrinter<'a> {
    /// Counters this printer reads from.
    progress: &'a Progress,
    /// Finished-operations count seen at construction or by the previous `print`.
    last_ops: u64,
    /// `bytes_copied` value seen at construction or by the previous `print`.
    last_bytes: u64,
    /// Instant of construction or of the previous `print`.
    last_update: std::time::Instant,
}
358
359impl<'a> CopyProgressPrinter<'a> {
360    pub fn new(progress: &'a Progress) -> Self {
361        Self {
362            progress,
363            last_ops: progress.ops.get().finished,
364            last_bytes: progress.bytes_copied.get(),
365            last_update: std::time::Instant::now(),
366        }
367    }
368}
369
370impl<'a> LocalProgressReport for CopyProgressPrinter<'a> {
371    fn print(&mut self) -> anyhow::Result<String> {
372        let now = std::time::Instant::now();
373        let (ops, avg_ops, cur_ops) =
374            ops_rates(self.progress, self.last_ops, self.last_update, now);
375        let bytes = self.progress.bytes_copied.get();
376        let total_secs = self.progress.get_duration().as_secs_f64();
377        let curr_secs = (now - self.last_update).as_secs_f64();
378        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
379        self.last_ops = ops.finished;
380        self.last_bytes = bytes;
381        self.last_update = now;
382        Ok(format!(
383            "---------------------\n\
384            OPS:\n\
385            pending: {:>10}\n\
386            average: {:>10.2} items/s\n\
387            current: {:>10.2} items/s\n\
388            -----------------------\n\
389            COPIED:\n\
390            average: {:>10}/s\n\
391            current: {:>10}/s\n\
392            bytes:   {:>10}\n\
393            files:       {:>10}\n\
394            symlinks:    {:>10}\n\
395            directories: {:>10}\n\
396            -----------------------\n\
397            UNCHANGED:\n\
398            files:       {:>10}\n\
399            symlinks:    {:>10}\n\
400            directories: {:>10}\n\
401            -----------------------\n\
402            REMOVED:\n\
403            bytes:       {:>10}\n\
404            files:       {:>10}\n\
405            symlinks:    {:>10}\n\
406            directories: {:>10}\n\
407            -----------------------\n\
408            SKIPPED:\n\
409            files:       {:>10}\n\
410            symlinks:    {:>10}\n\
411            directories: {:>10}\n\
412            specials:    {:>10}",
413            ops.started - ops.finished,
414            avg_ops,
415            cur_ops,
416            avg_bytes,
417            cur_bytes,
418            bytesize::ByteSize(bytes),
419            self.progress.files_copied.get(),
420            self.progress.symlinks_created.get(),
421            self.progress.directories_created.get(),
422            self.progress.files_unchanged.get(),
423            self.progress.symlinks_unchanged.get(),
424            self.progress.directories_unchanged.get(),
425            bytesize::ByteSize(self.progress.bytes_removed.get()),
426            self.progress.files_removed.get(),
427            self.progress.symlinks_removed.get(),
428            self.progress.directories_removed.get(),
429            self.progress.files_skipped.get(),
430            self.progress.symlinks_skipped.get(),
431            self.progress.directories_skipped.get(),
432            self.progress.specials_skipped.get(),
433        ))
434    }
435}
436
/// Progress printer for `rrm`. Only shows REMOVED and SKIPPED — `rrm` never
/// creates, copies, or leaves entries unchanged.
pub struct RemoveProgressPrinter<'a> {
    /// Counters this printer reads from.
    progress: &'a Progress,
    /// Finished-operations count seen at construction or by the previous `print`.
    last_ops: u64,
    /// `bytes_removed` value seen at construction or by the previous `print`.
    last_bytes_removed: u64,
    /// Instant of construction or of the previous `print`.
    last_update: std::time::Instant,
}
445
446impl<'a> RemoveProgressPrinter<'a> {
447    pub fn new(progress: &'a Progress) -> Self {
448        Self {
449            progress,
450            last_ops: progress.ops.get().finished,
451            last_bytes_removed: progress.bytes_removed.get(),
452            last_update: std::time::Instant::now(),
453        }
454    }
455}
456
457impl<'a> LocalProgressReport for RemoveProgressPrinter<'a> {
458    fn print(&mut self) -> anyhow::Result<String> {
459        let now = std::time::Instant::now();
460        let (ops, avg_ops, cur_ops) =
461            ops_rates(self.progress, self.last_ops, self.last_update, now);
462        let bytes_removed = self.progress.bytes_removed.get();
463        let total_secs = self.progress.get_duration().as_secs_f64();
464        let curr_secs = (now - self.last_update).as_secs_f64();
465        let (avg_bytes, cur_bytes) = bytes_rates(
466            bytes_removed,
467            self.last_bytes_removed,
468            total_secs,
469            curr_secs,
470        );
471        self.last_ops = ops.finished;
472        self.last_bytes_removed = bytes_removed;
473        self.last_update = now;
474        Ok(format!(
475            "---------------------\n\
476            OPS:\n\
477            pending: {:>10}\n\
478            average: {:>10.2} items/s\n\
479            current: {:>10.2} items/s\n\
480            -----------------------\n\
481            REMOVED:\n\
482            average: {:>10}/s\n\
483            current: {:>10}/s\n\
484            bytes:       {:>10}\n\
485            files:       {:>10}\n\
486            symlinks:    {:>10}\n\
487            directories: {:>10}\n\
488            -----------------------\n\
489            SKIPPED:\n\
490            files:       {:>10}\n\
491            symlinks:    {:>10}\n\
492            directories: {:>10}",
493            ops.started - ops.finished,
494            avg_ops,
495            cur_ops,
496            avg_bytes,
497            cur_bytes,
498            bytesize::ByteSize(bytes_removed),
499            self.progress.files_removed.get(),
500            self.progress.symlinks_removed.get(),
501            self.progress.directories_removed.get(),
502            self.progress.files_skipped.get(),
503            self.progress.symlinks_skipped.get(),
504            self.progress.directories_skipped.get(),
505        ))
506    }
507}
508
/// Progress printer for `rlink`. Like [`CopyProgressPrinter`] but also reports
/// hard-link counters, since `rlink`'s primary output is hard links (with copies
/// as a fallback for `--update` changes).
pub struct LinkProgressPrinter<'a> {
    /// Counters this printer reads from.
    progress: &'a Progress,
    /// Finished-operations count seen at construction or by the previous `print`.
    last_ops: u64,
    /// `bytes_copied` value seen at construction or by the previous `print`.
    last_bytes: u64,
    /// Instant of construction or of the previous `print`.
    last_update: std::time::Instant,
}
518
519impl<'a> LinkProgressPrinter<'a> {
520    pub fn new(progress: &'a Progress) -> Self {
521        Self {
522            progress,
523            last_ops: progress.ops.get().finished,
524            last_bytes: progress.bytes_copied.get(),
525            last_update: std::time::Instant::now(),
526        }
527    }
528}
529
530impl<'a> LocalProgressReport for LinkProgressPrinter<'a> {
531    fn print(&mut self) -> anyhow::Result<String> {
532        let now = std::time::Instant::now();
533        let (ops, avg_ops, cur_ops) =
534            ops_rates(self.progress, self.last_ops, self.last_update, now);
535        let bytes = self.progress.bytes_copied.get();
536        let total_secs = self.progress.get_duration().as_secs_f64();
537        let curr_secs = (now - self.last_update).as_secs_f64();
538        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
539        self.last_ops = ops.finished;
540        self.last_bytes = bytes;
541        self.last_update = now;
542        Ok(format!(
543            "---------------------\n\
544            OPS:\n\
545            pending: {:>10}\n\
546            average: {:>10.2} items/s\n\
547            current: {:>10.2} items/s\n\
548            -----------------------\n\
549            LINKED / COPIED:\n\
550            average: {:>10}/s\n\
551            current: {:>10}/s\n\
552            bytes:   {:>10}\n\
553            hard-links:  {:>10}\n\
554            files:       {:>10}\n\
555            symlinks:    {:>10}\n\
556            directories: {:>10}\n\
557            -----------------------\n\
558            UNCHANGED:\n\
559            hard-links:  {:>10}\n\
560            files:       {:>10}\n\
561            symlinks:    {:>10}\n\
562            directories: {:>10}\n\
563            -----------------------\n\
564            REMOVED:\n\
565            bytes:       {:>10}\n\
566            files:       {:>10}\n\
567            symlinks:    {:>10}\n\
568            directories: {:>10}\n\
569            -----------------------\n\
570            SKIPPED:\n\
571            files:       {:>10}\n\
572            symlinks:    {:>10}\n\
573            directories: {:>10}\n\
574            specials:    {:>10}",
575            ops.started - ops.finished,
576            avg_ops,
577            cur_ops,
578            avg_bytes,
579            cur_bytes,
580            bytesize::ByteSize(bytes),
581            self.progress.hard_links_created.get(),
582            self.progress.files_copied.get(),
583            self.progress.symlinks_created.get(),
584            self.progress.directories_created.get(),
585            self.progress.hard_links_unchanged.get(),
586            self.progress.files_unchanged.get(),
587            self.progress.symlinks_unchanged.get(),
588            self.progress.directories_unchanged.get(),
589            bytesize::ByteSize(self.progress.bytes_removed.get()),
590            self.progress.files_removed.get(),
591            self.progress.symlinks_removed.get(),
592            self.progress.directories_removed.get(),
593            self.progress.files_skipped.get(),
594            self.progress.symlinks_skipped.get(),
595            self.progress.directories_skipped.get(),
596            self.progress.specials_skipped.get(),
597        ))
598    }
599}
600
/// Progress printer for `rcmp`. Compare operations only drive the ops counter —
/// they don't copy, remove, or modify anything — so only OPS is shown.
pub struct CompareProgressPrinter<'a> {
    /// Counters this printer reads from (only `ops` is used).
    progress: &'a Progress,
    /// Finished-operations count seen at construction or by the previous `print`.
    last_ops: u64,
    /// Instant of construction or of the previous `print`.
    last_update: std::time::Instant,
}
608
609impl<'a> CompareProgressPrinter<'a> {
610    pub fn new(progress: &'a Progress) -> Self {
611        Self {
612            progress,
613            last_ops: progress.ops.get().finished,
614            last_update: std::time::Instant::now(),
615        }
616    }
617}
618
619impl<'a> LocalProgressReport for CompareProgressPrinter<'a> {
620    fn print(&mut self) -> anyhow::Result<String> {
621        let now = std::time::Instant::now();
622        let (ops, avg_ops, cur_ops) =
623            ops_rates(self.progress, self.last_ops, self.last_update, now);
624        self.last_ops = ops.finished;
625        self.last_update = now;
626        Ok(format!(
627            "---------------------\n\
628            OPS:\n\
629            pending: {:>10}\n\
630            compared: {:>9}\n\
631            average: {:>10.2} items/s\n\
632            current: {:>10.2} items/s",
633            ops.started - ops.finished,
634            ops.finished,
635            avg_ops,
636            cur_ops,
637        ))
638    }
639}
640
/// Progress printer for `filegen`. Shows a GENERATED section instead of COPIED,
/// because `filegen` creates files and directories rather than copying them.
pub struct FilegenProgressPrinter<'a> {
    /// Counters this printer reads from.
    progress: &'a Progress,
    /// Finished-operations count seen at construction or by the previous `print`.
    last_ops: u64,
    /// `bytes_copied` value seen at construction or by the previous `print`.
    last_bytes: u64,
    /// Instant of construction or of the previous `print`.
    last_update: std::time::Instant,
}
649
650impl<'a> FilegenProgressPrinter<'a> {
651    pub fn new(progress: &'a Progress) -> Self {
652        Self {
653            progress,
654            last_ops: progress.ops.get().finished,
655            last_bytes: progress.bytes_copied.get(),
656            last_update: std::time::Instant::now(),
657        }
658    }
659}
660
661impl<'a> LocalProgressReport for FilegenProgressPrinter<'a> {
662    fn print(&mut self) -> anyhow::Result<String> {
663        let now = std::time::Instant::now();
664        let (ops, avg_ops, cur_ops) =
665            ops_rates(self.progress, self.last_ops, self.last_update, now);
666        let bytes = self.progress.bytes_copied.get();
667        let total_secs = self.progress.get_duration().as_secs_f64();
668        let curr_secs = (now - self.last_update).as_secs_f64();
669        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
670        self.last_ops = ops.finished;
671        self.last_bytes = bytes;
672        self.last_update = now;
673        Ok(format!(
674            "---------------------\n\
675            OPS:\n\
676            pending: {:>10}\n\
677            average: {:>10.2} items/s\n\
678            current: {:>10.2} items/s\n\
679            -----------------------\n\
680            GENERATED:\n\
681            average: {:>10}/s\n\
682            current: {:>10}/s\n\
683            bytes:       {:>10}\n\
684            files:       {:>10}\n\
685            directories: {:>10}",
686            ops.started - ops.finished,
687            avg_ops,
688            cur_ops,
689            avg_bytes,
690            cur_bytes,
691            bytesize::ByteSize(bytes),
692            self.progress.files_copied.get(),
693            self.progress.directories_created.get(),
694        ))
695    }
696}
697
698/// Construct the tool-specific printer for the given [`LocalProgressKind`],
699/// borrowing the global [`Progress`] for its lifetime.
700#[must_use]
701pub fn make_local_printer<'a>(
702    kind: LocalProgressKind,
703    progress: &'a Progress,
704) -> Box<dyn LocalProgressReport + 'a> {
705    match kind {
706        LocalProgressKind::Copy => Box::new(CopyProgressPrinter::new(progress)),
707        LocalProgressKind::Remove => Box::new(RemoveProgressPrinter::new(progress)),
708        LocalProgressKind::Link => Box::new(LinkProgressPrinter::new(progress)),
709        LocalProgressKind::Compare => Box::new(CompareProgressPrinter::new(progress)),
710        LocalProgressKind::Filegen => Box::new(FilegenProgressPrinter::new(progress)),
711    }
712}
713
/// Printer that renders a combined SOURCE / DESTINATION report from two
/// [`SerializableProgress`] snapshots passed to `print`.
///
/// Unlike the local printers it does not borrow a `Progress`; it keeps its own
/// start time and the values from the previous call to derive rates.
pub struct RcpdProgressPrinter {
    /// Instant at which the printer was created; base for the average rates.
    start_time: std::time::Instant,
    /// Source `ops_finished` from the previous `print` call (initially 0).
    last_source_ops: u64,
    /// Source `bytes_copied` from the previous `print` call (initially 0).
    last_source_bytes: u64,
    /// Source `files_copied` from the previous `print` call (initially 0).
    last_source_files: u64,
    /// Destination `ops_finished` from the previous `print` call (initially 0).
    last_dest_ops: u64,
    /// Destination `bytes_copied` from the previous `print` call (initially 0).
    last_dest_bytes: u64,
    /// Instant of creation or of the previous `print` call.
    last_update: std::time::Instant,
}
723
724impl RcpdProgressPrinter {
725    #[must_use]
726    pub fn new() -> Self {
727        let now = std::time::Instant::now();
728        Self {
729            start_time: now,
730            last_source_ops: 0,
731            last_source_bytes: 0,
732            last_source_files: 0,
733            last_dest_ops: 0,
734            last_dest_bytes: 0,
735            last_update: now,
736        }
737    }
738
739    fn calculate_current_rate(&self, current: u64, last: u64, duration_secs: f64) -> f64 {
740        if duration_secs > 0.0 {
741            (current - last) as f64 / duration_secs
742        } else {
743            0.0
744        }
745    }
746
747    fn calculate_average_rate(&self, total: u64, total_duration_secs: f64) -> f64 {
748        if total_duration_secs > 0.0 {
749            total as f64 / total_duration_secs
750        } else {
751            0.0
752        }
753    }
754
755    pub fn print(
756        &mut self,
757        source_progress: &SerializableProgress,
758        dest_progress: &SerializableProgress,
759    ) -> anyhow::Result<String> {
760        let time_now = std::time::Instant::now();
761        let total_duration_secs = (time_now - self.start_time).as_secs_f64();
762        let curr_duration_secs = (time_now - self.last_update).as_secs_f64();
763        // source current rates
764        let source_ops_rate_curr = self.calculate_current_rate(
765            source_progress.ops_finished,
766            self.last_source_ops,
767            curr_duration_secs,
768        );
769        let source_bytes_rate_curr = self.calculate_current_rate(
770            source_progress.bytes_copied,
771            self.last_source_bytes,
772            curr_duration_secs,
773        );
774        let source_files_rate_curr = self.calculate_current_rate(
775            source_progress.files_copied,
776            self.last_source_files,
777            curr_duration_secs,
778        );
779        // source average rates
780        let source_ops_rate_avg =
781            self.calculate_average_rate(source_progress.ops_finished, total_duration_secs);
782        let source_bytes_rate_avg =
783            self.calculate_average_rate(source_progress.bytes_copied, total_duration_secs);
784        let source_files_rate_avg =
785            self.calculate_average_rate(source_progress.files_copied, total_duration_secs);
786        // destination current rates
787        let dest_ops_rate_curr = self.calculate_current_rate(
788            dest_progress.ops_finished,
789            self.last_dest_ops,
790            curr_duration_secs,
791        );
792        let dest_bytes_rate_curr = self.calculate_current_rate(
793            dest_progress.bytes_copied,
794            self.last_dest_bytes,
795            curr_duration_secs,
796        );
797        // destination average rates
798        let dest_ops_rate_avg =
799            self.calculate_average_rate(dest_progress.ops_finished, total_duration_secs);
800        let dest_bytes_rate_avg =
801            self.calculate_average_rate(dest_progress.bytes_copied, total_duration_secs);
802        // update last values
803        self.last_source_ops = source_progress.ops_finished;
804        self.last_source_bytes = source_progress.bytes_copied;
805        self.last_source_files = source_progress.files_copied;
806        self.last_dest_ops = dest_progress.ops_finished;
807        self.last_dest_bytes = dest_progress.bytes_copied;
808        self.last_update = time_now;
809        Ok(format!(
810            "==== SOURCE =======\n\
811            OPS:\n\
812            pending: {:>10}\n\
813            average: {:>10.2} items/s\n\
814            current: {:>10.2} items/s\n\
815            ---------------------\n\
816            COPIED:\n\
817            average: {:>10}/s\n\
818            current: {:>10}/s\n\
819            bytes:   {:>10}\n\
820            files:       {:>10}\n\
821            ---------------------\n\
822            FILES:\n\
823            average: {:>10.2} files/s\n\
824            current: {:>10.2} files/s\n\
825            ---------------------\n\
826            SKIPPED:\n\
827            files:       {:>10}\n\
828            symlinks:    {:>10}\n\
829            directories: {:>10}\n\
830            specials:    {:>10}\n\
831            ==== DESTINATION ====\n\
832            OPS:\n\
833            pending: {:>10}\n\
834            average: {:>10.2} items/s\n\
835            current: {:>10.2} items/s\n\
836            ---------------------\n\
837            COPIED:\n\
838            average: {:>10}/s\n\
839            current: {:>10}/s\n\
840            bytes:   {:>10}\n\
841            files:       {:>10}\n\
842            symlinks:    {:>10}\n\
843            directories: {:>10}\n\
844            hard-links:  {:>10}\n\
845            ---------------------\n\
846            UNCHANGED:\n\
847            files:       {:>10}\n\
848            symlinks:    {:>10}\n\
849            directories: {:>10}\n\
850            hard-links:  {:>10}\n\
851            ---------------------\n\
852            REMOVED:\n\
853            bytes:       {:>10}\n\
854            files:       {:>10}\n\
855            symlinks:    {:>10}\n\
856            directories: {:>10}",
857            // source section
858            source_progress.ops_started - source_progress.ops_finished, // pending
859            source_ops_rate_avg,
860            source_ops_rate_curr,
861            bytesize::ByteSize(source_bytes_rate_avg as u64),
862            bytesize::ByteSize(source_bytes_rate_curr as u64),
863            bytesize::ByteSize(source_progress.bytes_copied),
864            source_progress.files_copied,
865            source_files_rate_avg,
866            source_files_rate_curr,
867            // source skipped
868            source_progress.files_skipped,
869            source_progress.symlinks_skipped,
870            source_progress.directories_skipped,
871            source_progress.specials_skipped,
872            // destination section
873            dest_progress.ops_started - dest_progress.ops_finished, // pending
874            dest_ops_rate_avg,
875            dest_ops_rate_curr,
876            bytesize::ByteSize(dest_bytes_rate_avg as u64),
877            bytesize::ByteSize(dest_bytes_rate_curr as u64),
878            bytesize::ByteSize(dest_progress.bytes_copied),
879            // destination detailed stats
880            dest_progress.files_copied,
881            dest_progress.symlinks_created,
882            dest_progress.directories_created,
883            dest_progress.hard_links_created,
884            // unchanged
885            dest_progress.files_unchanged,
886            dest_progress.symlinks_unchanged,
887            dest_progress.directories_unchanged,
888            dest_progress.hard_links_unchanged,
889            // removed
890            bytesize::ByteSize(dest_progress.bytes_removed),
891            dest_progress.files_removed,
892            dest_progress.symlinks_removed,
893            dest_progress.directories_removed,
894        ))
895    }
896}
897
898impl Default for RcpdProgressPrinter {
899    fn default() -> Self {
900        Self::new()
901    }
902}
903
904#[cfg(test)]
905mod tests {
906    use super::*;
907    use crate::remote_tracing::TracingMessage;
908    use anyhow::Result;
909
910    #[test]
911    fn basic_counting() -> Result<()> {
912        let tls_counter = TlsCounter::new();
913        for _ in 0..10 {
914            tls_counter.inc();
915        }
916        assert!(tls_counter.get() == 10);
917        Ok(())
918    }
919
920    #[test]
921    fn threaded_counting() -> Result<()> {
922        let tls_counter = TlsCounter::new();
923        std::thread::scope(|scope| {
924            let mut handles = Vec::new();
925            for _ in 0..10 {
926                handles.push(scope.spawn(|| {
927                    for _ in 0..100 {
928                        tls_counter.inc();
929                    }
930                }));
931            }
932        });
933        assert!(tls_counter.get() == 1000);
934        Ok(())
935    }
936
937    #[test]
938    fn basic_guard() -> Result<()> {
939        let tls_progress = ProgressCounter::new();
940        let _guard = tls_progress.guard();
941        Ok(())
942    }
943
944    #[test]
945    fn test_serializable_progress() -> Result<()> {
946        let progress = Progress::new();
947
948        // Add some test data
949        progress.files_copied.inc();
950        progress.bytes_copied.add(1024);
951        progress.directories_created.add(2);
952
953        // Test conversion to serializable format
954        let serializable = SerializableProgress::from(&progress);
955        assert_eq!(serializable.files_copied, 1);
956        assert_eq!(serializable.bytes_copied, 1024);
957        assert_eq!(serializable.directories_created, 2);
958
959        // Test that we can create a TracingMessage with progress
960        let _tracing_msg = TracingMessage::Progress(serializable);
961
962        Ok(())
963    }
964
965    #[test]
966    fn test_rcpd_progress_printer() -> Result<()> {
967        let mut printer = RcpdProgressPrinter::new();
968
969        // Create test progress data
970        let source_progress = SerializableProgress {
971            ops_started: 100,
972            ops_finished: 80,
973            bytes_copied: 1024,
974            files_copied: 5,
975            files_skipped: 3,
976            symlinks_skipped: 1,
977            directories_skipped: 2,
978            ..Default::default()
979        };
980
981        let dest_progress = SerializableProgress {
982            ops_started: 80,
983            ops_finished: 70,
984            bytes_copied: 1024,
985            files_copied: 8,
986            symlinks_created: 2,
987            directories_created: 1,
988            ..Default::default()
989        };
990
991        // Test that print returns a formatted string
992        let output = printer.print(&source_progress, &dest_progress)?;
993        assert!(output.contains("SOURCE"));
994        assert!(output.contains("DESTINATION"));
995        assert!(output.contains("OPS:"));
996        assert!(output.contains("pending:"));
997        assert!(output.contains("20")); // source pending ops (100-80)
998        assert!(output.contains("10")); // dest pending ops (80-70)
999        let mut sections = output.split("==== DESTINATION ====");
1000        let source_section = sections.next().unwrap();
1001        let dest_section = sections.next().unwrap_or("");
1002        let source_files_line = source_section
1003            .lines()
1004            .find(|line| line.trim_start().starts_with("files:"))
1005            .expect("source files line missing");
1006        assert!(source_files_line.trim_start().ends_with("5"));
1007        assert!(!source_files_line.contains('.'));
1008        let dest_files_line = dest_section
1009            .lines()
1010            .find(|line| line.trim_start().starts_with("files:"))
1011            .expect("dest files line missing");
1012        assert!(dest_files_line.trim_start().ends_with("8"));
1013        assert!(!dest_files_line.contains('.'));
1014        // verify SKIPPED section appears in source
1015        assert!(source_section.contains("SKIPPED:"));
1016        let skipped_section = source_section
1017            .split("SKIPPED:")
1018            .nth(1)
1019            .expect("SKIPPED section missing in source");
1020        let skipped_lines: Vec<&str> = skipped_section.lines().collect();
1021        let skipped_files_line = skipped_lines
1022            .iter()
1023            .find(|line| line.trim_start().starts_with("files:"))
1024            .expect("skipped files line missing");
1025        assert!(skipped_files_line.trim_start().ends_with("3"));
1026        let skipped_symlinks_line = skipped_lines
1027            .iter()
1028            .find(|line| line.trim_start().starts_with("symlinks:"))
1029            .expect("skipped symlinks line missing");
1030        assert!(skipped_symlinks_line.trim_start().ends_with("1"));
1031        let skipped_dirs_line = skipped_lines
1032            .iter()
1033            .find(|line| line.trim_start().starts_with("directories:"))
1034            .expect("skipped directories line missing");
1035        assert!(skipped_dirs_line.trim_start().ends_with("2"));
1036
1037        Ok(())
1038    }
1039
1040    #[test]
1041    fn interleaved_counter_access() -> Result<()> {
1042        // test that interleaved access to multiple counters works correctly
1043        // (this was problematic with the old single-slot cache design)
1044        let counter_a = TlsCounter::new();
1045        let counter_b = TlsCounter::new();
1046        let counter_c = TlsCounter::new();
1047        for i in 0..100 {
1048            counter_a.add(1);
1049            counter_b.add(2);
1050            counter_c.add(3);
1051            // verify intermediate values are correct
1052            if i % 10 == 0 {
1053                assert_eq!(counter_a.get(), i + 1);
1054                assert_eq!(counter_b.get(), (i + 1) * 2);
1055                assert_eq!(counter_c.get(), (i + 1) * 3);
1056            }
1057        }
1058        // verify final counts
1059        assert_eq!(counter_a.get(), 100);
1060        assert_eq!(counter_b.get(), 200);
1061        assert_eq!(counter_c.get(), 300);
1062        Ok(())
1063    }
1064
1065    #[test]
1066    fn concurrent_multi_counter_access() -> Result<()> {
1067        // test concurrent access with multiple threads each using multiple counters
1068        let counter_a = std::sync::Arc::new(TlsCounter::new());
1069        let counter_b = std::sync::Arc::new(TlsCounter::new());
1070        const THREADS: usize = 4;
1071        const ITERATIONS: u64 = 1000;
1072        let handles: Vec<_> = (0..THREADS)
1073            .map(|_| {
1074                let ca = counter_a.clone();
1075                let cb = counter_b.clone();
1076                std::thread::spawn(move || {
1077                    for _ in 0..ITERATIONS {
1078                        ca.add(1);
1079                        cb.add(2);
1080                    }
1081                })
1082            })
1083            .collect();
1084        for h in handles {
1085            h.join().unwrap();
1086        }
1087        // verify totals are correct (no lost increments)
1088        assert_eq!(counter_a.get(), THREADS as u64 * ITERATIONS);
1089        assert_eq!(counter_b.get(), THREADS as u64 * ITERATIONS * 2);
1090        Ok(())
1091    }
1092
1093    #[test]
1094    fn repeated_counter_access() -> Result<()> {
1095        // test that repeated access to the same counter works correctly
1096        let counter = TlsCounter::new();
1097        for i in 1..=1000 {
1098            counter.add(1);
1099            assert_eq!(counter.get(), i);
1100        }
1101        Ok(())
1102    }
1103
1104    #[test]
1105    fn sharding_distributes_across_threads() -> Result<()> {
1106        // test that different threads get assigned to different shards
1107        // and that all increments are correctly counted
1108        let counter = std::sync::Arc::new(TlsCounter::new());
1109        const THREADS: usize = 16;
1110        const ITERATIONS: u64 = 100;
1111        let handles: Vec<_> = (0..THREADS)
1112            .map(|_| {
1113                let c = counter.clone();
1114                std::thread::spawn(move || {
1115                    for _ in 0..ITERATIONS {
1116                        c.inc();
1117                    }
1118                })
1119            })
1120            .collect();
1121        for h in handles {
1122            h.join().unwrap();
1123        }
1124        assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1125        Ok(())
1126    }
1127
1128    #[test]
1129    fn sharding_handles_more_threads_than_shards() -> Result<()> {
1130        // test that shard assignment wraps correctly when threads > NUM_SHARDS
1131        let counter = std::sync::Arc::new(TlsCounter::new());
1132        const THREADS: usize = 128; // 2x NUM_SHARDS to force wrap-around
1133        const ITERATIONS: u64 = 100;
1134        let handles: Vec<_> = (0..THREADS)
1135            .map(|_| {
1136                let c = counter.clone();
1137                std::thread::spawn(move || {
1138                    for _ in 0..ITERATIONS {
1139                        c.inc();
1140                    }
1141                })
1142            })
1143            .collect();
1144        for h in handles {
1145            h.join().unwrap();
1146        }
1147        assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1148        Ok(())
1149    }
1150
1151    #[test]
1152    fn counter_independence() -> Result<()> {
1153        // test that multiple counters are completely independent
1154        let counters: Vec<_> = (0..10).map(|_| TlsCounter::new()).collect();
1155        for (i, counter) in counters.iter().enumerate() {
1156            counter.add((i + 1) as u64 * 100);
1157        }
1158        for (i, counter) in counters.iter().enumerate() {
1159            assert_eq!(counter.get(), (i + 1) as u64 * 100);
1160        }
1161        Ok(())
1162    }
1163
1164    #[test]
1165    fn copy_printer_omits_hard_links() -> Result<()> {
1166        let progress = Progress::new();
1167        let mut printer = CopyProgressPrinter::new(&progress);
1168        let output = printer.print()?;
1169        assert!(output.contains("COPIED:"));
1170        assert!(output.contains("UNCHANGED:"));
1171        assert!(output.contains("REMOVED:"));
1172        assert!(output.contains("SKIPPED:"));
1173        assert!(!output.contains("hard-links"));
1174        Ok(())
1175    }
1176
1177    #[test]
1178    fn remove_printer_hides_copy_and_unchanged() -> Result<()> {
1179        let progress = Progress::new();
1180        let mut printer = RemoveProgressPrinter::new(&progress);
1181        let output = printer.print()?;
1182        assert!(output.contains("REMOVED:"));
1183        assert!(output.contains("SKIPPED:"));
1184        assert!(!output.contains("COPIED:"));
1185        assert!(!output.contains("UNCHANGED:"));
1186        assert!(!output.contains("hard-links"));
1187        assert!(!output.contains("specials:"));
1188        Ok(())
1189    }
1190
1191    #[test]
1192    fn link_printer_shows_hard_links() -> Result<()> {
1193        let progress = Progress::new();
1194        let mut printer = LinkProgressPrinter::new(&progress);
1195        let output = printer.print()?;
1196        assert!(output.contains("LINKED / COPIED:"));
1197        assert!(output.contains("UNCHANGED:"));
1198        assert!(output.contains("REMOVED:"));
1199        assert!(output.contains("SKIPPED:"));
1200        assert!(output.contains("hard-links"));
1201        Ok(())
1202    }
1203
1204    #[test]
1205    fn compare_printer_only_shows_ops() -> Result<()> {
1206        let progress = Progress::new();
1207        let mut printer = CompareProgressPrinter::new(&progress);
1208        let output = printer.print()?;
1209        assert!(output.contains("OPS:"));
1210        assert!(!output.contains("COPIED:"));
1211        assert!(!output.contains("UNCHANGED:"));
1212        assert!(!output.contains("REMOVED:"));
1213        assert!(!output.contains("SKIPPED:"));
1214        assert!(!output.contains("GENERATED:"));
1215        Ok(())
1216    }
1217
1218    #[test]
1219    fn filegen_printer_shows_generated() -> Result<()> {
1220        let progress = Progress::new();
1221        let mut printer = FilegenProgressPrinter::new(&progress);
1222        let output = printer.print()?;
1223        assert!(output.contains("OPS:"));
1224        assert!(output.contains("GENERATED:"));
1225        assert!(!output.contains("COPIED:"));
1226        assert!(!output.contains("UNCHANGED:"));
1227        assert!(!output.contains("REMOVED:"));
1228        assert!(!output.contains("SKIPPED:"));
1229        assert!(!output.contains("symlinks:"));
1230        Ok(())
1231    }
1232}