parallel_disk_usage/reporter/
progress_and_error_reporter.rs1use super::{ErrorReport, Event, ParallelReporter, ProgressReport, Reporter};
2use crate::size;
3use progress_report_state::ProgressReportState;
4use std::{
5 any::Any,
6 marker::PhantomData,
7 ops::ControlFlow,
8 sync::{atomic::Ordering::Relaxed, Arc},
9 thread::{sleep, spawn, JoinHandle},
10 time::Duration,
11};
12
13#[derive(Debug)]
17pub struct ProgressAndErrorReporter<Size, ReportError>
18where
19 Size: size::Size + Send + Sync,
20 ReportError: Fn(ErrorReport) + Sync,
21 u64: Into<Size>,
22{
23 progress: Arc<ProgressReportState>,
25 report_error: ReportError,
27 progress_reporter_handle: JoinHandle<()>,
29 _phantom: PhantomData<Size>,
31}
32
33impl<Size, ReportError> ProgressAndErrorReporter<Size, ReportError>
34where
35 Size: size::Size + Send + Sync,
36 ReportError: Fn(ErrorReport) + Sync,
37 u64: Into<Size>,
38{
39 pub fn new<ReportProgress>(
41 report_progress: ReportProgress,
42 progress_report_interval: Duration,
43 report_error: ReportError,
44 ) -> Self
45 where
46 ProgressReport<Size>: Default + 'static,
47 ReportProgress: Fn(ProgressReport<Size>) + Send + Sync + 'static,
48 {
49 let progress = Arc::new(ProgressReportState::default());
50 let progress_thread = progress.clone();
51 let progress_reporter_handle = spawn(move || loop {
52 sleep(progress_report_interval);
53 match progress_thread.to_progress_report() {
54 ControlFlow::Continue(progress) => report_progress(progress),
55 ControlFlow::Break(()) => break,
56 };
57 });
58 ProgressAndErrorReporter {
59 progress,
60 report_error,
61 progress_reporter_handle,
62 _phantom: PhantomData,
63 }
64 }
65
66 pub fn stop_progress_reporter(&self) {
70 self.progress.stopped.store(true, Relaxed);
71 }
72}
73
74impl<Size, ReportError> Reporter<Size> for ProgressAndErrorReporter<Size, ReportError>
75where
76 Size: size::Size + Into<u64> + Send + Sync,
77 ReportError: Fn(ErrorReport) + Sync,
78 u64: Into<Size>,
79{
80 fn report(&self, event: Event<Size>) {
81 use Event::*;
82 let ProgressAndErrorReporter {
83 progress,
84 report_error,
85 ..
86 } = self;
87 macro_rules! bump {
88 ($field:ident += $delta:expr) => {
89 progress.$field.fetch_add($delta, Relaxed)
90 };
91 }
92 match event {
93 ReceiveData(size) => {
94 bump!(items += 1);
95 bump!(total += size.into());
96 }
97 EncounterError(error_report) => {
98 report_error(error_report);
99 bump!(errors += 1);
100 }
101 }
102 }
103}
104
105impl<Size, ReportError> ParallelReporter<Size> for ProgressAndErrorReporter<Size, ReportError>
106where
107 Size: size::Size + Into<u64> + Send + Sync,
108 ReportError: Fn(ErrorReport) + Sync,
109 u64: Into<Size>,
110{
111 type DestructionError = Box<dyn Any + Send + 'static>;
112 fn destroy(self) -> Result<(), Self::DestructionError> {
113 self.stop_progress_reporter();
114 self.progress_reporter_handle.join()
115 }
116}
117
118mod progress_report_state;