1use tracing::instrument;
2
/// Number of atomic cells in every [`TlsCounter`]; a thread's shard index is its
/// ticket from `NEXT_SHARD_INDEX` reduced modulo this value.
const NUM_SHARDS: usize = 64;
6
/// An `AtomicU64` wrapped in a type that is aligned to 128 bytes.
///
/// NOTE(review): the alignment presumably keeps neighbouring shards on separate
/// cache lines to avoid false sharing — confirm the intended target architectures.
#[repr(align(128))]
struct PaddedAtomicU64(std::sync::atomic::AtomicU64);
13
/// Process-wide ticket dispenser used to initialise the thread-local `MY_SHARD`:
/// each initialisation takes the next ticket with a relaxed `fetch_add`.
static NEXT_SHARD_INDEX: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
17
18thread_local! {
19 static MY_SHARD: usize =
22 NEXT_SHARD_INDEX.fetch_add(1, std::sync::atomic::Ordering::Relaxed) % NUM_SHARDS;
23}
24
/// A `u64` counter split across `NUM_SHARDS` atomic cells.
///
/// Writers add to the cell selected by the calling thread's `MY_SHARD` index and
/// readers sum all cells. Only the shard index lives in thread-local storage; the
/// counts themselves are stored in this struct.
pub struct TlsCounter {
    /// One padded atomic cell per shard.
    shards: [PaddedAtomicU64; NUM_SHARDS],
}
41
42impl TlsCounter {
43 #[must_use]
44 pub fn new() -> Self {
45 Self {
46 shards: std::array::from_fn(|_| PaddedAtomicU64(std::sync::atomic::AtomicU64::new(0))),
47 }
48 }
49
50 pub fn add(&self, value: u64) {
51 let shard = MY_SHARD.with(|&s| s);
52 self.shards[shard]
53 .0
54 .fetch_add(value, std::sync::atomic::Ordering::Relaxed);
55 }
56
57 pub fn inc(&self) {
58 self.add(1);
59 }
60
61 pub fn get(&self) -> u64 {
62 self.shards
63 .iter()
64 .map(|s| s.0.load(std::sync::atomic::Ordering::Relaxed))
65 .sum()
66 }
67}
68
69impl std::fmt::Debug for TlsCounter {
70 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71 f.debug_struct("TlsCounter")
72 .field("value", &self.get())
73 .finish()
74 }
75}
76
77impl Default for TlsCounter {
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
/// A pair of counters tracking how many operations were started and finished.
///
/// `started` is incremented when a [`ProgressGuard`] is created and `finished`
/// when that guard is dropped.
#[derive(Debug)]
pub struct ProgressCounter {
    /// Number of guards created so far.
    started: TlsCounter,
    /// Number of guards dropped so far.
    finished: TlsCounter,
}
88
89impl Default for ProgressCounter {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
/// RAII marker for one in-flight operation: creating it bumps the `started`
/// counter of the borrowed [`ProgressCounter`], dropping it bumps `finished`.
pub struct ProgressGuard<'a> {
    /// Counter pair this guard reports to.
    progress: &'a ProgressCounter,
}
98
99impl<'a> ProgressGuard<'a> {
100 pub fn new(progress: &'a ProgressCounter) -> Self {
101 progress.started.inc();
102 Self { progress }
103 }
104}
105
106impl Drop for ProgressGuard<'_> {
107 fn drop(&mut self) {
108 self.progress.finished.inc();
109 }
110}
111
/// Point-in-time reading of a [`ProgressCounter`], as returned by
/// `ProgressCounter::get`.
pub struct Status {
    /// Operations started; `ProgressCounter::get` raises this to `finished` if it
    /// was observed to be lower.
    pub started: u64,
    /// Operations finished.
    pub finished: u64,
}
116
117impl ProgressCounter {
118 #[must_use]
119 pub fn new() -> Self {
120 Self {
121 started: TlsCounter::new(),
122 finished: TlsCounter::new(),
123 }
124 }
125
126 pub fn guard(&self) -> ProgressGuard<'_> {
127 ProgressGuard::new(self)
128 }
129
130 #[instrument]
131 pub fn get(&self) -> Status {
132 let mut status = Status {
133 started: self.started.get(),
134 finished: self.finished.get(),
135 };
136 if status.finished > status.started {
137 tracing::debug!(
138 "Progress inversion - started: {}, finished {}",
139 status.started,
140 status.finished
141 );
142 status.started = status.finished;
143 }
144 status
145 }
146}
147
/// Collection of all progress counters plus the time the collection was created.
///
/// The per-category counters are incremented by code outside this file; their
/// names match the labels the printers in this file report them under
/// (COPIED / UNCHANGED / REMOVED / SKIPPED).
pub struct Progress {
    /// Started/finished operation counts.
    pub ops: ProgressCounter,
    pub bytes_copied: TlsCounter,
    pub hard_links_created: TlsCounter,
    pub files_copied: TlsCounter,
    pub symlinks_created: TlsCounter,
    pub directories_created: TlsCounter,
    pub files_unchanged: TlsCounter,
    pub symlinks_unchanged: TlsCounter,
    pub directories_unchanged: TlsCounter,
    pub hard_links_unchanged: TlsCounter,
    pub files_removed: TlsCounter,
    pub symlinks_removed: TlsCounter,
    pub directories_removed: TlsCounter,
    pub bytes_removed: TlsCounter,
    pub files_skipped: TlsCounter,
    pub symlinks_skipped: TlsCounter,
    pub directories_skipped: TlsCounter,
    pub specials_skipped: TlsCounter,
    /// Instant captured in `Progress::new`; basis for `get_duration`.
    start_time: std::time::Instant,
}
169
170impl Progress {
171 #[must_use]
172 pub fn new() -> Self {
173 Self {
174 ops: Default::default(),
175 bytes_copied: Default::default(),
176 hard_links_created: Default::default(),
177 files_copied: Default::default(),
178 symlinks_created: Default::default(),
179 directories_created: Default::default(),
180 files_unchanged: Default::default(),
181 symlinks_unchanged: Default::default(),
182 directories_unchanged: Default::default(),
183 hard_links_unchanged: Default::default(),
184 files_removed: Default::default(),
185 symlinks_removed: Default::default(),
186 directories_removed: Default::default(),
187 bytes_removed: Default::default(),
188 files_skipped: Default::default(),
189 symlinks_skipped: Default::default(),
190 directories_skipped: Default::default(),
191 specials_skipped: Default::default(),
192 start_time: std::time::Instant::now(),
193 }
194 }
195
196 pub fn get_duration(&self) -> std::time::Duration {
197 self.start_time.elapsed()
198 }
199}
200
201impl Default for Progress {
202 fn default() -> Self {
203 Self::new()
204 }
205}
206
/// Plain-value, serde-serializable snapshot of a [`Progress`].
///
/// Every counter is flattened to a `u64`; `ops_started` / `ops_finished` hold the
/// two halves of `Progress::ops`.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SerializableProgress {
    pub ops_started: u64,
    pub ops_finished: u64,
    pub bytes_copied: u64,
    pub hard_links_created: u64,
    pub files_copied: u64,
    pub symlinks_created: u64,
    pub directories_created: u64,
    pub files_unchanged: u64,
    pub symlinks_unchanged: u64,
    pub directories_unchanged: u64,
    pub hard_links_unchanged: u64,
    pub files_removed: u64,
    pub symlinks_removed: u64,
    pub directories_removed: u64,
    pub bytes_removed: u64,
    pub files_skipped: u64,
    pub symlinks_skipped: u64,
    pub directories_skipped: u64,
    pub specials_skipped: u64,
    /// System time at which this value was constructed.
    pub current_time: std::time::SystemTime,
}
230
231impl Default for SerializableProgress {
232 fn default() -> Self {
233 Self {
234 ops_started: 0,
235 ops_finished: 0,
236 bytes_copied: 0,
237 hard_links_created: 0,
238 files_copied: 0,
239 symlinks_created: 0,
240 directories_created: 0,
241 files_unchanged: 0,
242 symlinks_unchanged: 0,
243 directories_unchanged: 0,
244 hard_links_unchanged: 0,
245 files_removed: 0,
246 symlinks_removed: 0,
247 directories_removed: 0,
248 bytes_removed: 0,
249 files_skipped: 0,
250 symlinks_skipped: 0,
251 directories_skipped: 0,
252 specials_skipped: 0,
253 current_time: std::time::SystemTime::now(),
254 }
255 }
256}
257
258impl From<&Progress> for SerializableProgress {
259 fn from(progress: &Progress) -> Self {
261 Self {
262 ops_started: progress.ops.started.get(),
263 ops_finished: progress.ops.finished.get(),
264 bytes_copied: progress.bytes_copied.get(),
265 hard_links_created: progress.hard_links_created.get(),
266 files_copied: progress.files_copied.get(),
267 symlinks_created: progress.symlinks_created.get(),
268 directories_created: progress.directories_created.get(),
269 files_unchanged: progress.files_unchanged.get(),
270 symlinks_unchanged: progress.symlinks_unchanged.get(),
271 directories_unchanged: progress.directories_unchanged.get(),
272 hard_links_unchanged: progress.hard_links_unchanged.get(),
273 files_removed: progress.files_removed.get(),
274 symlinks_removed: progress.symlinks_removed.get(),
275 directories_removed: progress.directories_removed.get(),
276 bytes_removed: progress.bytes_removed.get(),
277 files_skipped: progress.files_skipped.get(),
278 symlinks_skipped: progress.symlinks_skipped.get(),
279 directories_skipped: progress.directories_skipped.get(),
280 specials_skipped: progress.specials_skipped.get(),
281 current_time: std::time::SystemTime::now(),
282 }
283 }
284}
285
/// A stateful progress reporter that can be moved across threads.
pub trait LocalProgressReport: Send {
    /// Produces the next report as text.
    ///
    /// Takes `&mut self` because the implementations in this file remember the
    /// values of the previous call to compute "current" rates.
    fn print(&mut self) -> anyhow::Result<String>;
}
294
/// Selects which report layout `make_local_printer` builds.
#[derive(Copy, Clone, Debug)]
pub enum LocalProgressKind {
    /// Built as a `CopyProgressPrinter`.
    Copy,
    /// Built as a `RemoveProgressPrinter`.
    Remove,
    /// Built as a `LinkProgressPrinter`.
    Link,
    /// Built as a `CompareProgressPrinter`.
    Compare,
    /// Built as a `FilegenProgressPrinter`.
    Filegen,
}
304
305fn ops_rates(
306 progress: &Progress,
307 last_ops: u64,
308 last_update: std::time::Instant,
309 now: std::time::Instant,
310) -> (Status, f64, f64) {
311 let ops = progress.ops.get();
312 let total_secs = progress.get_duration().as_secs_f64();
313 let curr_secs = (now - last_update).as_secs_f64();
314 let average = if total_secs > 0.0 {
315 ops.finished as f64 / total_secs
316 } else {
317 0.0
318 };
319 let current = if curr_secs > 0.0 {
320 (ops.finished - last_ops) as f64 / curr_secs
321 } else {
322 0.0
323 };
324 (ops, average, current)
325}
326
327fn bytes_rates(
328 total: u64,
329 last: u64,
330 total_secs: f64,
331 curr_secs: f64,
332) -> (bytesize::ByteSize, bytesize::ByteSize) {
333 let average = if total_secs > 0.0 {
334 total as f64 / total_secs
335 } else {
336 0.0
337 };
338 let current = if curr_secs > 0.0 {
339 (total - last) as f64 / curr_secs
340 } else {
341 0.0
342 };
343 (
344 bytesize::ByteSize(average as u64),
345 bytesize::ByteSize(current as u64),
346 )
347}
348
/// Report printer for copy runs: OPS, COPIED, UNCHANGED, REMOVED and SKIPPED
/// sections, without hard-link rows.
pub struct CopyProgressPrinter<'a> {
    /// Counters being reported on.
    progress: &'a Progress,
    /// Finished-operation count seen at the previous sample.
    last_ops: u64,
    /// `bytes_copied` value seen at the previous sample.
    last_bytes: u64,
    /// When the previous sample was taken.
    last_update: std::time::Instant,
}
358
359impl<'a> CopyProgressPrinter<'a> {
360 pub fn new(progress: &'a Progress) -> Self {
361 Self {
362 progress,
363 last_ops: progress.ops.get().finished,
364 last_bytes: progress.bytes_copied.get(),
365 last_update: std::time::Instant::now(),
366 }
367 }
368}
369
370impl<'a> LocalProgressReport for CopyProgressPrinter<'a> {
371 fn print(&mut self) -> anyhow::Result<String> {
372 let now = std::time::Instant::now();
373 let (ops, avg_ops, cur_ops) =
374 ops_rates(self.progress, self.last_ops, self.last_update, now);
375 let bytes = self.progress.bytes_copied.get();
376 let total_secs = self.progress.get_duration().as_secs_f64();
377 let curr_secs = (now - self.last_update).as_secs_f64();
378 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
379 self.last_ops = ops.finished;
380 self.last_bytes = bytes;
381 self.last_update = now;
382 Ok(format!(
383 "---------------------\n\
384 OPS:\n\
385 pending: {:>10}\n\
386 average: {:>10.2} items/s\n\
387 current: {:>10.2} items/s\n\
388 -----------------------\n\
389 COPIED:\n\
390 average: {:>10}/s\n\
391 current: {:>10}/s\n\
392 bytes: {:>10}\n\
393 files: {:>10}\n\
394 symlinks: {:>10}\n\
395 directories: {:>10}\n\
396 -----------------------\n\
397 UNCHANGED:\n\
398 files: {:>10}\n\
399 symlinks: {:>10}\n\
400 directories: {:>10}\n\
401 -----------------------\n\
402 REMOVED:\n\
403 bytes: {:>10}\n\
404 files: {:>10}\n\
405 symlinks: {:>10}\n\
406 directories: {:>10}\n\
407 -----------------------\n\
408 SKIPPED:\n\
409 files: {:>10}\n\
410 symlinks: {:>10}\n\
411 directories: {:>10}\n\
412 specials: {:>10}",
413 ops.started - ops.finished,
414 avg_ops,
415 cur_ops,
416 avg_bytes,
417 cur_bytes,
418 bytesize::ByteSize(bytes),
419 self.progress.files_copied.get(),
420 self.progress.symlinks_created.get(),
421 self.progress.directories_created.get(),
422 self.progress.files_unchanged.get(),
423 self.progress.symlinks_unchanged.get(),
424 self.progress.directories_unchanged.get(),
425 bytesize::ByteSize(self.progress.bytes_removed.get()),
426 self.progress.files_removed.get(),
427 self.progress.symlinks_removed.get(),
428 self.progress.directories_removed.get(),
429 self.progress.files_skipped.get(),
430 self.progress.symlinks_skipped.get(),
431 self.progress.directories_skipped.get(),
432 self.progress.specials_skipped.get(),
433 ))
434 }
435}
436
/// Report printer for remove runs: OPS, REMOVED and SKIPPED sections only.
pub struct RemoveProgressPrinter<'a> {
    /// Counters being reported on.
    progress: &'a Progress,
    /// Finished-operation count seen at the previous sample.
    last_ops: u64,
    /// `bytes_removed` value seen at the previous sample.
    last_bytes_removed: u64,
    /// When the previous sample was taken.
    last_update: std::time::Instant,
}
445
446impl<'a> RemoveProgressPrinter<'a> {
447 pub fn new(progress: &'a Progress) -> Self {
448 Self {
449 progress,
450 last_ops: progress.ops.get().finished,
451 last_bytes_removed: progress.bytes_removed.get(),
452 last_update: std::time::Instant::now(),
453 }
454 }
455}
456
457impl<'a> LocalProgressReport for RemoveProgressPrinter<'a> {
458 fn print(&mut self) -> anyhow::Result<String> {
459 let now = std::time::Instant::now();
460 let (ops, avg_ops, cur_ops) =
461 ops_rates(self.progress, self.last_ops, self.last_update, now);
462 let bytes_removed = self.progress.bytes_removed.get();
463 let total_secs = self.progress.get_duration().as_secs_f64();
464 let curr_secs = (now - self.last_update).as_secs_f64();
465 let (avg_bytes, cur_bytes) = bytes_rates(
466 bytes_removed,
467 self.last_bytes_removed,
468 total_secs,
469 curr_secs,
470 );
471 self.last_ops = ops.finished;
472 self.last_bytes_removed = bytes_removed;
473 self.last_update = now;
474 Ok(format!(
475 "---------------------\n\
476 OPS:\n\
477 pending: {:>10}\n\
478 average: {:>10.2} items/s\n\
479 current: {:>10.2} items/s\n\
480 -----------------------\n\
481 REMOVED:\n\
482 average: {:>10}/s\n\
483 current: {:>10}/s\n\
484 bytes: {:>10}\n\
485 files: {:>10}\n\
486 symlinks: {:>10}\n\
487 directories: {:>10}\n\
488 -----------------------\n\
489 SKIPPED:\n\
490 files: {:>10}\n\
491 symlinks: {:>10}\n\
492 directories: {:>10}",
493 ops.started - ops.finished,
494 avg_ops,
495 cur_ops,
496 avg_bytes,
497 cur_bytes,
498 bytesize::ByteSize(bytes_removed),
499 self.progress.files_removed.get(),
500 self.progress.symlinks_removed.get(),
501 self.progress.directories_removed.get(),
502 self.progress.files_skipped.get(),
503 self.progress.symlinks_skipped.get(),
504 self.progress.directories_skipped.get(),
505 ))
506 }
507}
508
/// Report printer for link runs: like the copy report but with hard-link rows in
/// the "LINKED / COPIED" and "UNCHANGED" sections.
pub struct LinkProgressPrinter<'a> {
    /// Counters being reported on.
    progress: &'a Progress,
    /// Finished-operation count seen at the previous sample.
    last_ops: u64,
    /// `bytes_copied` value seen at the previous sample.
    last_bytes: u64,
    /// When the previous sample was taken.
    last_update: std::time::Instant,
}
518
519impl<'a> LinkProgressPrinter<'a> {
520 pub fn new(progress: &'a Progress) -> Self {
521 Self {
522 progress,
523 last_ops: progress.ops.get().finished,
524 last_bytes: progress.bytes_copied.get(),
525 last_update: std::time::Instant::now(),
526 }
527 }
528}
529
530impl<'a> LocalProgressReport for LinkProgressPrinter<'a> {
531 fn print(&mut self) -> anyhow::Result<String> {
532 let now = std::time::Instant::now();
533 let (ops, avg_ops, cur_ops) =
534 ops_rates(self.progress, self.last_ops, self.last_update, now);
535 let bytes = self.progress.bytes_copied.get();
536 let total_secs = self.progress.get_duration().as_secs_f64();
537 let curr_secs = (now - self.last_update).as_secs_f64();
538 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
539 self.last_ops = ops.finished;
540 self.last_bytes = bytes;
541 self.last_update = now;
542 Ok(format!(
543 "---------------------\n\
544 OPS:\n\
545 pending: {:>10}\n\
546 average: {:>10.2} items/s\n\
547 current: {:>10.2} items/s\n\
548 -----------------------\n\
549 LINKED / COPIED:\n\
550 average: {:>10}/s\n\
551 current: {:>10}/s\n\
552 bytes: {:>10}\n\
553 hard-links: {:>10}\n\
554 files: {:>10}\n\
555 symlinks: {:>10}\n\
556 directories: {:>10}\n\
557 -----------------------\n\
558 UNCHANGED:\n\
559 hard-links: {:>10}\n\
560 files: {:>10}\n\
561 symlinks: {:>10}\n\
562 directories: {:>10}\n\
563 -----------------------\n\
564 REMOVED:\n\
565 bytes: {:>10}\n\
566 files: {:>10}\n\
567 symlinks: {:>10}\n\
568 directories: {:>10}\n\
569 -----------------------\n\
570 SKIPPED:\n\
571 files: {:>10}\n\
572 symlinks: {:>10}\n\
573 directories: {:>10}\n\
574 specials: {:>10}",
575 ops.started - ops.finished,
576 avg_ops,
577 cur_ops,
578 avg_bytes,
579 cur_bytes,
580 bytesize::ByteSize(bytes),
581 self.progress.hard_links_created.get(),
582 self.progress.files_copied.get(),
583 self.progress.symlinks_created.get(),
584 self.progress.directories_created.get(),
585 self.progress.hard_links_unchanged.get(),
586 self.progress.files_unchanged.get(),
587 self.progress.symlinks_unchanged.get(),
588 self.progress.directories_unchanged.get(),
589 bytesize::ByteSize(self.progress.bytes_removed.get()),
590 self.progress.files_removed.get(),
591 self.progress.symlinks_removed.get(),
592 self.progress.directories_removed.get(),
593 self.progress.files_skipped.get(),
594 self.progress.symlinks_skipped.get(),
595 self.progress.directories_skipped.get(),
596 self.progress.specials_skipped.get(),
597 ))
598 }
599}
600
/// Report printer for compare runs: only the OPS section, including a
/// "compared" row with the finished-operation count.
pub struct CompareProgressPrinter<'a> {
    /// Counters being reported on.
    progress: &'a Progress,
    /// Finished-operation count seen at the previous sample.
    last_ops: u64,
    /// When the previous sample was taken.
    last_update: std::time::Instant,
}
608
609impl<'a> CompareProgressPrinter<'a> {
610 pub fn new(progress: &'a Progress) -> Self {
611 Self {
612 progress,
613 last_ops: progress.ops.get().finished,
614 last_update: std::time::Instant::now(),
615 }
616 }
617}
618
619impl<'a> LocalProgressReport for CompareProgressPrinter<'a> {
620 fn print(&mut self) -> anyhow::Result<String> {
621 let now = std::time::Instant::now();
622 let (ops, avg_ops, cur_ops) =
623 ops_rates(self.progress, self.last_ops, self.last_update, now);
624 self.last_ops = ops.finished;
625 self.last_update = now;
626 Ok(format!(
627 "---------------------\n\
628 OPS:\n\
629 pending: {:>10}\n\
630 compared: {:>9}\n\
631 average: {:>10.2} items/s\n\
632 current: {:>10.2} items/s",
633 ops.started - ops.finished,
634 ops.finished,
635 avg_ops,
636 cur_ops,
637 ))
638 }
639}
640
/// Report printer for file-generation runs: OPS plus a GENERATED section built
/// from the `bytes_copied`, `files_copied` and `directories_created` counters.
pub struct FilegenProgressPrinter<'a> {
    /// Counters being reported on.
    progress: &'a Progress,
    /// Finished-operation count seen at the previous sample.
    last_ops: u64,
    /// `bytes_copied` value seen at the previous sample.
    last_bytes: u64,
    /// When the previous sample was taken.
    last_update: std::time::Instant,
}
649
650impl<'a> FilegenProgressPrinter<'a> {
651 pub fn new(progress: &'a Progress) -> Self {
652 Self {
653 progress,
654 last_ops: progress.ops.get().finished,
655 last_bytes: progress.bytes_copied.get(),
656 last_update: std::time::Instant::now(),
657 }
658 }
659}
660
661impl<'a> LocalProgressReport for FilegenProgressPrinter<'a> {
662 fn print(&mut self) -> anyhow::Result<String> {
663 let now = std::time::Instant::now();
664 let (ops, avg_ops, cur_ops) =
665 ops_rates(self.progress, self.last_ops, self.last_update, now);
666 let bytes = self.progress.bytes_copied.get();
667 let total_secs = self.progress.get_duration().as_secs_f64();
668 let curr_secs = (now - self.last_update).as_secs_f64();
669 let (avg_bytes, cur_bytes) = bytes_rates(bytes, self.last_bytes, total_secs, curr_secs);
670 self.last_ops = ops.finished;
671 self.last_bytes = bytes;
672 self.last_update = now;
673 Ok(format!(
674 "---------------------\n\
675 OPS:\n\
676 pending: {:>10}\n\
677 average: {:>10.2} items/s\n\
678 current: {:>10.2} items/s\n\
679 -----------------------\n\
680 GENERATED:\n\
681 average: {:>10}/s\n\
682 current: {:>10}/s\n\
683 bytes: {:>10}\n\
684 files: {:>10}\n\
685 directories: {:>10}",
686 ops.started - ops.finished,
687 avg_ops,
688 cur_ops,
689 avg_bytes,
690 cur_bytes,
691 bytesize::ByteSize(bytes),
692 self.progress.files_copied.get(),
693 self.progress.directories_created.get(),
694 ))
695 }
696}
697
698#[must_use]
701pub fn make_local_printer<'a>(
702 kind: LocalProgressKind,
703 progress: &'a Progress,
704) -> Box<dyn LocalProgressReport + 'a> {
705 match kind {
706 LocalProgressKind::Copy => Box::new(CopyProgressPrinter::new(progress)),
707 LocalProgressKind::Remove => Box::new(RemoveProgressPrinter::new(progress)),
708 LocalProgressKind::Link => Box::new(LinkProgressPrinter::new(progress)),
709 LocalProgressKind::Compare => Box::new(CompareProgressPrinter::new(progress)),
710 LocalProgressKind::Filegen => Box::new(FilegenProgressPrinter::new(progress)),
711 }
712}
713
/// Report printer that combines two [`SerializableProgress`] snapshots, one
/// labelled SOURCE and one labelled DESTINATION, into a single report.
///
/// Unlike the local printers it holds no reference to a `Progress`; snapshots are
/// passed to `print`, and the previous values are kept here for "current" rates.
pub struct RcpdProgressPrinter {
    /// When this printer was created; basis for average rates.
    start_time: std::time::Instant,
    /// Source `ops_finished` at the previous `print` call.
    last_source_ops: u64,
    /// Source `bytes_copied` at the previous `print` call.
    last_source_bytes: u64,
    /// Source `files_copied` at the previous `print` call.
    last_source_files: u64,
    /// Destination `ops_finished` at the previous `print` call.
    last_dest_ops: u64,
    /// Destination `bytes_copied` at the previous `print` call.
    last_dest_bytes: u64,
    /// When `print` was last called (initially the creation time).
    last_update: std::time::Instant,
}
723
724impl RcpdProgressPrinter {
725 #[must_use]
726 pub fn new() -> Self {
727 let now = std::time::Instant::now();
728 Self {
729 start_time: now,
730 last_source_ops: 0,
731 last_source_bytes: 0,
732 last_source_files: 0,
733 last_dest_ops: 0,
734 last_dest_bytes: 0,
735 last_update: now,
736 }
737 }
738
739 fn calculate_current_rate(&self, current: u64, last: u64, duration_secs: f64) -> f64 {
740 if duration_secs > 0.0 {
741 (current - last) as f64 / duration_secs
742 } else {
743 0.0
744 }
745 }
746
747 fn calculate_average_rate(&self, total: u64, total_duration_secs: f64) -> f64 {
748 if total_duration_secs > 0.0 {
749 total as f64 / total_duration_secs
750 } else {
751 0.0
752 }
753 }
754
755 pub fn print(
756 &mut self,
757 source_progress: &SerializableProgress,
758 dest_progress: &SerializableProgress,
759 ) -> anyhow::Result<String> {
760 let time_now = std::time::Instant::now();
761 let total_duration_secs = (time_now - self.start_time).as_secs_f64();
762 let curr_duration_secs = (time_now - self.last_update).as_secs_f64();
763 let source_ops_rate_curr = self.calculate_current_rate(
765 source_progress.ops_finished,
766 self.last_source_ops,
767 curr_duration_secs,
768 );
769 let source_bytes_rate_curr = self.calculate_current_rate(
770 source_progress.bytes_copied,
771 self.last_source_bytes,
772 curr_duration_secs,
773 );
774 let source_files_rate_curr = self.calculate_current_rate(
775 source_progress.files_copied,
776 self.last_source_files,
777 curr_duration_secs,
778 );
779 let source_ops_rate_avg =
781 self.calculate_average_rate(source_progress.ops_finished, total_duration_secs);
782 let source_bytes_rate_avg =
783 self.calculate_average_rate(source_progress.bytes_copied, total_duration_secs);
784 let source_files_rate_avg =
785 self.calculate_average_rate(source_progress.files_copied, total_duration_secs);
786 let dest_ops_rate_curr = self.calculate_current_rate(
788 dest_progress.ops_finished,
789 self.last_dest_ops,
790 curr_duration_secs,
791 );
792 let dest_bytes_rate_curr = self.calculate_current_rate(
793 dest_progress.bytes_copied,
794 self.last_dest_bytes,
795 curr_duration_secs,
796 );
797 let dest_ops_rate_avg =
799 self.calculate_average_rate(dest_progress.ops_finished, total_duration_secs);
800 let dest_bytes_rate_avg =
801 self.calculate_average_rate(dest_progress.bytes_copied, total_duration_secs);
802 self.last_source_ops = source_progress.ops_finished;
804 self.last_source_bytes = source_progress.bytes_copied;
805 self.last_source_files = source_progress.files_copied;
806 self.last_dest_ops = dest_progress.ops_finished;
807 self.last_dest_bytes = dest_progress.bytes_copied;
808 self.last_update = time_now;
809 Ok(format!(
810 "==== SOURCE =======\n\
811 OPS:\n\
812 pending: {:>10}\n\
813 average: {:>10.2} items/s\n\
814 current: {:>10.2} items/s\n\
815 ---------------------\n\
816 COPIED:\n\
817 average: {:>10}/s\n\
818 current: {:>10}/s\n\
819 bytes: {:>10}\n\
820 files: {:>10}\n\
821 ---------------------\n\
822 FILES:\n\
823 average: {:>10.2} files/s\n\
824 current: {:>10.2} files/s\n\
825 ---------------------\n\
826 SKIPPED:\n\
827 files: {:>10}\n\
828 symlinks: {:>10}\n\
829 directories: {:>10}\n\
830 specials: {:>10}\n\
831 ==== DESTINATION ====\n\
832 OPS:\n\
833 pending: {:>10}\n\
834 average: {:>10.2} items/s\n\
835 current: {:>10.2} items/s\n\
836 ---------------------\n\
837 COPIED:\n\
838 average: {:>10}/s\n\
839 current: {:>10}/s\n\
840 bytes: {:>10}\n\
841 files: {:>10}\n\
842 symlinks: {:>10}\n\
843 directories: {:>10}\n\
844 hard-links: {:>10}\n\
845 ---------------------\n\
846 UNCHANGED:\n\
847 files: {:>10}\n\
848 symlinks: {:>10}\n\
849 directories: {:>10}\n\
850 hard-links: {:>10}\n\
851 ---------------------\n\
852 REMOVED:\n\
853 bytes: {:>10}\n\
854 files: {:>10}\n\
855 symlinks: {:>10}\n\
856 directories: {:>10}",
857 source_progress.ops_started - source_progress.ops_finished, source_ops_rate_avg,
860 source_ops_rate_curr,
861 bytesize::ByteSize(source_bytes_rate_avg as u64),
862 bytesize::ByteSize(source_bytes_rate_curr as u64),
863 bytesize::ByteSize(source_progress.bytes_copied),
864 source_progress.files_copied,
865 source_files_rate_avg,
866 source_files_rate_curr,
867 source_progress.files_skipped,
869 source_progress.symlinks_skipped,
870 source_progress.directories_skipped,
871 source_progress.specials_skipped,
872 dest_progress.ops_started - dest_progress.ops_finished, dest_ops_rate_avg,
875 dest_ops_rate_curr,
876 bytesize::ByteSize(dest_bytes_rate_avg as u64),
877 bytesize::ByteSize(dest_bytes_rate_curr as u64),
878 bytesize::ByteSize(dest_progress.bytes_copied),
879 dest_progress.files_copied,
881 dest_progress.symlinks_created,
882 dest_progress.directories_created,
883 dest_progress.hard_links_created,
884 dest_progress.files_unchanged,
886 dest_progress.symlinks_unchanged,
887 dest_progress.directories_unchanged,
888 dest_progress.hard_links_unchanged,
889 bytesize::ByteSize(dest_progress.bytes_removed),
891 dest_progress.files_removed,
892 dest_progress.symlinks_removed,
893 dest_progress.directories_removed,
894 ))
895 }
896}
897
898impl Default for RcpdProgressPrinter {
899 fn default() -> Self {
900 Self::new()
901 }
902}
903
904#[cfg(test)]
905mod tests {
906 use super::*;
907 use crate::remote_tracing::TracingMessage;
908 use anyhow::Result;
909
910 #[test]
911 fn basic_counting() -> Result<()> {
912 let tls_counter = TlsCounter::new();
913 for _ in 0..10 {
914 tls_counter.inc();
915 }
916 assert!(tls_counter.get() == 10);
917 Ok(())
918 }
919
920 #[test]
921 fn threaded_counting() -> Result<()> {
922 let tls_counter = TlsCounter::new();
923 std::thread::scope(|scope| {
924 let mut handles = Vec::new();
925 for _ in 0..10 {
926 handles.push(scope.spawn(|| {
927 for _ in 0..100 {
928 tls_counter.inc();
929 }
930 }));
931 }
932 });
933 assert!(tls_counter.get() == 1000);
934 Ok(())
935 }
936
937 #[test]
938 fn basic_guard() -> Result<()> {
939 let tls_progress = ProgressCounter::new();
940 let _guard = tls_progress.guard();
941 Ok(())
942 }
943
944 #[test]
945 fn test_serializable_progress() -> Result<()> {
946 let progress = Progress::new();
947
948 progress.files_copied.inc();
950 progress.bytes_copied.add(1024);
951 progress.directories_created.add(2);
952
953 let serializable = SerializableProgress::from(&progress);
955 assert_eq!(serializable.files_copied, 1);
956 assert_eq!(serializable.bytes_copied, 1024);
957 assert_eq!(serializable.directories_created, 2);
958
959 let _tracing_msg = TracingMessage::Progress(serializable);
961
962 Ok(())
963 }
964
965 #[test]
966 fn test_rcpd_progress_printer() -> Result<()> {
967 let mut printer = RcpdProgressPrinter::new();
968
969 let source_progress = SerializableProgress {
971 ops_started: 100,
972 ops_finished: 80,
973 bytes_copied: 1024,
974 files_copied: 5,
975 files_skipped: 3,
976 symlinks_skipped: 1,
977 directories_skipped: 2,
978 ..Default::default()
979 };
980
981 let dest_progress = SerializableProgress {
982 ops_started: 80,
983 ops_finished: 70,
984 bytes_copied: 1024,
985 files_copied: 8,
986 symlinks_created: 2,
987 directories_created: 1,
988 ..Default::default()
989 };
990
991 let output = printer.print(&source_progress, &dest_progress)?;
993 assert!(output.contains("SOURCE"));
994 assert!(output.contains("DESTINATION"));
995 assert!(output.contains("OPS:"));
996 assert!(output.contains("pending:"));
997 assert!(output.contains("20")); assert!(output.contains("10")); let mut sections = output.split("==== DESTINATION ====");
1000 let source_section = sections.next().unwrap();
1001 let dest_section = sections.next().unwrap_or("");
1002 let source_files_line = source_section
1003 .lines()
1004 .find(|line| line.trim_start().starts_with("files:"))
1005 .expect("source files line missing");
1006 assert!(source_files_line.trim_start().ends_with("5"));
1007 assert!(!source_files_line.contains('.'));
1008 let dest_files_line = dest_section
1009 .lines()
1010 .find(|line| line.trim_start().starts_with("files:"))
1011 .expect("dest files line missing");
1012 assert!(dest_files_line.trim_start().ends_with("8"));
1013 assert!(!dest_files_line.contains('.'));
1014 assert!(source_section.contains("SKIPPED:"));
1016 let skipped_section = source_section
1017 .split("SKIPPED:")
1018 .nth(1)
1019 .expect("SKIPPED section missing in source");
1020 let skipped_lines: Vec<&str> = skipped_section.lines().collect();
1021 let skipped_files_line = skipped_lines
1022 .iter()
1023 .find(|line| line.trim_start().starts_with("files:"))
1024 .expect("skipped files line missing");
1025 assert!(skipped_files_line.trim_start().ends_with("3"));
1026 let skipped_symlinks_line = skipped_lines
1027 .iter()
1028 .find(|line| line.trim_start().starts_with("symlinks:"))
1029 .expect("skipped symlinks line missing");
1030 assert!(skipped_symlinks_line.trim_start().ends_with("1"));
1031 let skipped_dirs_line = skipped_lines
1032 .iter()
1033 .find(|line| line.trim_start().starts_with("directories:"))
1034 .expect("skipped directories line missing");
1035 assert!(skipped_dirs_line.trim_start().ends_with("2"));
1036
1037 Ok(())
1038 }
1039
1040 #[test]
1041 fn interleaved_counter_access() -> Result<()> {
1042 let counter_a = TlsCounter::new();
1045 let counter_b = TlsCounter::new();
1046 let counter_c = TlsCounter::new();
1047 for i in 0..100 {
1048 counter_a.add(1);
1049 counter_b.add(2);
1050 counter_c.add(3);
1051 if i % 10 == 0 {
1053 assert_eq!(counter_a.get(), i + 1);
1054 assert_eq!(counter_b.get(), (i + 1) * 2);
1055 assert_eq!(counter_c.get(), (i + 1) * 3);
1056 }
1057 }
1058 assert_eq!(counter_a.get(), 100);
1060 assert_eq!(counter_b.get(), 200);
1061 assert_eq!(counter_c.get(), 300);
1062 Ok(())
1063 }
1064
1065 #[test]
1066 fn concurrent_multi_counter_access() -> Result<()> {
1067 let counter_a = std::sync::Arc::new(TlsCounter::new());
1069 let counter_b = std::sync::Arc::new(TlsCounter::new());
1070 const THREADS: usize = 4;
1071 const ITERATIONS: u64 = 1000;
1072 let handles: Vec<_> = (0..THREADS)
1073 .map(|_| {
1074 let ca = counter_a.clone();
1075 let cb = counter_b.clone();
1076 std::thread::spawn(move || {
1077 for _ in 0..ITERATIONS {
1078 ca.add(1);
1079 cb.add(2);
1080 }
1081 })
1082 })
1083 .collect();
1084 for h in handles {
1085 h.join().unwrap();
1086 }
1087 assert_eq!(counter_a.get(), THREADS as u64 * ITERATIONS);
1089 assert_eq!(counter_b.get(), THREADS as u64 * ITERATIONS * 2);
1090 Ok(())
1091 }
1092
1093 #[test]
1094 fn repeated_counter_access() -> Result<()> {
1095 let counter = TlsCounter::new();
1097 for i in 1..=1000 {
1098 counter.add(1);
1099 assert_eq!(counter.get(), i);
1100 }
1101 Ok(())
1102 }
1103
1104 #[test]
1105 fn sharding_distributes_across_threads() -> Result<()> {
1106 let counter = std::sync::Arc::new(TlsCounter::new());
1109 const THREADS: usize = 16;
1110 const ITERATIONS: u64 = 100;
1111 let handles: Vec<_> = (0..THREADS)
1112 .map(|_| {
1113 let c = counter.clone();
1114 std::thread::spawn(move || {
1115 for _ in 0..ITERATIONS {
1116 c.inc();
1117 }
1118 })
1119 })
1120 .collect();
1121 for h in handles {
1122 h.join().unwrap();
1123 }
1124 assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1125 Ok(())
1126 }
1127
1128 #[test]
1129 fn sharding_handles_more_threads_than_shards() -> Result<()> {
1130 let counter = std::sync::Arc::new(TlsCounter::new());
1132 const THREADS: usize = 128; const ITERATIONS: u64 = 100;
1134 let handles: Vec<_> = (0..THREADS)
1135 .map(|_| {
1136 let c = counter.clone();
1137 std::thread::spawn(move || {
1138 for _ in 0..ITERATIONS {
1139 c.inc();
1140 }
1141 })
1142 })
1143 .collect();
1144 for h in handles {
1145 h.join().unwrap();
1146 }
1147 assert_eq!(counter.get(), THREADS as u64 * ITERATIONS);
1148 Ok(())
1149 }
1150
1151 #[test]
1152 fn counter_independence() -> Result<()> {
1153 let counters: Vec<_> = (0..10).map(|_| TlsCounter::new()).collect();
1155 for (i, counter) in counters.iter().enumerate() {
1156 counter.add((i + 1) as u64 * 100);
1157 }
1158 for (i, counter) in counters.iter().enumerate() {
1159 assert_eq!(counter.get(), (i + 1) as u64 * 100);
1160 }
1161 Ok(())
1162 }
1163
1164 #[test]
1165 fn copy_printer_omits_hard_links() -> Result<()> {
1166 let progress = Progress::new();
1167 let mut printer = CopyProgressPrinter::new(&progress);
1168 let output = printer.print()?;
1169 assert!(output.contains("COPIED:"));
1170 assert!(output.contains("UNCHANGED:"));
1171 assert!(output.contains("REMOVED:"));
1172 assert!(output.contains("SKIPPED:"));
1173 assert!(!output.contains("hard-links"));
1174 Ok(())
1175 }
1176
1177 #[test]
1178 fn remove_printer_hides_copy_and_unchanged() -> Result<()> {
1179 let progress = Progress::new();
1180 let mut printer = RemoveProgressPrinter::new(&progress);
1181 let output = printer.print()?;
1182 assert!(output.contains("REMOVED:"));
1183 assert!(output.contains("SKIPPED:"));
1184 assert!(!output.contains("COPIED:"));
1185 assert!(!output.contains("UNCHANGED:"));
1186 assert!(!output.contains("hard-links"));
1187 assert!(!output.contains("specials:"));
1188 Ok(())
1189 }
1190
1191 #[test]
1192 fn link_printer_shows_hard_links() -> Result<()> {
1193 let progress = Progress::new();
1194 let mut printer = LinkProgressPrinter::new(&progress);
1195 let output = printer.print()?;
1196 assert!(output.contains("LINKED / COPIED:"));
1197 assert!(output.contains("UNCHANGED:"));
1198 assert!(output.contains("REMOVED:"));
1199 assert!(output.contains("SKIPPED:"));
1200 assert!(output.contains("hard-links"));
1201 Ok(())
1202 }
1203
1204 #[test]
1205 fn compare_printer_only_shows_ops() -> Result<()> {
1206 let progress = Progress::new();
1207 let mut printer = CompareProgressPrinter::new(&progress);
1208 let output = printer.print()?;
1209 assert!(output.contains("OPS:"));
1210 assert!(!output.contains("COPIED:"));
1211 assert!(!output.contains("UNCHANGED:"));
1212 assert!(!output.contains("REMOVED:"));
1213 assert!(!output.contains("SKIPPED:"));
1214 assert!(!output.contains("GENERATED:"));
1215 Ok(())
1216 }
1217
1218 #[test]
1219 fn filegen_printer_shows_generated() -> Result<()> {
1220 let progress = Progress::new();
1221 let mut printer = FilegenProgressPrinter::new(&progress);
1222 let output = printer.print()?;
1223 assert!(output.contains("OPS:"));
1224 assert!(output.contains("GENERATED:"));
1225 assert!(!output.contains("COPIED:"));
1226 assert!(!output.contains("UNCHANGED:"));
1227 assert!(!output.contains("REMOVED:"));
1228 assert!(!output.contains("SKIPPED:"));
1229 assert!(!output.contains("symlinks:"));
1230 Ok(())
1231 }
1232}