parallel_disk_usage/reporter/
progress_and_error_reporter.rs1use super::{ErrorReport, Event, ParallelReporter, ProgressReport, Reporter};
2use crate::size;
3use progress_report_state::ProgressReportState;
4use std::{
5 any::Any,
6 marker::PhantomData,
7 ops::ControlFlow,
8 sync::{Arc, atomic::Ordering::Relaxed},
9 thread::{JoinHandle, sleep, spawn},
10 time::Duration,
11};
12
13#[derive(Debug)]
17pub struct ProgressAndErrorReporter<Size, ReportError>
18where
19 Size: size::Size + Send + Sync,
20 ReportError: Fn(ErrorReport) + Sync,
21 u64: Into<Size>,
22{
23 progress: Arc<ProgressReportState>,
25 report_error: ReportError,
27 progress_reporter_handle: JoinHandle<()>,
29 _phantom: PhantomData<Size>,
31}
32
33impl<Size, ReportError> ProgressAndErrorReporter<Size, ReportError>
34where
35 Size: size::Size + Send + Sync,
36 ReportError: Fn(ErrorReport) + Sync,
37 u64: Into<Size>,
38{
39 pub fn new<ReportProgress>(
41 report_progress: ReportProgress,
42 progress_report_interval: Duration,
43 report_error: ReportError,
44 ) -> Self
45 where
46 ProgressReport<Size>: Default + 'static,
47 ReportProgress: Fn(ProgressReport<Size>) + Send + Sync + 'static,
48 {
49 let progress = Arc::new(ProgressReportState::default());
50 let progress_thread = progress.clone();
51 let progress_reporter_handle = spawn(move || {
52 loop {
53 sleep(progress_report_interval);
54 match progress_thread.to_progress_report() {
55 ControlFlow::Continue(progress) => report_progress(progress),
56 ControlFlow::Break(()) => break,
57 };
58 }
59 });
60 ProgressAndErrorReporter {
61 progress,
62 report_error,
63 progress_reporter_handle,
64 _phantom: PhantomData,
65 }
66 }
67
68 pub fn stop_progress_reporter(&self) {
72 self.progress.stopped.store(true, Relaxed);
73 }
74}
75
76impl<Size, ReportError> Reporter<Size> for ProgressAndErrorReporter<Size, ReportError>
77where
78 Size: size::Size + Into<u64> + Send + Sync,
79 ReportError: Fn(ErrorReport) + Sync,
80 u64: Into<Size>,
81{
82 fn report(&self, event: Event<Size>) {
83 use Event::*;
84 let ProgressAndErrorReporter {
85 progress,
86 report_error,
87 ..
88 } = self;
89 macro_rules! bump {
90 ($field:ident += $delta:expr) => {
91 progress.$field.fetch_add($delta, Relaxed)
92 };
93 }
94 match event {
95 ReceiveData(size) => {
96 bump!(items += 1);
97 bump!(total += size.into());
98 }
99 EncounterError(error_report) => {
100 report_error(error_report);
101 bump!(errors += 1);
102 }
103 DetectHardlink(info) => {
104 bump!(linked += info.links);
105 bump!(shared += info.size.into());
106 }
107 }
108 }
109}
110
111impl<Size, ReportError> ParallelReporter<Size> for ProgressAndErrorReporter<Size, ReportError>
112where
113 Size: size::Size + Into<u64> + Send + Sync,
114 ReportError: Fn(ErrorReport) + Sync,
115 u64: Into<Size>,
116{
117 type DestructionError = Box<dyn Any + Send + 'static>;
118 fn destroy(self) -> Result<(), Self::DestructionError> {
119 self.stop_progress_reporter();
120 self.progress_reporter_handle.join()
121 }
122}
123
124mod progress_report_state;