Skip to main content

parallel_disk_usage/reporter/
progress_and_error_reporter.rs

1use super::{ErrorReport, Event, ParallelReporter, ProgressReport, Reporter};
2use crate::size;
3use progress_report_state::ProgressReportState;
4use std::{
5    any::Any,
6    marker::PhantomData,
7    ops::ControlFlow,
8    sync::{Arc, atomic::Ordering::Relaxed},
9    thread::{JoinHandle, sleep, spawn},
10    time::Duration,
11};
12
/// Store progress information and call report function on said information.
///
/// Events received through the [`Reporter`] implementation are accumulated into
/// shared atomic counters. A background thread spawned by
/// [`ProgressAndErrorReporter::new`] converts those counters into a
/// [`ProgressReport`] once per reporting interval and passes it to the progress
/// callback.
///
/// **NOTE:** If an error occurred, `report_error` would be called before `report_progress`.
#[derive(Debug)]
pub struct ProgressAndErrorReporter<Size, ReportError>
where
    Size: size::Size + Send + Sync,
    ReportError: Fn(ErrorReport) + Sync,
    u64: Into<Size>,
{
    /// Progress information.
    ///
    /// Shared (via [`Arc`]) between this value and the progress reporting thread.
    progress: Arc<ProgressReportState>,
    /// Report encountered error.
    ///
    /// Called synchronously from `report` for every `Event::EncounterError`.
    report_error: ReportError,
    /// Join handle of progress reporting thread.
    ///
    /// Consumed and joined by `destroy`.
    progress_reporter_handle: JoinHandle<()>,
    /// Keep generic parameters.
    ///
    /// `Size` appears only in bounds and method signatures, never in a field,
    /// so a marker is required to keep the parameter in use.
    _phantom: PhantomData<Size>,
}
32
33impl<Size, ReportError> ProgressAndErrorReporter<Size, ReportError>
34where
35    Size: size::Size + Send + Sync,
36    ReportError: Fn(ErrorReport) + Sync,
37    u64: Into<Size>,
38{
39    /// Create a new [`ProgressAndErrorReporter`] from a report function.
40    pub fn new<ReportProgress>(
41        report_progress: ReportProgress,
42        progress_report_interval: Duration,
43        report_error: ReportError,
44    ) -> Self
45    where
46        ProgressReport<Size>: Default + 'static,
47        ReportProgress: Fn(ProgressReport<Size>) + Send + Sync + 'static,
48    {
49        let progress = Arc::new(ProgressReportState::default());
50        let progress_thread = progress.clone();
51        let progress_reporter_handle = spawn(move || {
52            loop {
53                sleep(progress_report_interval);
54                match progress_thread.to_progress_report() {
55                    ControlFlow::Continue(progress) => report_progress(progress),
56                    ControlFlow::Break(()) => break,
57                };
58            }
59        });
60        ProgressAndErrorReporter {
61            progress,
62            report_error,
63            progress_reporter_handle,
64            _phantom: PhantomData,
65        }
66    }
67
68    /// Stop the thread that reports progress.
69    ///
70    /// This function would be automatically invoked once the value is [dropped](Drop).
71    pub fn stop_progress_reporter(&self) {
72        self.progress.stopped.store(true, Relaxed);
73    }
74}
75
76impl<Size, ReportError> Reporter<Size> for ProgressAndErrorReporter<Size, ReportError>
77where
78    Size: size::Size + Into<u64> + Send + Sync,
79    ReportError: Fn(ErrorReport) + Sync,
80    u64: Into<Size>,
81{
82    fn report(&self, event: Event<Size>) {
83        use Event::*;
84        let ProgressAndErrorReporter {
85            progress,
86            report_error,
87            ..
88        } = self;
89        macro_rules! bump {
90            ($field:ident += $delta:expr) => {
91                progress.$field.fetch_add($delta, Relaxed)
92            };
93        }
94        match event {
95            ReceiveData(size) => {
96                bump!(items += 1);
97                bump!(total += size.into());
98            }
99            EncounterError(error_report) => {
100                report_error(error_report);
101                bump!(errors += 1);
102            }
103            DetectHardlink(info) => {
104                bump!(linked += info.links);
105                bump!(shared += info.size.into());
106            }
107        }
108    }
109}
110
111impl<Size, ReportError> ParallelReporter<Size> for ProgressAndErrorReporter<Size, ReportError>
112where
113    Size: size::Size + Into<u64> + Send + Sync,
114    ReportError: Fn(ErrorReport) + Sync,
115    u64: Into<Size>,
116{
117    type DestructionError = Box<dyn Any + Send + 'static>;
118    fn destroy(self) -> Result<(), Self::DestructionError> {
119        self.stop_progress_reporter();
120        self.progress_reporter_handle.join()
121    }
122}
123
124mod progress_report_state;