Skip to main content

common/
progress.rs

1use tracing::instrument;
2
3/// Number of shards for the counter. More shards reduce contention but increase memory.
4/// 64 shards × 128 bytes = 8KB per counter, which virtually eliminates contention.
5const NUM_SHARDS: usize = 64;
6
7/// Atomic counter padded to cache line size to prevent false sharing.
8/// Each shard lives on its own cache line so concurrent updates from different
9/// threads don't cause cache invalidation.
10/// Uses 128B alignment to support both x86-64 (64B) and ARM (128B) cache lines.
11#[repr(align(128))]
12struct PaddedAtomicU64(std::sync::atomic::AtomicU64);
13
14/// Global counter for assigning shard indices to threads.
15/// Each thread gets a unique index (mod NUM_SHARDS) on first access.
16static NEXT_SHARD_INDEX: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
17
18thread_local! {
19    /// Per-thread shard index, assigned once on first access.
20    /// Uses modulo to wrap around when more threads than shards.
21    static MY_SHARD: usize =
22        NEXT_SHARD_INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % NUM_SHARDS;
23}
24
25/// Sharded atomic counter optimized for concurrent access from multiple threads.
26///
27/// Uses cache-line-padded shards to prevent false sharing. Each thread is assigned
28/// a shard index, so updates from different threads typically hit different cache lines.
29///
30/// This design handles interleaved access to multiple counters efficiently - unlike
31/// a single-slot cache approach, there's no "cache thrashing" when alternating between
32/// counters.
33///
34/// # Memory
35///
36/// Each counter uses NUM_SHARDS × 128 bytes = 8KB (with 64 shards).
37/// This is larger than a simple AtomicU64 but virtually eliminates contention.
38pub struct TlsCounter {
39    shards: [PaddedAtomicU64; NUM_SHARDS],
40}
41
42impl TlsCounter {
43    #[must_use]
44    pub fn new() -> Self {
45        Self {
46            shards: std::array::from_fn(|_| PaddedAtomicU64(std::sync::atomic::AtomicU64::new(0))),
47        }
48    }
49
50    pub fn add(&self, value: u64) {
51        let shard = MY_SHARD.with(|&s| s);
52        self.shards[shard]
53            .0
54            .fetch_add(value, std::sync::atomic::Ordering::Relaxed);
55    }
56
57    pub fn inc(&self) {
58        self.add(1);
59    }
60
61    pub fn get(&self) -> u64 {
62        self.shards
63            .iter()
64            .map(|s| s.0.load(std::sync::atomic::Ordering::Relaxed))
65            .sum()
66    }
67}
68
69impl std::fmt::Debug for TlsCounter {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("TlsCounter")
72            .field("value", &self.get())
73            .finish()
74    }
75}
76
77impl Default for TlsCounter {
78    fn default() -> Self {
79        Self::new()
80    }
81}
82
83#[derive(Debug)]
84pub struct ProgressCounter {
85    started: TlsCounter,
86    finished: TlsCounter,
87}
88
89impl Default for ProgressCounter {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95pub struct ProgressGuard<'a> {
96    progress: &'a ProgressCounter,
97}
98
99impl<'a> ProgressGuard<'a> {
100    pub fn new(progress: &'a ProgressCounter) -> Self {
101        progress.started.inc();
102        Self { progress }
103    }
104}
105
106impl Drop for ProgressGuard<'_> {
107    fn drop(&mut self) {
108        self.progress.finished.inc();
109    }
110}
111
112pub struct Status {
113    pub started: u64,
114    pub finished: u64,
115}
116
117impl ProgressCounter {
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            started: TlsCounter::new(),
122            finished: TlsCounter::new(),
123        }
124    }
125
126    pub fn guard(&self) -> ProgressGuard<'_> {
127        ProgressGuard::new(self)
128    }
129
130    #[instrument]
131    pub fn get(&self) -> Status {
132        let mut status = Status {
133            started: self.started.get(),
134            finished: self.finished.get(),
135        };
136        if status.finished > status.started {
137            tracing::debug!(
138                "Progress inversion - started: {}, finished {}",
139                status.started,
140                status.finished
141            );
142            status.started = status.finished;
143        }
144        status
145    }
146}
147
148pub struct Progress {
149    pub ops: ProgressCounter,
150    pub bytes_copied: TlsCounter,
151    pub hard_links_created: TlsCounter,
152    pub files_copied: TlsCounter,
153    pub symlinks_created: TlsCounter,
154    pub directories_created: TlsCounter,
155    pub files_changed: TlsCounter,
156    pub symlinks_changed: TlsCounter,
157    pub directories_changed: TlsCounter,
158    pub files_unchanged: TlsCounter,
159    pub symlinks_unchanged: TlsCounter,
160    pub directories_unchanged: TlsCounter,
161    pub hard_links_unchanged: TlsCounter,
162    pub files_removed: TlsCounter,
163    pub symlinks_removed: TlsCounter,
164    pub directories_removed: TlsCounter,
165    pub bytes_removed: TlsCounter,
166    pub files_skipped: TlsCounter,
167    pub symlinks_skipped: TlsCounter,
168    pub directories_skipped: TlsCounter,
169    pub specials_skipped: TlsCounter,
170    start_time: std::time::Instant,
171}
172
173impl Progress {
174    #[must_use]
175    pub fn new() -> Self {
176        Self {
177            ops: Default::default(),
178            bytes_copied: Default::default(),
179            hard_links_created: Default::default(),
180            files_copied: Default::default(),
181            symlinks_created: Default::default(),
182            directories_created: Default::default(),
183            files_changed: Default::default(),
184            symlinks_changed: Default::default(),
185            directories_changed: Default::default(),
186            files_unchanged: Default::default(),
187            symlinks_unchanged: Default::default(),
188            directories_unchanged: Default::default(),
189            hard_links_unchanged: Default::default(),
190            files_removed: Default::default(),
191            symlinks_removed: Default::default(),
192            directories_removed: Default::default(),
193            bytes_removed: Default::default(),
194            files_skipped: Default::default(),
195            symlinks_skipped: Default::default(),
196            directories_skipped: Default::default(),
197            specials_skipped: Default::default(),
198            start_time: std::time::Instant::now(),
199        }
200    }
201
202    pub fn get_duration(&self) -> std::time::Duration {
203        self.start_time.elapsed()
204    }
205}
206
207impl Default for Progress {
208    fn default() -> Self {
209        Self::new()
210    }
211}
212
213#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
214pub struct SerializableProgress {
215    pub ops_started: u64,
216    pub ops_finished: u64,
217    pub bytes_copied: u64,
218    pub hard_links_created: u64,
219    pub files_copied: u64,
220    pub symlinks_created: u64,
221    pub directories_created: u64,
222    pub files_unchanged: u64,
223    pub symlinks_unchanged: u64,
224    pub directories_unchanged: u64,
225    pub hard_links_unchanged: u64,
226    pub files_removed: u64,
227    pub symlinks_removed: u64,
228    pub directories_removed: u64,
229    pub bytes_removed: u64,
230    pub files_skipped: u64,
231    pub symlinks_skipped: u64,
232    pub directories_skipped: u64,
233    pub specials_skipped: u64,
234    pub current_time: std::time::SystemTime,
235}
236
237impl Default for SerializableProgress {
238    fn default() -> Self {
239        Self {
240            ops_started: 0,
241            ops_finished: 0,
242            bytes_copied: 0,
243            hard_links_created: 0,
244            files_copied: 0,
245            symlinks_created: 0,
246            directories_created: 0,
247            files_unchanged: 0,
248            symlinks_unchanged: 0,
249            directories_unchanged: 0,
250            hard_links_unchanged: 0,
251            files_removed: 0,
252            symlinks_removed: 0,
253            directories_removed: 0,
254            bytes_removed: 0,
255            files_skipped: 0,
256            symlinks_skipped: 0,
257            directories_skipped: 0,
258            specials_skipped: 0,
259            current_time: std::time::SystemTime::now(),
260        }
261    }
262}
263
264impl From<&Progress> for SerializableProgress {
265    /// Creates a `SerializableProgress` from a Progress, capturing the current time at the moment of conversion
266    fn from(progress: &Progress) -> Self {
267        Self {
268            ops_started: progress.ops.started.get(),
269            ops_finished: progress.ops.finished.get(),
270            bytes_copied: progress.bytes_copied.get(),
271            hard_links_created: progress.hard_links_created.get(),
272            files_copied: progress.files_copied.get(),
273            symlinks_created: progress.symlinks_created.get(),
274            directories_created: progress.directories_created.get(),
275            files_unchanged: progress.files_unchanged.get(),
276            symlinks_unchanged: progress.symlinks_unchanged.get(),
277            directories_unchanged: progress.directories_unchanged.get(),
278            hard_links_unchanged: progress.hard_links_unchanged.get(),
279            files_removed: progress.files_removed.get(),
280            symlinks_removed: progress.symlinks_removed.get(),
281            directories_removed: progress.directories_removed.get(),
282            bytes_removed: progress.bytes_removed.get(),
283            files_skipped: progress.files_skipped.get(),
284            symlinks_skipped: progress.symlinks_skipped.get(),
285            directories_skipped: progress.directories_skipped.get(),
286            specials_skipped: progress.specials_skipped.get(),
287            current_time: std::time::SystemTime::now(),
288        }
289    }
290}
291
292/// Trait implemented by per-tool progress printers.
293///
294/// Each tool has its own printer that only renders sections relevant to what
295/// the tool actually does (e.g. `rrm` has no COPIED section; `rcmp` only shows
296/// OPS). Prefer adding a new specialized printer over extending a shared one.
297pub trait LocalProgressReport: Send {
298    fn print(&mut self) -> anyhow::Result<String>;
299}
300
301/// Selects which tool-specific printer to instantiate for a local progress run.
302#[derive(Copy, Clone, Debug)]
303pub enum LocalProgressKind {
304    Copy,
305    Remove,
306    Link,
307    Compare,
308    Filegen,
309    Modify,
310}
311
312fn ops_rates(
313    progress: &Progress,
314    last_ops: u64,
315    last_update: std::time::Instant,
316    now: std::time::Instant,
317) -> (Status, f64, f64) {
318    let ops = progress.ops.get();
319    let total_secs = progress.get_duration().as_secs_f64();
320    let curr_secs = (now - last_update).as_secs_f64();
321    let average = if total_secs > 0.0 {
322        ops.finished as f64 / total_secs
323    } else {
324        0.0
325    };
326    let current = if curr_secs > 0.0 {
327        (ops.finished - last_ops) as f64 / curr_secs
328    } else {
329        0.0
330    };
331    (ops, average, current)
332}
333
334fn bytes_rates(
335    total: u64,
336    last: u64,
337    total_secs: f64,
338    curr_secs: f64,
339) -> (bytesize::ByteSize, bytesize::ByteSize) {
340    let average = if total_secs > 0.0 {
341        total as f64 / total_secs
342    } else {
343        0.0
344    };
345    let current = if curr_secs > 0.0 {
346        (total - last) as f64 / curr_secs
347    } else {
348        0.0
349    };
350    (
351        bytesize::ByteSize(average as u64),
352        bytesize::ByteSize(current as u64),
353    )
354}
355
356/// Progress printer for `rcp` (local copy). Shows COPIED/UNCHANGED/REMOVED/SKIPPED
357/// for files, symlinks, and directories. Hard-link counters are omitted because
358/// `rcp` never creates hard links.
359pub struct CopyProgressPrinter<'a> {
360    progress: &'a Progress,
361    last_ops: u64,
362    last_bytes: u64,
363    last_update: std::time::Instant,
364}
365
366impl<'a> CopyProgressPrinter<'a> {
367    pub fn new(progress: &'a Progress) -> Self {
368        Self {
369            progress,
370            last_ops: progress.ops.get().finished,
371            last_bytes: progress.bytes_copied.get(),
372            last_update: std::time::Instant::now(),
373        }
374    }
375}
376
377impl<'a> LocalProgressReport for CopyProgressPrinter<'a> {
378    fn print(&mut self) -> anyhow::Result<String> {
379        let now = std::time::Instant::now();
380        let (ops, avg_ops, cur_ops) =
381            ops_rates(self.progress, self.last_ops, self.last_update, now);
382        let bytes = self.progress.bytes_copied.get();
383        let total_secs = self.progress.get_duration().as_secs_f64();
384        let curr_secs = (now - self.last_update).as_secs_f64();
385        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
386        self.last_ops = ops.finished;
387        self.last_bytes = bytes;
388        self.last_update = now;
389        Ok(format!(
390            "---------------------\n\
391            OPS:\n\
392            pending: {:>10}\n\
393            average: {:>10.2} items/s\n\
394            current: {:>10.2} items/s\n\
395            -----------------------\n\
396            COPIED:\n\
397            average: {:>10}/s\n\
398            current: {:>10}/s\n\
399            bytes:   {:>10}\n\
400            files:       {:>10}\n\
401            symlinks:    {:>10}\n\
402            directories: {:>10}\n\
403            -----------------------\n\
404            UNCHANGED:\n\
405            files:       {:>10}\n\
406            symlinks:    {:>10}\n\
407            directories: {:>10}\n\
408            -----------------------\n\
409            REMOVED:\n\
410            bytes:       {:>10}\n\
411            files:       {:>10}\n\
412            symlinks:    {:>10}\n\
413            directories: {:>10}\n\
414            -----------------------\n\
415            SKIPPED:\n\
416            files:       {:>10}\n\
417            symlinks:    {:>10}\n\
418            directories: {:>10}\n\
419            specials:    {:>10}",
420            ops.started - ops.finished,
421            avg_ops,
422            cur_ops,
423            avg_bytes,
424            cur_bytes,
425            bytesize::ByteSize(bytes),
426            self.progress.files_copied.get(),
427            self.progress.symlinks_created.get(),
428            self.progress.directories_created.get(),
429            self.progress.files_unchanged.get(),
430            self.progress.symlinks_unchanged.get(),
431            self.progress.directories_unchanged.get(),
432            bytesize::ByteSize(self.progress.bytes_removed.get()),
433            self.progress.files_removed.get(),
434            self.progress.symlinks_removed.get(),
435            self.progress.directories_removed.get(),
436            self.progress.files_skipped.get(),
437            self.progress.symlinks_skipped.get(),
438            self.progress.directories_skipped.get(),
439            self.progress.specials_skipped.get(),
440        ))
441    }
442}
443
444/// Progress printer for `rrm`. Only shows REMOVED and SKIPPED — `rrm` never
445/// creates, copies, or leaves entries unchanged.
446pub struct RemoveProgressPrinter<'a> {
447    progress: &'a Progress,
448    last_ops: u64,
449    last_bytes_removed: u64,
450    last_update: std::time::Instant,
451}
452
453impl<'a> RemoveProgressPrinter<'a> {
454    pub fn new(progress: &'a Progress) -> Self {
455        Self {
456            progress,
457            last_ops: progress.ops.get().finished,
458            last_bytes_removed: progress.bytes_removed.get(),
459            last_update: std::time::Instant::now(),
460        }
461    }
462}
463
464impl<'a> LocalProgressReport for RemoveProgressPrinter<'a> {
465    fn print(&mut self) -> anyhow::Result<String> {
466        let now = std::time::Instant::now();
467        let (ops, avg_ops, cur_ops) =
468            ops_rates(self.progress, self.last_ops, self.last_update, now);
469        let bytes_removed = self.progress.bytes_removed.get();
470        let total_secs = self.progress.get_duration().as_secs_f64();
471        let curr_secs = (now - self.last_update).as_secs_f64();
472        let (avg_bytes, cur_bytes) = bytes_rates(
473            bytes_removed,
474            self.last_bytes_removed,
475            total_secs,
476            curr_secs,
477        );
478        self.last_ops = ops.finished;
479        self.last_bytes_removed = bytes_removed;
480        self.last_update = now;
481        Ok(format!(
482            "---------------------\n\
483            OPS:\n\
484            pending: {:>10}\n\
485            average: {:>10.2} items/s\n\
486            current: {:>10.2} items/s\n\
487            -----------------------\n\
488            REMOVED:\n\
489            average: {:>10}/s\n\
490            current: {:>10}/s\n\
491            bytes:       {:>10}\n\
492            files:       {:>10}\n\
493            symlinks:    {:>10}\n\
494            directories: {:>10}\n\
495            -----------------------\n\
496            SKIPPED:\n\
497            files:       {:>10}\n\
498            symlinks:    {:>10}\n\
499            directories: {:>10}",
500            ops.started - ops.finished,
501            avg_ops,
502            cur_ops,
503            avg_bytes,
504            cur_bytes,
505            bytesize::ByteSize(bytes_removed),
506            self.progress.files_removed.get(),
507            self.progress.symlinks_removed.get(),
508            self.progress.directories_removed.get(),
509            self.progress.files_skipped.get(),
510            self.progress.symlinks_skipped.get(),
511            self.progress.directories_skipped.get(),
512        ))
513    }
514}
515
516/// Progress printer for `rchm`. Reports CHANGED / UNCHANGED / SKIPPED counts per
517/// entry type; rchm performs only metadata ops, so there are no byte counters.
518pub struct ModifyProgressPrinter<'a> {
519    progress: &'a Progress,
520    last_ops: u64,
521    last_update: std::time::Instant,
522}
523
524impl<'a> ModifyProgressPrinter<'a> {
525    pub fn new(progress: &'a Progress) -> Self {
526        Self {
527            progress,
528            last_ops: progress.ops.get().finished,
529            last_update: std::time::Instant::now(),
530        }
531    }
532}
533
534impl<'a> LocalProgressReport for ModifyProgressPrinter<'a> {
535    fn print(&mut self) -> anyhow::Result<String> {
536        let now = std::time::Instant::now();
537        let (ops, avg_ops, cur_ops) =
538            ops_rates(self.progress, self.last_ops, self.last_update, now);
539        self.last_ops = ops.finished;
540        self.last_update = now;
541        Ok(format!(
542            "---------------------\n\
543            OPS:\n\
544            pending: {:>10}\n\
545            average: {:>10.2} items/s\n\
546            current: {:>10.2} items/s\n\
547            -----------------------\n\
548            CHANGED:\n\
549            files:       {:>10}\n\
550            symlinks:    {:>10}\n\
551            directories: {:>10}\n\
552            -----------------------\n\
553            UNCHANGED:\n\
554            files:       {:>10}\n\
555            symlinks:    {:>10}\n\
556            directories: {:>10}\n\
557            -----------------------\n\
558            SKIPPED:\n\
559            files:       {:>10}\n\
560            symlinks:    {:>10}\n\
561            directories: {:>10}",
562            ops.started - ops.finished,
563            avg_ops,
564            cur_ops,
565            self.progress.files_changed.get(),
566            self.progress.symlinks_changed.get(),
567            self.progress.directories_changed.get(),
568            self.progress.files_unchanged.get(),
569            self.progress.symlinks_unchanged.get(),
570            self.progress.directories_unchanged.get(),
571            self.progress.files_skipped.get(),
572            self.progress.symlinks_skipped.get(),
573            self.progress.directories_skipped.get(),
574        ))
575    }
576}
577
578/// Progress printer for `rlink`. Like [`CopyProgressPrinter`] but also reports
579/// hard-link counters, since `rlink`'s primary output is hard links (with copies
580/// as a fallback for `--update` changes).
581pub struct LinkProgressPrinter<'a> {
582    progress: &'a Progress,
583    last_ops: u64,
584    last_bytes: u64,
585    last_update: std::time::Instant,
586}
587
588impl<'a> LinkProgressPrinter<'a> {
589    pub fn new(progress: &'a Progress) -> Self {
590        Self {
591            progress,
592            last_ops: progress.ops.get().finished,
593            last_bytes: progress.bytes_copied.get(),
594            last_update: std::time::Instant::now(),
595        }
596    }
597}
598
599impl<'a> LocalProgressReport for LinkProgressPrinter<'a> {
600    fn print(&mut self) -> anyhow::Result<String> {
601        let now = std::time::Instant::now();
602        let (ops, avg_ops, cur_ops) =
603            ops_rates(self.progress, self.last_ops, self.last_update, now);
604        let bytes = self.progress.bytes_copied.get();
605        let total_secs = self.progress.get_duration().as_secs_f64();
606        let curr_secs = (now - self.last_update).as_secs_f64();
607        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
608        self.last_ops = ops.finished;
609        self.last_bytes = bytes;
610        self.last_update = now;
611        Ok(format!(
612            "---------------------\n\
613            OPS:\n\
614            pending: {:>10}\n\
615            average: {:>10.2} items/s\n\
616            current: {:>10.2} items/s\n\
617            -----------------------\n\
618            LINKED / COPIED:\n\
619            average: {:>10}/s\n\
620            current: {:>10}/s\n\
621            bytes:   {:>10}\n\
622            hard-links:  {:>10}\n\
623            files:       {:>10}\n\
624            symlinks:    {:>10}\n\
625            directories: {:>10}\n\
626            -----------------------\n\
627            UNCHANGED:\n\
628            hard-links:  {:>10}\n\
629            files:       {:>10}\n\
630            symlinks:    {:>10}\n\
631            directories: {:>10}\n\
632            -----------------------\n\
633            REMOVED:\n\
634            bytes:       {:>10}\n\
635            files:       {:>10}\n\
636            symlinks:    {:>10}\n\
637            directories: {:>10}\n\
638            -----------------------\n\
639            SKIPPED:\n\
640            files:       {:>10}\n\
641            symlinks:    {:>10}\n\
642            directories: {:>10}\n\
643            specials:    {:>10}",
644            ops.started - ops.finished,
645            avg_ops,
646            cur_ops,
647            avg_bytes,
648            cur_bytes,
649            bytesize::ByteSize(bytes),
650            self.progress.hard_links_created.get(),
651            self.progress.files_copied.get(),
652            self.progress.symlinks_created.get(),
653            self.progress.directories_created.get(),
654            self.progress.hard_links_unchanged.get(),
655            self.progress.files_unchanged.get(),
656            self.progress.symlinks_unchanged.get(),
657            self.progress.directories_unchanged.get(),
658            bytesize::ByteSize(self.progress.bytes_removed.get()),
659            self.progress.files_removed.get(),
660            self.progress.symlinks_removed.get(),
661            self.progress.directories_removed.get(),
662            self.progress.files_skipped.get(),
663            self.progress.symlinks_skipped.get(),
664            self.progress.directories_skipped.get(),
665            self.progress.specials_skipped.get(),
666        ))
667    }
668}
669
670/// Progress printer for `rcmp`. Compare operations only drive the ops counter —
671/// they don't copy, remove, or modify anything — so only OPS is shown.
672pub struct CompareProgressPrinter<'a> {
673    progress: &'a Progress,
674    last_ops: u64,
675    last_update: std::time::Instant,
676}
677
678impl<'a> CompareProgressPrinter<'a> {
679    pub fn new(progress: &'a Progress) -> Self {
680        Self {
681            progress,
682            last_ops: progress.ops.get().finished,
683            last_update: std::time::Instant::now(),
684        }
685    }
686}
687
688impl<'a> LocalProgressReport for CompareProgressPrinter<'a> {
689    fn print(&mut self) -> anyhow::Result<String> {
690        let now = std::time::Instant::now();
691        let (ops, avg_ops, cur_ops) =
692            ops_rates(self.progress, self.last_ops, self.last_update, now);
693        self.last_ops = ops.finished;
694        self.last_update = now;
695        Ok(format!(
696            "---------------------\n\
697            OPS:\n\
698            pending: {:>10}\n\
699            compared: {:>9}\n\
700            average: {:>10.2} items/s\n\
701            current: {:>10.2} items/s",
702            ops.started - ops.finished,
703            ops.finished,
704            avg_ops,
705            cur_ops,
706        ))
707    }
708}
709
710/// Progress printer for `filegen`. Shows a GENERATED section instead of COPIED,
711/// because `filegen` creates files and directories rather than copying them.
712pub struct FilegenProgressPrinter<'a> {
713    progress: &'a Progress,
714    last_ops: u64,
715    last_bytes: u64,
716    last_update: std::time::Instant,
717}
718
719impl<'a> FilegenProgressPrinter<'a> {
720    pub fn new(progress: &'a Progress) -> Self {
721        Self {
722            progress,
723            last_ops: progress.ops.get().finished,
724            last_bytes: progress.bytes_copied.get(),
725            last_update: std::time::Instant::now(),
726        }
727    }
728}
729
730impl<'a> LocalProgressReport for FilegenProgressPrinter<'a> {
731    fn print(&mut self) -> anyhow::Result<String> {
732        let now = std::time::Instant::now();
733        let (ops, avg_ops, cur_ops) =
734            ops_rates(self.progress, self.last_ops, self.last_update, now);
735        let bytes = self.progress.bytes_copied.get();
736        let total_secs = self.progress.get_duration().as_secs_f64();
737        let curr_secs = (now - self.last_update).as_secs_f64();
738        let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
739        self.last_ops = ops.finished;
740        self.last_bytes = bytes;
741        self.last_update = now;
742        Ok(format!(
743            "---------------------\n\
744            OPS:\n\
745            pending: {:>10}\n\
746            average: {:>10.2} items/s\n\
747            current: {:>10.2} items/s\n\
748            -----------------------\n\
749            GENERATED:\n\
750            average: {:>10}/s\n\
751            current: {:>10}/s\n\
752            bytes:       {:>10}\n\
753            files:       {:>10}\n\
754            directories: {:>10}",
755            ops.started - ops.finished,
756            avg_ops,
757            cur_ops,
758            avg_bytes,
759            cur_bytes,
760            bytesize::ByteSize(bytes),
761            self.progress.files_copied.get(),
762            self.progress.directories_created.get(),
763        ))
764    }
765}
766
767/// Construct the tool-specific printer for the given [`LocalProgressKind`],
768/// borrowing the global [`Progress`] for its lifetime.
769#[must_use]
770pub fn make_local_printer<'a>(
771    kind: LocalProgressKind,
772    progress: &'a Progress,
773) -> Box<dyn LocalProgressReport + 'a> {
774    match kind {
775        LocalProgressKind::Copy => Box::new(CopyProgressPrinter::new(progress)),
776        LocalProgressKind::Remove => Box::new(RemoveProgressPrinter::new(progress)),
777        LocalProgressKind::Link => Box::new(LinkProgressPrinter::new(progress)),
778        LocalProgressKind::Compare => Box::new(CompareProgressPrinter::new(progress)),
779        LocalProgressKind::Filegen => Box::new(FilegenProgressPrinter::new(progress)),
780        LocalProgressKind::Modify => Box::new(ModifyProgressPrinter::new(progress)),
781    }
782}
783
784pub struct RcpdProgressPrinter {
785    start_time: std::time::Instant,
786    last_source_ops: u64,
787    last_source_bytes: u64,
788    last_source_files: u64,
789    last_dest_ops: u64,
790    last_dest_bytes: u64,
791    last_update: std::time::Instant,
792}
793
794impl RcpdProgressPrinter {
795    #[must_use]
796    pub fn new() -> Self {
797        let now = std::time::Instant::now();
798        Self {
799            start_time: now,
800            last_source_ops: 0,
801            last_source_bytes: 0,
802            last_source_files: 0,
803            last_dest_ops: 0,
804            last_dest_bytes: 0,
805            last_update: now,
806        }
807    }
808
809    fn calculate_current_rate(&self, current: u64, last: u64, duration_secs: f64) -> f64 {
810        if duration_secs > 0.0 {
811            (current - last) as f64 / duration_secs
812        } else {
813            0.0
814        }
815    }
816
817    fn calculate_average_rate(&self, total: u64, total_duration_secs: f64) -> f64 {
818        if total_duration_secs > 0.0 {
819            total as f64 / total_duration_secs
820        } else {
821            0.0
822        }
823    }
824
825    pub fn print(
826        &mut self,
827        source_progress: &SerializableProgress,
828        dest_progress: &SerializableProgress,
829    ) -> anyhow::Result<String> {
830        let time_now = std::time::Instant::now();
831        let total_duration_secs = (time_now - self.start_time).as_secs_f64();
832        let curr_duration_secs = (time_now - self.last_update).as_secs_f64();
833        // source current rates
834        let source_ops_rate_curr = self.calculate_current_rate(
835            source_progress.ops_finished,
836            self.last_source_ops,
837            curr_duration_secs,
838        );
839        let source_bytes_rate_curr = self.calculate_current_rate(
840            source_progress.bytes_copied,
841            self.last_source_bytes,
842            curr_duration_secs,
843        );
844        let source_files_rate_curr = self.calculate_current_rate(
845            source_progress.files_copied,
846            self.last_source_files,
847            curr_duration_secs,
848        );
849        // source average rates
850        let source_ops_rate_avg =
851            self.calculate_average_rate(source_progress.ops_finished, total_duration_secs);
852        let source_bytes_rate_avg =
853            self.calculate_average_rate(source_progress.bytes_copied, total_duration_secs);
854        let source_files_rate_avg =
855            self.calculate_average_rate(source_progress.files_copied, total_duration_secs);
856        // destination current rates
857        let dest_ops_rate_curr = self.calculate_current_rate(
858            dest_progress.ops_finished,
859            self.last_dest_ops,
860            curr_duration_secs,
861        );
862        let dest_bytes_rate_curr = self.calculate_current_rate(
863            dest_progress.bytes_copied,
864            self.last_dest_bytes,
865            curr_duration_secs,
866        );
867        // destination average rates
868        let dest_ops_rate_avg =
869            self.calculate_average_rate(dest_progress.ops_finished, total_duration_secs);
870        let dest_bytes_rate_avg =
871            self.calculate_average_rate(dest_progress.bytes_copied, total_duration_secs);
872        // update last values
873        self.last_source_ops = source_progress.ops_finished;
874        self.last_source_bytes = source_progress.bytes_copied;
875        self.last_source_files = source_progress.files_copied;
876        self.last_dest_ops = dest_progress.ops_finished;
877        self.last_dest_bytes = dest_progress.bytes_copied;
878        self.last_update = time_now;
879        Ok(format!(
880            "==== SOURCE =======\n\
881            OPS:\n\
882            pending: {:>10}\n\
883            average: {:>10.2} items/s\n\
884            current: {:>10.2} items/s\n\
885            ---------------------\n\
886            COPIED:\n\
887            average: {:>10}/s\n\
888            current: {:>10}/s\n\
889            bytes:   {:>10}\n\
890            files:       {:>10}\n\
891            ---------------------\n\
892            FILES:\n\
893            average: {:>10.2} files/s\n\
894            current: {:>10.2} files/s\n\
895            ---------------------\n\
896            SKIPPED:\n\
897            files:       {:>10}\n\
898            symlinks:    {:>10}\n\
899            directories: {:>10}\n\
900            specials:    {:>10}\n\
901            ==== DESTINATION ====\n\
902            OPS:\n\
903            pending: {:>10}\n\
904            average: {:>10.2} items/s\n\
905            current: {:>10.2} items/s\n\
906            ---------------------\n\
907            COPIED:\n\
908            average: {:>10}/s\n\
909            current: {:>10}/s\n\
910            bytes:   {:>10}\n\
911            files:       {:>10}\n\
912            symlinks:    {:>10}\n\
913            directories: {:>10}\n\
914            hard-links:  {:>10}\n\
915            ---------------------\n\
916            UNCHANGED:\n\
917            files:       {:>10}\n\
918            symlinks:    {:>10}\n\
919            directories: {:>10}\n\
920            hard-links:  {:>10}\n\
921            ---------------------\n\
922            REMOVED:\n\
923            bytes:       {:>10}\n\
924            files:       {:>10}\n\
925            symlinks:    {:>10}\n\
926            directories: {:>10}",
927            // source section
928            source_progress.ops_started - source_progress.ops_finished, // pending
929            source_ops_rate_avg,
930            source_ops_rate_curr,
931            bytesize::ByteSize(source_bytes_rate_avg as u64),
932            bytesize::ByteSize(source_bytes_rate_curr as u64),
933            bytesize::ByteSize(source_progress.bytes_copied),
934            source_progress.files_copied,
935            source_files_rate_avg,
936            source_files_rate_curr,
937            // source skipped
938            source_progress.files_skipped,
939            source_progress.symlinks_skipped,
940            source_progress.directories_skipped,
941            source_progress.specials_skipped,
942            // destination section
943            dest_progress.ops_started - dest_progress.ops_finished, // pending
944            dest_ops_rate_avg,
945            dest_ops_rate_curr,
946            bytesize::ByteSize(dest_bytes_rate_avg as u64),
947            bytesize::ByteSize(dest_bytes_rate_curr as u64),
948            bytesize::ByteSize(dest_progress.bytes_copied),
949            // destination detailed stats
950            dest_progress.files_copied,
951            dest_progress.symlinks_created,
952            dest_progress.directories_created,
953            dest_progress.hard_links_created,
954            // unchanged
955            dest_progress.files_unchanged,
956            dest_progress.symlinks_unchanged,
957            dest_progress.directories_unchanged,
958            dest_progress.hard_links_unchanged,
959            // removed
960            bytesize::ByteSize(dest_progress.bytes_removed),
961            dest_progress.files_removed,
962            dest_progress.symlinks_removed,
963            dest_progress.directories_removed,
964        ))
965    }
966}
967
968impl Default for RcpdProgressPrinter {
969    fn default() -> Self {
970        Self::new()
971    }
972}
973
974#[cfg(test)]
975mod tests {
976    use super::*;
977    use crate::remote_tracing::TracingMessage;
978    use anyhow::Result;
979
980    #[test]
981    fn basic_counting() -> Result<()> {
982        let tls_counter = TlsCounter::new();
983        for _ in 0..10 {
984            tls_counter.inc();
985        }
986        assert!(tls_counter.get() == 10);
987        Ok(())
988    }
989
990    #[test]
991    fn threaded_counting() -> Result<()> {
992        let tls_counter = TlsCounter::new();
993        std::thread::scope(|scope| {
994            let mut handles = Vec::new();
995            for _ in 0..10 {
996                handles.push(scope.spawn(|| {
997                    for _ in 0..100 {
998                        tls_counter.inc();
999                    }
1000                }));
1001            }
1002        });
1003        assert!(tls_counter.get() == 1000);
1004        Ok(())
1005    }
1006
1007    #[test]
1008    fn basic_guard() -> Result<()> {
1009        let tls_progress = ProgressCounter::new();
1010        let _guard = tls_progress.guard();
1011        Ok(())
1012    }
1013
1014    #[test]
1015    fn test_serializable_progress() -> Result<()> {
1016        let progress = Progress::new();
1017
1018        // Add some test data
1019        progress.files_copied.inc();
1020        progress.bytes_copied.add(1024);
1021        progress.directories_created.add(2);
1022
1023        // Test conversion to serializable format
1024        let serializable = SerializableProgress::from(&progress);
1025        assert_eq!(serializable.files_copied, 1);
1026        assert_eq!(serializable.bytes_copied, 1024);
1027        assert_eq!(serializable.directories_created, 2);
1028
1029        // Test that we can create a TracingMessage with progress
1030        let _tracing_msg = TracingMessage::Progress(serializable);
1031
1032        Ok(())
1033    }
1034
1035    #[test]
1036    fn test_rcpd_progress_printer() -> Result<()> {
1037        let mut printer = RcpdProgressPrinter::new();
1038
1039        // Create test progress data
1040        let source_progress = SerializableProgress {
1041            ops_started: 100,
1042            ops_finished: 80,
1043            bytes_copied: 1024,
1044            files_copied: 5,
1045            files_skipped: 3,
1046            symlinks_skipped: 1,
1047            directories_skipped: 2,
1048            ..Default::default()
1049        };
1050
1051        let dest_progress = SerializableProgress {
1052            ops_started: 80,
1053            ops_finished: 70,
1054            bytes_copied: 1024,
1055            files_copied: 8,
1056            symlinks_created: 2,
1057            directories_created: 1,
1058            ..Default::default()
1059        };
1060
1061        // Test that print returns a formatted string
1062        let output = printer.print(&source_progress, &dest_progress)?;
1063        assert!(output.contains("SOURCE"));
1064        assert!(output.contains("DESTINATION"));
1065        assert!(output.contains("OPS:"));
1066        assert!(output.contains("pending:"));
1067        assert!(output.contains("20")); // source pending ops (100-80)
1068        assert!(output.contains("10")); // dest pending ops (80-70)
1069        let mut sections = output.split("==== DESTINATION ====");
1070        let source_section = sections.next().unwrap();
1071        let dest_section = sections.next().unwrap_or("");
1072        let source_files_line = source_section
1073            .lines()
1074            .find(|line| line.trim_start().starts_with("files:"))
1075            .expect("source files line missing");
1076        assert!(source_files_line.trim_start().ends_with("5"));
1077        assert!(!source_files_line.contains('.'));
1078        let dest_files_line = dest_section
1079            .lines()
1080            .find(|line| line.trim_start().starts_with("files:"))
1081            .expect("dest files line missing");
1082        assert!(dest_files_line.trim_start().ends_with("8"));
1083        assert!(!dest_files_line.contains('.'));
1084        // verify SKIPPED section appears in source
1085        assert!(source_section.contains("SKIPPED:"));
1086        let skipped_section = source_section
1087            .split("SKIPPED:")
1088            .nth(1)
1089            .expect("SKIPPED section missing in source");
1090        let skipped_lines: Vec<&str> = skipped_section.lines().collect();
1091        let skipped_files_line = skipped_lines
1092            .iter()
1093            .find(|line| line.trim_start().starts_with("files:"))
1094            .expect("skipped files line missing");
1095        assert!(skipped_files_line.trim_start().ends_with("3"));
1096        let skipped_symlinks_line = skipped_lines
1097            .iter()
1098            .find(|line| line.trim_start().starts_with("symlinks:"))
1099            .expect("skipped symlinks line missing");
1100        assert!(skipped_symlinks_line.trim_start().ends_with("1"));
1101        let skipped_dirs_line = skipped_lines
1102            .iter()
1103            .find(|line| line.trim_start().starts_with("directories:"))
1104            .expect("skipped directories line missing");
1105        assert!(skipped_dirs_line.trim_start().ends_with("2"));
1106
1107        Ok(())
1108    }
1109
1110    #[test]
1111    fn interleaved_counter_access() -> Result<()> {
1112        // test that interleaved access to multiple counters works correctly
1113        // (this was problematic with the old single-slot cache design)
1114        let counter_a = TlsCounter::new();
1115        let counter_b = TlsCounter::new();
1116        let counter_c = TlsCounter::new();
1117        for i in 0..100 {
1118            counter_a.add(1);
1119            counter_b.add(2);
1120            counter_c.add(3);
1121            // verify intermediate values are correct
1122            if i % 10 == 0 {
1123                assert_eq!(counter_a.get(), i + 1);
1124                assert_eq!(counter_b.get(), (i + 1) * 2);
1125                assert_eq!(counter_c.get(), (i + 1) * 3);
1126            }
1127        }
1128        // verify final counts
1129        assert_eq!(counter_a.get(), 100);
1130        assert_eq!(counter_b.get(), 200);
1131        assert_eq!(counter_c.get(), 300);
1132        Ok(())
1133    }
1134
1135    #[test]
1136    fn concurrent_multi_counter_access() -> Result<()> {
1137        // test concurrent access with multiple threads each using multiple counters
1138        let counter_a = std::sync::Arc::new(TlsCounter::new());
1139        let counter_b = std::sync::Arc::new(TlsCounter::new());
1140        const THREADS: usize = 4;
1141        const ITERATIONS: u64 = 1000;
1142        let handles: Vec<_> = (0..THREADS)
1143            .map(|_| {
1144                let ca = counter_a.clone();
1145                let cb = counter_b.clone();
1146                std::thread::spawn(move || {
1147                    for _ in 0..ITERATIONS {
1148                        ca.add(1);
1149                        cb.add(2);
1150                    }
1151                })
1152            })
1153            .collect();
1154        for h in handles {
1155            h.join().unwrap();
1156        }
1157        // verify totals are correct (no lost increments)
1158        assert_eq!(counter_a.get(), THREADS as u64 * ITERATIONS);
1159        assert_eq!(counter_b.get(), THREADS as u64 * ITERATIONS * 2);
1160        Ok(())
1161    }
1162
1163    #[test]
1164    fn repeated_counter_access() -> Result<()> {
1165        // test that repeated access to the same counter works correctly
1166        let counter = TlsCounter::new();
1167        for i in 1..=1000 {
1168            counter.add(1);
1169            assert_eq!(counter.get(), i);
1170        }
1171        Ok(())
1172    }
1173
1174    #[test]
1175    fn sharding_distributes_across_threads() -> Result<()> {
1176        // test that different threads get assigned to different shards
1177        // and that all increments are correctly counted
1178        let counter = std::sync::Arc::new(TlsCounter::new());
1179        const THREADS: usize = 16;
1180        const ITERATIONS: u64 = 100;
1181        let handles: Vec<_> = (0..THREADS)
1182            .map(|_| {
1183                let c = counter.clone();
1184                std::thread::spawn(move || {
1185                    for _ in 0..ITERATIONS {
1186                        c.inc();
1187                    }
1188                })
1189            })
1190            .collect();
1191        for h in handles {
1192            h.join().unwrap();
1193        }
1194        assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1195        Ok(())
1196    }
1197
1198    #[test]
1199    fn sharding_handles_more_threads_than_shards() -> Result<()> {
1200        // test that shard assignment wraps correctly when threads > NUM_SHARDS
1201        let counter = std::sync::Arc::new(TlsCounter::new());
1202        const THREADS: usize = 128; // 2x NUM_SHARDS to force wrap-around
1203        const ITERATIONS: u64 = 100;
1204        let handles: Vec<_> = (0..THREADS)
1205            .map(|_| {
1206                let c = counter.clone();
1207                std::thread::spawn(move || {
1208                    for _ in 0..ITERATIONS {
1209                        c.inc();
1210                    }
1211                })
1212            })
1213            .collect();
1214        for h in handles {
1215            h.join().unwrap();
1216        }
1217        assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1218        Ok(())
1219    }
1220
1221    #[test]
1222    fn counter_independence() -> Result<()> {
1223        // test that multiple counters are completely independent
1224        let counters: Vec<_> = (0..10).map(|_| TlsCounter::new()).collect();
1225        for (i, counter) in counters.iter().enumerate() {
1226            counter.add((i + 1) as u64 * 100);
1227        }
1228        for (i, counter) in counters.iter().enumerate() {
1229            assert_eq!(counter.get(), (i + 1) as u64 * 100);
1230        }
1231        Ok(())
1232    }
1233
1234    #[test]
1235    fn copy_printer_omits_hard_links() -> Result<()> {
1236        let progress = Progress::new();
1237        let mut printer = CopyProgressPrinter::new(&progress);
1238        let output = printer.print()?;
1239        assert!(output.contains("COPIED:"));
1240        assert!(output.contains("UNCHANGED:"));
1241        assert!(output.contains("REMOVED:"));
1242        assert!(output.contains("SKIPPED:"));
1243        assert!(!output.contains("hard-links"));
1244        Ok(())
1245    }
1246
1247    #[test]
1248    fn remove_printer_hides_copy_and_unchanged() -> Result<()> {
1249        let progress = Progress::new();
1250        let mut printer = RemoveProgressPrinter::new(&progress);
1251        let output = printer.print()?;
1252        assert!(output.contains("REMOVED:"));
1253        assert!(output.contains("SKIPPED:"));
1254        assert!(!output.contains("COPIED:"));
1255        assert!(!output.contains("UNCHANGED:"));
1256        assert!(!output.contains("hard-links"));
1257        assert!(!output.contains("specials:"));
1258        Ok(())
1259    }
1260
1261    #[test]
1262    fn link_printer_shows_hard_links() -> Result<()> {
1263        let progress = Progress::new();
1264        let mut printer = LinkProgressPrinter::new(&progress);
1265        let output = printer.print()?;
1266        assert!(output.contains("LINKED / COPIED:"));
1267        assert!(output.contains("UNCHANGED:"));
1268        assert!(output.contains("REMOVED:"));
1269        assert!(output.contains("SKIPPED:"));
1270        assert!(output.contains("hard-links"));
1271        Ok(())
1272    }
1273
1274    #[test]
1275    fn compare_printer_only_shows_ops() -> Result<()> {
1276        let progress = Progress::new();
1277        let mut printer = CompareProgressPrinter::new(&progress);
1278        let output = printer.print()?;
1279        assert!(output.contains("OPS:"));
1280        assert!(!output.contains("COPIED:"));
1281        assert!(!output.contains("UNCHANGED:"));
1282        assert!(!output.contains("REMOVED:"));
1283        assert!(!output.contains("SKIPPED:"));
1284        assert!(!output.contains("GENERATED:"));
1285        Ok(())
1286    }
1287
1288    #[test]
1289    fn filegen_printer_shows_generated() -> Result<()> {
1290        let progress = Progress::new();
1291        let mut printer = FilegenProgressPrinter::new(&progress);
1292        let output = printer.print()?;
1293        assert!(output.contains("OPS:"));
1294        assert!(output.contains("GENERATED:"));
1295        assert!(!output.contains("COPIED:"));
1296        assert!(!output.contains("UNCHANGED:"));
1297        assert!(!output.contains("REMOVED:"));
1298        assert!(!output.contains("SKIPPED:"));
1299        assert!(!output.contains("symlinks:"));
1300        Ok(())
1301    }
1302
1303    #[test]
1304    fn modify_printer_renders_changed_section() {
1305        let progress = Progress::new();
1306        progress.files_changed.inc();
1307        progress.directories_unchanged.inc();
1308        let mut printer = ModifyProgressPrinter::new(&progress);
1309        let out = printer.print().unwrap();
1310        assert!(out.contains("CHANGED:"));
1311        assert!(out.contains("UNCHANGED:"));
1312        assert!(out.contains("SKIPPED:"));
1313    }
1314}