1use tracing::instrument;
2
3const NUM_SHARDS: usize = 64;
6
7#[repr(align(128))]
12struct PaddedAtomicU64(std::sync::atomic::AtomicU64);
13
14static NEXT_SHARD_INDEX: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
17
18thread_local! {
19 static MY_SHARD: usize =
22 NEXT_SHARD_INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % NUM_SHARDS;
23}
24
25pub struct TlsCounter {
39 shards: [PaddedAtomicU64; NUM_SHARDS],
40}
41
42impl TlsCounter {
43 #[must_use]
44 pub fn new() -> Self {
45 Self {
46 shards: std::array::from_fn(|_| PaddedAtomicU64(std::sync::atomic::AtomicU64::new(0))),
47 }
48 }
49
50 pub fn add(&self, value: u64) {
51 let shard = MY_SHARD.with(|&s| s);
52 self.shards[shard]
53 .0
54 .fetch_add(value, std::sync::atomic::Ordering::Relaxed);
55 }
56
57 pub fn inc(&self) {
58 self.add(1);
59 }
60
61 pub fn get(&self) -> u64 {
62 self.shards
63 .iter()
64 .map(|s| s.0.load(std::sync::atomic::Ordering::Relaxed))
65 .sum()
66 }
67}
68
69impl std::fmt::Debug for TlsCounter {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("TlsCounter")
72 .field("value", &self.get())
73 .finish()
74 }
75}
76
77impl Default for TlsCounter {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83#[derive(Debug)]
84pub struct ProgressCounter {
85 started: TlsCounter,
86 finished: TlsCounter,
87}
88
89impl Default for ProgressCounter {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95pub struct ProgressGuard<'a> {
96 progress: &'a ProgressCounter,
97}
98
99impl<'a> ProgressGuard<'a> {
100 pub fn new(progress: &'a ProgressCounter) -> Self {
101 progress.started.inc();
102 Self { progress }
103 }
104}
105
106impl Drop for ProgressGuard<'_> {
107 fn drop(&mut self) {
108 self.progress.finished.inc();
109 }
110}
111
112pub struct Status {
113 pub started: u64,
114 pub finished: u64,
115}
116
117impl ProgressCounter {
118 #[must_use]
119 pub fn new() -> Self {
120 Self {
121 started: TlsCounter::new(),
122 finished: TlsCounter::new(),
123 }
124 }
125
126 pub fn guard(&self) -> ProgressGuard<'_> {
127 ProgressGuard::new(self)
128 }
129
130 #[instrument]
131 pub fn get(&self) -> Status {
132 let mut status = Status {
133 started: self.started.get(),
134 finished: self.finished.get(),
135 };
136 if status.finished > status.started {
137 tracing::debug!(
138 "Progress inversion - started: {}, finished {}",
139 status.started,
140 status.finished
141 );
142 status.started = status.finished;
143 }
144 status
145 }
146}
147
148pub struct Progress {
149 pub ops: ProgressCounter,
150 pub bytes_copied: TlsCounter,
151 pub hard_links_created: TlsCounter,
152 pub files_copied: TlsCounter,
153 pub symlinks_created: TlsCounter,
154 pub directories_created: TlsCounter,
155 pub files_changed: TlsCounter,
156 pub symlinks_changed: TlsCounter,
157 pub directories_changed: TlsCounter,
158 pub files_unchanged: TlsCounter,
159 pub symlinks_unchanged: TlsCounter,
160 pub directories_unchanged: TlsCounter,
161 pub hard_links_unchanged: TlsCounter,
162 pub files_removed: TlsCounter,
163 pub symlinks_removed: TlsCounter,
164 pub directories_removed: TlsCounter,
165 pub bytes_removed: TlsCounter,
166 pub files_skipped: TlsCounter,
167 pub symlinks_skipped: TlsCounter,
168 pub directories_skipped: TlsCounter,
169 pub specials_skipped: TlsCounter,
170 start_time: std::time::Instant,
171}
172
173impl Progress {
174 #[must_use]
175 pub fn new() -> Self {
176 Self {
177 ops: Default::default(),
178 bytes_copied: Default::default(),
179 hard_links_created: Default::default(),
180 files_copied: Default::default(),
181 symlinks_created: Default::default(),
182 directories_created: Default::default(),
183 files_changed: Default::default(),
184 symlinks_changed: Default::default(),
185 directories_changed: Default::default(),
186 files_unchanged: Default::default(),
187 symlinks_unchanged: Default::default(),
188 directories_unchanged: Default::default(),
189 hard_links_unchanged: Default::default(),
190 files_removed: Default::default(),
191 symlinks_removed: Default::default(),
192 directories_removed: Default::default(),
193 bytes_removed: Default::default(),
194 files_skipped: Default::default(),
195 symlinks_skipped: Default::default(),
196 directories_skipped: Default::default(),
197 specials_skipped: Default::default(),
198 start_time: std::time::Instant::now(),
199 }
200 }
201
202 pub fn get_duration(&self) -> std::time::Duration {
203 self.start_time.elapsed()
204 }
205}
206
207impl Default for Progress {
208 fn default() -> Self {
209 Self::new()
210 }
211}
212
213#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
214pub struct SerializableProgress {
215 pub ops_started: u64,
216 pub ops_finished: u64,
217 pub bytes_copied: u64,
218 pub hard_links_created: u64,
219 pub files_copied: u64,
220 pub symlinks_created: u64,
221 pub directories_created: u64,
222 pub files_unchanged: u64,
223 pub symlinks_unchanged: u64,
224 pub directories_unchanged: u64,
225 pub hard_links_unchanged: u64,
226 pub files_removed: u64,
227 pub symlinks_removed: u64,
228 pub directories_removed: u64,
229 pub bytes_removed: u64,
230 pub files_skipped: u64,
231 pub symlinks_skipped: u64,
232 pub directories_skipped: u64,
233 pub specials_skipped: u64,
234 pub current_time: std::time::SystemTime,
235}
236
237impl Default for SerializableProgress {
238 fn default() -> Self {
239 Self {
240 ops_started: 0,
241 ops_finished: 0,
242 bytes_copied: 0,
243 hard_links_created: 0,
244 files_copied: 0,
245 symlinks_created: 0,
246 directories_created: 0,
247 files_unchanged: 0,
248 symlinks_unchanged: 0,
249 directories_unchanged: 0,
250 hard_links_unchanged: 0,
251 files_removed: 0,
252 symlinks_removed: 0,
253 directories_removed: 0,
254 bytes_removed: 0,
255 files_skipped: 0,
256 symlinks_skipped: 0,
257 directories_skipped: 0,
258 specials_skipped: 0,
259 current_time: std::time::SystemTime::now(),
260 }
261 }
262}
263
264impl From<&Progress> for SerializableProgress {
265 fn from(progress: &Progress) -> Self {
267 Self {
268 ops_started: progress.ops.started.get(),
269 ops_finished: progress.ops.finished.get(),
270 bytes_copied: progress.bytes_copied.get(),
271 hard_links_created: progress.hard_links_created.get(),
272 files_copied: progress.files_copied.get(),
273 symlinks_created: progress.symlinks_created.get(),
274 directories_created: progress.directories_created.get(),
275 files_unchanged: progress.files_unchanged.get(),
276 symlinks_unchanged: progress.symlinks_unchanged.get(),
277 directories_unchanged: progress.directories_unchanged.get(),
278 hard_links_unchanged: progress.hard_links_unchanged.get(),
279 files_removed: progress.files_removed.get(),
280 symlinks_removed: progress.symlinks_removed.get(),
281 directories_removed: progress.directories_removed.get(),
282 bytes_removed: progress.bytes_removed.get(),
283 files_skipped: progress.files_skipped.get(),
284 symlinks_skipped: progress.symlinks_skipped.get(),
285 directories_skipped: progress.directories_skipped.get(),
286 specials_skipped: progress.specials_skipped.get(),
287 current_time: std::time::SystemTime::now(),
288 }
289 }
290}
291
292pub trait LocalProgressReport: Send {
298 fn print(&mut self) -> anyhow::Result<String>;
299}
300
301#[derive(Copy, Clone, Debug)]
303pub enum LocalProgressKind {
304 Copy,
305 Remove,
306 Link,
307 Compare,
308 Filegen,
309 Modify,
310}
311
312fn ops_rates(
313 progress: &Progress,
314 last_ops: u64,
315 last_update: std::time::Instant,
316 now: std::time::Instant,
317) -> (Status, f64, f64) {
318 let ops = progress.ops.get();
319 let total_secs = progress.get_duration().as_secs_f64();
320 let curr_secs = (now - last_update).as_secs_f64();
321 let average = if total_secs > 0.0 {
322 ops.finished as f64 / total_secs
323 } else {
324 0.0
325 };
326 let current = if curr_secs > 0.0 {
327 (ops.finished - last_ops) as f64 / curr_secs
328 } else {
329 0.0
330 };
331 (ops, average, current)
332}
333
334fn bytes_rates(
335 total: u64,
336 last: u64,
337 total_secs: f64,
338 curr_secs: f64,
339) -> (bytesize::ByteSize, bytesize::ByteSize) {
340 let average = if total_secs > 0.0 {
341 total as f64 / total_secs
342 } else {
343 0.0
344 };
345 let current = if curr_secs > 0.0 {
346 (total - last) as f64 / curr_secs
347 } else {
348 0.0
349 };
350 (
351 bytesize::ByteSize(average as u64),
352 bytesize::ByteSize(current as u64),
353 )
354}
355
356pub struct CopyProgressPrinter<'a> {
360 progress: &'a Progress,
361 last_ops: u64,
362 last_bytes: u64,
363 last_update: std::time::Instant,
364}
365
366impl<'a> CopyProgressPrinter<'a> {
367 pub fn new(progress: &'a Progress) -> Self {
368 Self {
369 progress,
370 last_ops: progress.ops.get().finished,
371 last_bytes: progress.bytes_copied.get(),
372 last_update: std::time::Instant::now(),
373 }
374 }
375}
376
377impl<'a> LocalProgressReport for CopyProgressPrinter<'a> {
378 fn print(&mut self) -> anyhow::Result<String> {
379 let now = std::time::Instant::now();
380 let (ops, avg_ops, cur_ops) =
381 ops_rates(self.progress, self.last_ops, self.last_update, now);
382 let bytes = self.progress.bytes_copied.get();
383 let total_secs = self.progress.get_duration().as_secs_f64();
384 let curr_secs = (now - self.last_update).as_secs_f64();
385 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
386 self.last_ops = ops.finished;
387 self.last_bytes = bytes;
388 self.last_update = now;
389 Ok(format!(
390 "---------------------\n\
391 OPS:\n\
392 pending: {:>10}\n\
393 average: {:>10.2} items/s\n\
394 current: {:>10.2} items/s\n\
395 -----------------------\n\
396 COPIED:\n\
397 average: {:>10}/s\n\
398 current: {:>10}/s\n\
399 bytes: {:>10}\n\
400 files: {:>10}\n\
401 symlinks: {:>10}\n\
402 directories: {:>10}\n\
403 -----------------------\n\
404 UNCHANGED:\n\
405 files: {:>10}\n\
406 symlinks: {:>10}\n\
407 directories: {:>10}\n\
408 -----------------------\n\
409 REMOVED:\n\
410 bytes: {:>10}\n\
411 files: {:>10}\n\
412 symlinks: {:>10}\n\
413 directories: {:>10}\n\
414 -----------------------\n\
415 SKIPPED:\n\
416 files: {:>10}\n\
417 symlinks: {:>10}\n\
418 directories: {:>10}\n\
419 specials: {:>10}",
420 ops.started - ops.finished,
421 avg_ops,
422 cur_ops,
423 avg_bytes,
424 cur_bytes,
425 bytesize::ByteSize(bytes),
426 self.progress.files_copied.get(),
427 self.progress.symlinks_created.get(),
428 self.progress.directories_created.get(),
429 self.progress.files_unchanged.get(),
430 self.progress.symlinks_unchanged.get(),
431 self.progress.directories_unchanged.get(),
432 bytesize::ByteSize(self.progress.bytes_removed.get()),
433 self.progress.files_removed.get(),
434 self.progress.symlinks_removed.get(),
435 self.progress.directories_removed.get(),
436 self.progress.files_skipped.get(),
437 self.progress.symlinks_skipped.get(),
438 self.progress.directories_skipped.get(),
439 self.progress.specials_skipped.get(),
440 ))
441 }
442}
443
444pub struct RemoveProgressPrinter<'a> {
447 progress: &'a Progress,
448 last_ops: u64,
449 last_bytes_removed: u64,
450 last_update: std::time::Instant,
451}
452
453impl<'a> RemoveProgressPrinter<'a> {
454 pub fn new(progress: &'a Progress) -> Self {
455 Self {
456 progress,
457 last_ops: progress.ops.get().finished,
458 last_bytes_removed: progress.bytes_removed.get(),
459 last_update: std::time::Instant::now(),
460 }
461 }
462}
463
464impl<'a> LocalProgressReport for RemoveProgressPrinter<'a> {
465 fn print(&mut self) -> anyhow::Result<String> {
466 let now = std::time::Instant::now();
467 let (ops, avg_ops, cur_ops) =
468 ops_rates(self.progress, self.last_ops, self.last_update, now);
469 let bytes_removed = self.progress.bytes_removed.get();
470 let total_secs = self.progress.get_duration().as_secs_f64();
471 let curr_secs = (now - self.last_update).as_secs_f64();
472 let (avg_bytes, cur_bytes) = bytes_rates(
473 bytes_removed,
474 self.last_bytes_removed,
475 total_secs,
476 curr_secs,
477 );
478 self.last_ops = ops.finished;
479 self.last_bytes_removed = bytes_removed;
480 self.last_update = now;
481 Ok(format!(
482 "---------------------\n\
483 OPS:\n\
484 pending: {:>10}\n\
485 average: {:>10.2} items/s\n\
486 current: {:>10.2} items/s\n\
487 -----------------------\n\
488 REMOVED:\n\
489 average: {:>10}/s\n\
490 current: {:>10}/s\n\
491 bytes: {:>10}\n\
492 files: {:>10}\n\
493 symlinks: {:>10}\n\
494 directories: {:>10}\n\
495 -----------------------\n\
496 SKIPPED:\n\
497 files: {:>10}\n\
498 symlinks: {:>10}\n\
499 directories: {:>10}",
500 ops.started - ops.finished,
501 avg_ops,
502 cur_ops,
503 avg_bytes,
504 cur_bytes,
505 bytesize::ByteSize(bytes_removed),
506 self.progress.files_removed.get(),
507 self.progress.symlinks_removed.get(),
508 self.progress.directories_removed.get(),
509 self.progress.files_skipped.get(),
510 self.progress.symlinks_skipped.get(),
511 self.progress.directories_skipped.get(),
512 ))
513 }
514}
515
516pub struct ModifyProgressPrinter<'a> {
519 progress: &'a Progress,
520 last_ops: u64,
521 last_update: std::time::Instant,
522}
523
524impl<'a> ModifyProgressPrinter<'a> {
525 pub fn new(progress: &'a Progress) -> Self {
526 Self {
527 progress,
528 last_ops: progress.ops.get().finished,
529 last_update: std::time::Instant::now(),
530 }
531 }
532}
533
534impl<'a> LocalProgressReport for ModifyProgressPrinter<'a> {
535 fn print(&mut self) -> anyhow::Result<String> {
536 let now = std::time::Instant::now();
537 let (ops, avg_ops, cur_ops) =
538 ops_rates(self.progress, self.last_ops, self.last_update, now);
539 self.last_ops = ops.finished;
540 self.last_update = now;
541 Ok(format!(
542 "---------------------\n\
543 OPS:\n\
544 pending: {:>10}\n\
545 average: {:>10.2} items/s\n\
546 current: {:>10.2} items/s\n\
547 -----------------------\n\
548 CHANGED:\n\
549 files: {:>10}\n\
550 symlinks: {:>10}\n\
551 directories: {:>10}\n\
552 -----------------------\n\
553 UNCHANGED:\n\
554 files: {:>10}\n\
555 symlinks: {:>10}\n\
556 directories: {:>10}\n\
557 -----------------------\n\
558 SKIPPED:\n\
559 files: {:>10}\n\
560 symlinks: {:>10}\n\
561 directories: {:>10}",
562 ops.started - ops.finished,
563 avg_ops,
564 cur_ops,
565 self.progress.files_changed.get(),
566 self.progress.symlinks_changed.get(),
567 self.progress.directories_changed.get(),
568 self.progress.files_unchanged.get(),
569 self.progress.symlinks_unchanged.get(),
570 self.progress.directories_unchanged.get(),
571 self.progress.files_skipped.get(),
572 self.progress.symlinks_skipped.get(),
573 self.progress.directories_skipped.get(),
574 ))
575 }
576}
577
578pub struct LinkProgressPrinter<'a> {
582 progress: &'a Progress,
583 last_ops: u64,
584 last_bytes: u64,
585 last_update: std::time::Instant,
586}
587
588impl<'a> LinkProgressPrinter<'a> {
589 pub fn new(progress: &'a Progress) -> Self {
590 Self {
591 progress,
592 last_ops: progress.ops.get().finished,
593 last_bytes: progress.bytes_copied.get(),
594 last_update: std::time::Instant::now(),
595 }
596 }
597}
598
599impl<'a> LocalProgressReport for LinkProgressPrinter<'a> {
600 fn print(&mut self) -> anyhow::Result<String> {
601 let now = std::time::Instant::now();
602 let (ops, avg_ops, cur_ops) =
603 ops_rates(self.progress, self.last_ops, self.last_update, now);
604 let bytes = self.progress.bytes_copied.get();
605 let total_secs = self.progress.get_duration().as_secs_f64();
606 let curr_secs = (now - self.last_update).as_secs_f64();
607 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
608 self.last_ops = ops.finished;
609 self.last_bytes = bytes;
610 self.last_update = now;
611 Ok(format!(
612 "---------------------\n\
613 OPS:\n\
614 pending: {:>10}\n\
615 average: {:>10.2} items/s\n\
616 current: {:>10.2} items/s\n\
617 -----------------------\n\
618 LINKED / COPIED:\n\
619 average: {:>10}/s\n\
620 current: {:>10}/s\n\
621 bytes: {:>10}\n\
622 hard-links: {:>10}\n\
623 files: {:>10}\n\
624 symlinks: {:>10}\n\
625 directories: {:>10}\n\
626 -----------------------\n\
627 UNCHANGED:\n\
628 hard-links: {:>10}\n\
629 files: {:>10}\n\
630 symlinks: {:>10}\n\
631 directories: {:>10}\n\
632 -----------------------\n\
633 REMOVED:\n\
634 bytes: {:>10}\n\
635 files: {:>10}\n\
636 symlinks: {:>10}\n\
637 directories: {:>10}\n\
638 -----------------------\n\
639 SKIPPED:\n\
640 files: {:>10}\n\
641 symlinks: {:>10}\n\
642 directories: {:>10}\n\
643 specials: {:>10}",
644 ops.started - ops.finished,
645 avg_ops,
646 cur_ops,
647 avg_bytes,
648 cur_bytes,
649 bytesize::ByteSize(bytes),
650 self.progress.hard_links_created.get(),
651 self.progress.files_copied.get(),
652 self.progress.symlinks_created.get(),
653 self.progress.directories_created.get(),
654 self.progress.hard_links_unchanged.get(),
655 self.progress.files_unchanged.get(),
656 self.progress.symlinks_unchanged.get(),
657 self.progress.directories_unchanged.get(),
658 bytesize::ByteSize(self.progress.bytes_removed.get()),
659 self.progress.files_removed.get(),
660 self.progress.symlinks_removed.get(),
661 self.progress.directories_removed.get(),
662 self.progress.files_skipped.get(),
663 self.progress.symlinks_skipped.get(),
664 self.progress.directories_skipped.get(),
665 self.progress.specials_skipped.get(),
666 ))
667 }
668}
669
670pub struct CompareProgressPrinter<'a> {
673 progress: &'a Progress,
674 last_ops: u64,
675 last_update: std::time::Instant,
676}
677
678impl<'a> CompareProgressPrinter<'a> {
679 pub fn new(progress: &'a Progress) -> Self {
680 Self {
681 progress,
682 last_ops: progress.ops.get().finished,
683 last_update: std::time::Instant::now(),
684 }
685 }
686}
687
688impl<'a> LocalProgressReport for CompareProgressPrinter<'a> {
689 fn print(&mut self) -> anyhow::Result<String> {
690 let now = std::time::Instant::now();
691 let (ops, avg_ops, cur_ops) =
692 ops_rates(self.progress, self.last_ops, self.last_update, now);
693 self.last_ops = ops.finished;
694 self.last_update = now;
695 Ok(format!(
696 "---------------------\n\
697 OPS:\n\
698 pending: {:>10}\n\
699 compared: {:>9}\n\
700 average: {:>10.2} items/s\n\
701 current: {:>10.2} items/s",
702 ops.started - ops.finished,
703 ops.finished,
704 avg_ops,
705 cur_ops,
706 ))
707 }
708}
709
710pub struct FilegenProgressPrinter<'a> {
713 progress: &'a Progress,
714 last_ops: u64,
715 last_bytes: u64,
716 last_update: std::time::Instant,
717}
718
719impl<'a> FilegenProgressPrinter<'a> {
720 pub fn new(progress: &'a Progress) -> Self {
721 Self {
722 progress,
723 last_ops: progress.ops.get().finished,
724 last_bytes: progress.bytes_copied.get(),
725 last_update: std::time::Instant::now(),
726 }
727 }
728}
729
730impl<'a> LocalProgressReport for FilegenProgressPrinter<'a> {
731 fn print(&mut self) -> anyhow::Result<String> {
732 let now = std::time::Instant::now();
733 let (ops, avg_ops, cur_ops) =
734 ops_rates(self.progress, self.last_ops, self.last_update, now);
735 let bytes = self.progress.bytes_copied.get();
736 let total_secs = self.progress.get_duration().as_secs_f64();
737 let curr_secs = (now - self.last_update).as_secs_f64();
738 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
739 self.last_ops = ops.finished;
740 self.last_bytes = bytes;
741 self.last_update = now;
742 Ok(format!(
743 "---------------------\n\
744 OPS:\n\
745 pending: {:>10}\n\
746 average: {:>10.2} items/s\n\
747 current: {:>10.2} items/s\n\
748 -----------------------\n\
749 GENERATED:\n\
750 average: {:>10}/s\n\
751 current: {:>10}/s\n\
752 bytes: {:>10}\n\
753 files: {:>10}\n\
754 directories: {:>10}",
755 ops.started - ops.finished,
756 avg_ops,
757 cur_ops,
758 avg_bytes,
759 cur_bytes,
760 bytesize::ByteSize(bytes),
761 self.progress.files_copied.get(),
762 self.progress.directories_created.get(),
763 ))
764 }
765}
766
767#[must_use]
770pub fn make_local_printer<'a>(
771 kind: LocalProgressKind,
772 progress: &'a Progress,
773) -> Box<dyn LocalProgressReport + 'a> {
774 match kind {
775 LocalProgressKind::Copy => Box::new(CopyProgressPrinter::new(progress)),
776 LocalProgressKind::Remove => Box::new(RemoveProgressPrinter::new(progress)),
777 LocalProgressKind::Link => Box::new(LinkProgressPrinter::new(progress)),
778 LocalProgressKind::Compare => Box::new(CompareProgressPrinter::new(progress)),
779 LocalProgressKind::Filegen => Box::new(FilegenProgressPrinter::new(progress)),
780 LocalProgressKind::Modify => Box::new(ModifyProgressPrinter::new(progress)),
781 }
782}
783
784pub struct RcpdProgressPrinter {
785 start_time: std::time::Instant,
786 last_source_ops: u64,
787 last_source_bytes: u64,
788 last_source_files: u64,
789 last_dest_ops: u64,
790 last_dest_bytes: u64,
791 last_update: std::time::Instant,
792}
793
794impl RcpdProgressPrinter {
795 #[must_use]
796 pub fn new() -> Self {
797 let now = std::time::Instant::now();
798 Self {
799 start_time: now,
800 last_source_ops: 0,
801 last_source_bytes: 0,
802 last_source_files: 0,
803 last_dest_ops: 0,
804 last_dest_bytes: 0,
805 last_update: now,
806 }
807 }
808
809 fn calculate_current_rate(&self, current: u64, last: u64, duration_secs: f64) -> f64 {
810 if duration_secs > 0.0 {
811 (current - last) as f64 / duration_secs
812 } else {
813 0.0
814 }
815 }
816
817 fn calculate_average_rate(&self, total: u64, total_duration_secs: f64) -> f64 {
818 if total_duration_secs > 0.0 {
819 total as f64 / total_duration_secs
820 } else {
821 0.0
822 }
823 }
824
825 pub fn print(
826 &mut self,
827 source_progress: &SerializableProgress,
828 dest_progress: &SerializableProgress,
829 ) -> anyhow::Result<String> {
830 let time_now = std::time::Instant::now();
831 let total_duration_secs = (time_now - self.start_time).as_secs_f64();
832 let curr_duration_secs = (time_now - self.last_update).as_secs_f64();
833 let source_ops_rate_curr = self.calculate_current_rate(
835 source_progress.ops_finished,
836 self.last_source_ops,
837 curr_duration_secs,
838 );
839 let source_bytes_rate_curr = self.calculate_current_rate(
840 source_progress.bytes_copied,
841 self.last_source_bytes,
842 curr_duration_secs,
843 );
844 let source_files_rate_curr = self.calculate_current_rate(
845 source_progress.files_copied,
846 self.last_source_files,
847 curr_duration_secs,
848 );
849 let source_ops_rate_avg =
851 self.calculate_average_rate(source_progress.ops_finished, total_duration_secs);
852 let source_bytes_rate_avg =
853 self.calculate_average_rate(source_progress.bytes_copied, total_duration_secs);
854 let source_files_rate_avg =
855 self.calculate_average_rate(source_progress.files_copied, total_duration_secs);
856 let dest_ops_rate_curr = self.calculate_current_rate(
858 dest_progress.ops_finished,
859 self.last_dest_ops,
860 curr_duration_secs,
861 );
862 let dest_bytes_rate_curr = self.calculate_current_rate(
863 dest_progress.bytes_copied,
864 self.last_dest_bytes,
865 curr_duration_secs,
866 );
867 let dest_ops_rate_avg =
869 self.calculate_average_rate(dest_progress.ops_finished, total_duration_secs);
870 let dest_bytes_rate_avg =
871 self.calculate_average_rate(dest_progress.bytes_copied, total_duration_secs);
872 self.last_source_ops = source_progress.ops_finished;
874 self.last_source_bytes = source_progress.bytes_copied;
875 self.last_source_files = source_progress.files_copied;
876 self.last_dest_ops = dest_progress.ops_finished;
877 self.last_dest_bytes = dest_progress.bytes_copied;
878 self.last_update = time_now;
879 Ok(format!(
880 "==== SOURCE =======\n\
881 OPS:\n\
882 pending: {:>10}\n\
883 average: {:>10.2} items/s\n\
884 current: {:>10.2} items/s\n\
885 ---------------------\n\
886 COPIED:\n\
887 average: {:>10}/s\n\
888 current: {:>10}/s\n\
889 bytes: {:>10}\n\
890 files: {:>10}\n\
891 ---------------------\n\
892 FILES:\n\
893 average: {:>10.2} files/s\n\
894 current: {:>10.2} files/s\n\
895 ---------------------\n\
896 SKIPPED:\n\
897 files: {:>10}\n\
898 symlinks: {:>10}\n\
899 directories: {:>10}\n\
900 specials: {:>10}\n\
901 ==== DESTINATION ====\n\
902 OPS:\n\
903 pending: {:>10}\n\
904 average: {:>10.2} items/s\n\
905 current: {:>10.2} items/s\n\
906 ---------------------\n\
907 COPIED:\n\
908 average: {:>10}/s\n\
909 current: {:>10}/s\n\
910 bytes: {:>10}\n\
911 files: {:>10}\n\
912 symlinks: {:>10}\n\
913 directories: {:>10}\n\
914 hard-links: {:>10}\n\
915 ---------------------\n\
916 UNCHANGED:\n\
917 files: {:>10}\n\
918 symlinks: {:>10}\n\
919 directories: {:>10}\n\
920 hard-links: {:>10}\n\
921 ---------------------\n\
922 REMOVED:\n\
923 bytes: {:>10}\n\
924 files: {:>10}\n\
925 symlinks: {:>10}\n\
926 directories: {:>10}",
927 source_progress.ops_started - source_progress.ops_finished, source_ops_rate_avg,
930 source_ops_rate_curr,
931 bytesize::ByteSize(source_bytes_rate_avg as u64),
932 bytesize::ByteSize(source_bytes_rate_curr as u64),
933 bytesize::ByteSize(source_progress.bytes_copied),
934 source_progress.files_copied,
935 source_files_rate_avg,
936 source_files_rate_curr,
937 source_progress.files_skipped,
939 source_progress.symlinks_skipped,
940 source_progress.directories_skipped,
941 source_progress.specials_skipped,
942 dest_progress.ops_started - dest_progress.ops_finished, dest_ops_rate_avg,
945 dest_ops_rate_curr,
946 bytesize::ByteSize(dest_bytes_rate_avg as u64),
947 bytesize::ByteSize(dest_bytes_rate_curr as u64),
948 bytesize::ByteSize(dest_progress.bytes_copied),
949 dest_progress.files_copied,
951 dest_progress.symlinks_created,
952 dest_progress.directories_created,
953 dest_progress.hard_links_created,
954 dest_progress.files_unchanged,
956 dest_progress.symlinks_unchanged,
957 dest_progress.directories_unchanged,
958 dest_progress.hard_links_unchanged,
959 bytesize::ByteSize(dest_progress.bytes_removed),
961 dest_progress.files_removed,
962 dest_progress.symlinks_removed,
963 dest_progress.directories_removed,
964 ))
965 }
966}
967
968impl Default for RcpdProgressPrinter {
969 fn default() -> Self {
970 Self::new()
971 }
972}
973
974#[cfg(test)]
975mod tests {
976 use super::*;
977 use crate::remote_tracing::TracingMessage;
978 use anyhow::Result;
979
980 #[test]
981 fn basic_counting() -> Result<()> {
982 let tls_counter = TlsCounter::new();
983 for _ in 0..10 {
984 tls_counter.inc();
985 }
986 assert!(tls_counter.get() == 10);
987 Ok(())
988 }
989
990 #[test]
991 fn threaded_counting() -> Result<()> {
992 let tls_counter = TlsCounter::new();
993 std::thread::scope(|scope| {
994 let mut handles = Vec::new();
995 for _ in 0..10 {
996 handles.push(scope.spawn(|| {
997 for _ in 0..100 {
998 tls_counter.inc();
999 }
1000 }));
1001 }
1002 });
1003 assert!(tls_counter.get() == 1000);
1004 Ok(())
1005 }
1006
1007 #[test]
1008 fn basic_guard() -> Result<()> {
1009 let tls_progress = ProgressCounter::new();
1010 let _guard = tls_progress.guard();
1011 Ok(())
1012 }
1013
1014 #[test]
1015 fn test_serializable_progress() -> Result<()> {
1016 let progress = Progress::new();
1017
1018 progress.files_copied.inc();
1020 progress.bytes_copied.add(1024);
1021 progress.directories_created.add(2);
1022
1023 let serializable = SerializableProgress::from(&progress);
1025 assert_eq!(serializable.files_copied, 1);
1026 assert_eq!(serializable.bytes_copied, 1024);
1027 assert_eq!(serializable.directories_created, 2);
1028
1029 let _tracing_msg = TracingMessage::Progress(serializable);
1031
1032 Ok(())
1033 }
1034
1035 #[test]
1036 fn test_rcpd_progress_printer() -> Result<()> {
1037 let mut printer = RcpdProgressPrinter::new();
1038
1039 let source_progress = SerializableProgress {
1041 ops_started: 100,
1042 ops_finished: 80,
1043 bytes_copied: 1024,
1044 files_copied: 5,
1045 files_skipped: 3,
1046 symlinks_skipped: 1,
1047 directories_skipped: 2,
1048 ..Default::default()
1049 };
1050
1051 let dest_progress = SerializableProgress {
1052 ops_started: 80,
1053 ops_finished: 70,
1054 bytes_copied: 1024,
1055 files_copied: 8,
1056 symlinks_created: 2,
1057 directories_created: 1,
1058 ..Default::default()
1059 };
1060
1061 let output = printer.print(&source_progress, &dest_progress)?;
1063 assert!(output.contains("SOURCE"));
1064 assert!(output.contains("DESTINATION"));
1065 assert!(output.contains("OPS:"));
1066 assert!(output.contains("pending:"));
1067 assert!(output.contains("20")); assert!(output.contains("10")); let mut sections = output.split("==== DESTINATION ====");
1070 let source_section = sections.next().unwrap();
1071 let dest_section = sections.next().unwrap_or("");
1072 let source_files_line = source_section
1073 .lines()
1074 .find(|line| line.trim_start().starts_with("files:"))
1075 .expect("source files line missing");
1076 assert!(source_files_line.trim_start().ends_with("5"));
1077 assert!(!source_files_line.contains('.'));
1078 let dest_files_line = dest_section
1079 .lines()
1080 .find(|line| line.trim_start().starts_with("files:"))
1081 .expect("dest files line missing");
1082 assert!(dest_files_line.trim_start().ends_with("8"));
1083 assert!(!dest_files_line.contains('.'));
1084 assert!(source_section.contains("SKIPPED:"));
1086 let skipped_section = source_section
1087 .split("SKIPPED:")
1088 .nth(1)
1089 .expect("SKIPPED section missing in source");
1090 let skipped_lines: Vec<&str> = skipped_section.lines().collect();
1091 let skipped_files_line = skipped_lines
1092 .iter()
1093 .find(|line| line.trim_start().starts_with("files:"))
1094 .expect("skipped files line missing");
1095 assert!(skipped_files_line.trim_start().ends_with("3"));
1096 let skipped_symlinks_line = skipped_lines
1097 .iter()
1098 .find(|line| line.trim_start().starts_with("symlinks:"))
1099 .expect("skipped symlinks line missing");
1100 assert!(skipped_symlinks_line.trim_start().ends_with("1"));
1101 let skipped_dirs_line = skipped_lines
1102 .iter()
1103 .find(|line| line.trim_start().starts_with("directories:"))
1104 .expect("skipped directories line missing");
1105 assert!(skipped_dirs_line.trim_start().ends_with("2"));
1106
1107 Ok(())
1108 }
1109
1110 #[test]
1111 fn interleaved_counter_access() -> Result<()> {
1112 let counter_a = TlsCounter::new();
1115 let counter_b = TlsCounter::new();
1116 let counter_c = TlsCounter::new();
1117 for i in 0..100 {
1118 counter_a.add(1);
1119 counter_b.add(2);
1120 counter_c.add(3);
1121 if i % 10 == 0 {
1123 assert_eq!(counter_a.get(), i + 1);
1124 assert_eq!(counter_b.get(), (i + 1) * 2);
1125 assert_eq!(counter_c.get(), (i + 1) * 3);
1126 }
1127 }
1128 assert_eq!(counter_a.get(), 100);
1130 assert_eq!(counter_b.get(), 200);
1131 assert_eq!(counter_c.get(), 300);
1132 Ok(())
1133 }
1134
1135 #[test]
1136 fn concurrent_multi_counter_access() -> Result<()> {
1137 let counter_a = std::sync::Arc::new(TlsCounter::new());
1139 let counter_b = std::sync::Arc::new(TlsCounter::new());
1140 const THREADS: usize = 4;
1141 const ITERATIONS: u64 = 1000;
1142 let handles: Vec<_> = (0..THREADS)
1143 .map(|_| {
1144 let ca = counter_a.clone();
1145 let cb = counter_b.clone();
1146 std::thread::spawn(move || {
1147 for _ in 0..ITERATIONS {
1148 ca.add(1);
1149 cb.add(2);
1150 }
1151 })
1152 })
1153 .collect();
1154 for h in handles {
1155 h.join().unwrap();
1156 }
1157 assert_eq!(counter_a.get(), THREADS as u64 * ITERATIONS);
1159 assert_eq!(counter_b.get(), THREADS as u64 * ITERATIONS * 2);
1160 Ok(())
1161 }
1162
1163 #[test]
1164 fn repeated_counter_access() -> Result<()> {
1165 let counter = TlsCounter::new();
1167 for i in 1..=1000 {
1168 counter.add(1);
1169 assert_eq!(counter.get(), i);
1170 }
1171 Ok(())
1172 }
1173
1174 #[test]
1175 fn sharding_distributes_across_threads() -> Result<()> {
1176 let counter = std::sync::Arc::new(TlsCounter::new());
1179 const THREADS: usize = 16;
1180 const ITERATIONS: u64 = 100;
1181 let handles: Vec<_> = (0..THREADS)
1182 .map(|_| {
1183 let c = counter.clone();
1184 std::thread::spawn(move || {
1185 for _ in 0..ITERATIONS {
1186 c.inc();
1187 }
1188 })
1189 })
1190 .collect();
1191 for h in handles {
1192 h.join().unwrap();
1193 }
1194 assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1195 Ok(())
1196 }
1197
1198 #[test]
1199 fn sharding_handles_more_threads_than_shards() -> Result<()> {
1200 let counter = std::sync::Arc::new(TlsCounter::new());
1202 const THREADS: usize = 128; const ITERATIONS: u64 = 100;
1204 let handles: Vec<_> = (0..THREADS)
1205 .map(|_| {
1206 let c = counter.clone();
1207 std::thread::spawn(move || {
1208 for _ in 0..ITERATIONS {
1209 c.inc();
1210 }
1211 })
1212 })
1213 .collect();
1214 for h in handles {
1215 h.join().unwrap();
1216 }
1217 assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1218 Ok(())
1219 }
1220
1221 #[test]
1222 fn counter_independence() -> Result<()> {
1223 let counters: Vec<_> = (0..10).map(|_| TlsCounter::new()).collect();
1225 for (i, counter) in counters.iter().enumerate() {
1226 counter.add((i + 1) as u64 * 100);
1227 }
1228 for (i, counter) in counters.iter().enumerate() {
1229 assert_eq!(counter.get(), (i + 1) as u64 * 100);
1230 }
1231 Ok(())
1232 }
1233
1234 #[test]
1235 fn copy_printer_omits_hard_links() -> Result<()> {
1236 let progress = Progress::new();
1237 let mut printer = CopyProgressPrinter::new(&progress);
1238 let output = printer.print()?;
1239 assert!(output.contains("COPIED:"));
1240 assert!(output.contains("UNCHANGED:"));
1241 assert!(output.contains("REMOVED:"));
1242 assert!(output.contains("SKIPPED:"));
1243 assert!(!output.contains("hard-links"));
1244 Ok(())
1245 }
1246
1247 #[test]
1248 fn remove_printer_hides_copy_and_unchanged() -> Result<()> {
1249 let progress = Progress::new();
1250 let mut printer = RemoveProgressPrinter::new(&progress);
1251 let output = printer.print()?;
1252 assert!(output.contains("REMOVED:"));
1253 assert!(output.contains("SKIPPED:"));
1254 assert!(!output.contains("COPIED:"));
1255 assert!(!output.contains("UNCHANGED:"));
1256 assert!(!output.contains("hard-links"));
1257 assert!(!output.contains("specials:"));
1258 Ok(())
1259 }
1260
1261 #[test]
1262 fn link_printer_shows_hard_links() -> Result<()> {
1263 let progress = Progress::new();
1264 let mut printer = LinkProgressPrinter::new(&progress);
1265 let output = printer.print()?;
1266 assert!(output.contains("LINKED / COPIED:"));
1267 assert!(output.contains("UNCHANGED:"));
1268 assert!(output.contains("REMOVED:"));
1269 assert!(output.contains("SKIPPED:"));
1270 assert!(output.contains("hard-links"));
1271 Ok(())
1272 }
1273
1274 #[test]
1275 fn compare_printer_only_shows_ops() -> Result<()> {
1276 let progress = Progress::new();
1277 let mut printer = CompareProgressPrinter::new(&progress);
1278 let output = printer.print()?;
1279 assert!(output.contains("OPS:"));
1280 assert!(!output.contains("COPIED:"));
1281 assert!(!output.contains("UNCHANGED:"));
1282 assert!(!output.contains("REMOVED:"));
1283 assert!(!output.contains("SKIPPED:"));
1284 assert!(!output.contains("GENERATED:"));
1285 Ok(())
1286 }
1287
1288 #[test]
1289 fn filegen_printer_shows_generated() -> Result<()> {
1290 let progress = Progress::new();
1291 let mut printer = FilegenProgressPrinter::new(&progress);
1292 let output = printer.print()?;
1293 assert!(output.contains("OPS:"));
1294 assert!(output.contains("GENERATED:"));
1295 assert!(!output.contains("COPIED:"));
1296 assert!(!output.contains("UNCHANGED:"));
1297 assert!(!output.contains("REMOVED:"));
1298 assert!(!output.contains("SKIPPED:"));
1299 assert!(!output.contains("symlinks:"));
1300 Ok(())
1301 }
1302
1303 #[test]
1304 fn modify_printer_renders_changed_section() {
1305 let progress = Progress::new();
1306 progress.files_changed.inc();
1307 progress.directories_unchanged.inc();
1308 let mut printer = ModifyProgressPrinter::new(&progress);
1309 let out = printer.print().unwrap();
1310 assert!(out.contains("CHANGED:"));
1311 assert!(out.contains("UNCHANGED:"));
1312 assert!(out.contains("SKIPPED:"));
1313 }
1314}