parallel_disk_usage/reporter/
progress_and_error_reporter.rs

1use super::{ErrorReport, Event, ParallelReporter, ProgressReport, Reporter};
2use crate::size;
3use progress_report_state::ProgressReportState;
4use std::{
5    any::Any,
6    marker::PhantomData,
7    ops::ControlFlow,
8    sync::{atomic::Ordering::Relaxed, Arc},
9    thread::{sleep, spawn, JoinHandle},
10    time::Duration,
11};
12
/// Store progress information and call report function on said information.
///
/// Scan events are tallied in a shared [`ProgressReportState`]; a background
/// thread spawned by the constructor periodically turns that state into a
/// [`ProgressReport`] and hands it to the progress callback, while errors are
/// forwarded to `report_error` as soon as they are received.
///
/// **NOTE:** If an error occurred, `report_error` would be called before `report_progress`:
/// the error callback runs before the error counter is incremented, so it always
/// precedes any progress report that includes that error.
#[derive(Debug)]
pub struct ProgressAndErrorReporter<Size, ReportError>
where
    Size: size::Size + Send + Sync,
    ReportError: Fn(ErrorReport) + Sync,
    u64: Into<Size>,
{
    /// Progress information.
    ///
    /// Shared (via [`Arc`]) with the progress reporting thread.
    progress: Arc<ProgressReportState>,
    /// Report encountered error.
    ///
    /// Called once for every error event, on the thread that reported the event.
    report_error: ReportError,
    /// Join handle of progress reporting thread.
    progress_reporter_handle: JoinHandle<()>,
    /// Keep generic parameters.
    ///
    /// `Size` is not stored in any field, so this marker keeps the type parameter in use.
    _phantom: PhantomData<Size>,
}
32
33impl<Size, ReportError> ProgressAndErrorReporter<Size, ReportError>
34where
35    Size: size::Size + Send + Sync,
36    ReportError: Fn(ErrorReport) + Sync,
37    u64: Into<Size>,
38{
39    /// Create a new [`ProgressAndErrorReporter`] from a report function.
40    pub fn new<ReportProgress>(
41        report_progress: ReportProgress,
42        progress_report_interval: Duration,
43        report_error: ReportError,
44    ) -> Self
45    where
46        ProgressReport<Size>: Default + 'static,
47        ReportProgress: Fn(ProgressReport<Size>) + Send + Sync + 'static,
48    {
49        let progress = Arc::new(ProgressReportState::default());
50        let progress_thread = progress.clone();
51        let progress_reporter_handle = spawn(move || loop {
52            sleep(progress_report_interval);
53            match progress_thread.to_progress_report() {
54                ControlFlow::Continue(progress) => report_progress(progress),
55                ControlFlow::Break(()) => break,
56            };
57        });
58        ProgressAndErrorReporter {
59            progress,
60            report_error,
61            progress_reporter_handle,
62            _phantom: PhantomData,
63        }
64    }
65
66    /// Stop the thread that reports progress.
67    ///
68    /// This function would be automatically invoked once the value is [dropped](Drop).
69    pub fn stop_progress_reporter(&self) {
70        self.progress.stopped.store(true, Relaxed);
71    }
72}
73
74impl<Size, ReportError> Reporter<Size> for ProgressAndErrorReporter<Size, ReportError>
75where
76    Size: size::Size + Into<u64> + Send + Sync,
77    ReportError: Fn(ErrorReport) + Sync,
78    u64: Into<Size>,
79{
80    fn report(&self, event: Event<Size>) {
81        use Event::*;
82        let ProgressAndErrorReporter {
83            progress,
84            report_error,
85            ..
86        } = self;
87        macro_rules! bump {
88            ($field:ident += $delta:expr) => {
89                progress.$field.fetch_add($delta, Relaxed)
90            };
91        }
92        match event {
93            ReceiveData(size) => {
94                bump!(items += 1);
95                bump!(total += size.into());
96            }
97            EncounterError(error_report) => {
98                report_error(error_report);
99                bump!(errors += 1);
100            }
101        }
102    }
103}
104
105impl<Size, ReportError> ParallelReporter<Size> for ProgressAndErrorReporter<Size, ReportError>
106where
107    Size: size::Size + Into<u64> + Send + Sync,
108    ReportError: Fn(ErrorReport) + Sync,
109    u64: Into<Size>,
110{
111    type DestructionError = Box<dyn Any + Send + 'static>;
112    fn destroy(self) -> Result<(), Self::DestructionError> {
113        self.stop_progress_reporter();
114        self.progress_reporter_handle.join()
115    }
116}
117
118mod progress_report_state;