1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use super::{ErrorReport, Event, ParallelReporter, ProgressReport, Reporter, Size};
use progress_report_state::ProgressReportState;
use std::{
any::Any,
marker::PhantomData,
ops::ControlFlow,
sync::{atomic::Ordering::Relaxed, Arc},
thread::{sleep, spawn, JoinHandle},
time::Duration,
};
#[derive(Debug)]
pub struct ProgressAndErrorReporter<Data, ReportError>
where
Data: Size + Send + Sync,
ReportError: Fn(ErrorReport) + Sync,
u64: Into<Data>,
{
progress: Arc<ProgressReportState>,
report_error: ReportError,
progress_reporter_handle: JoinHandle<()>,
_phantom: PhantomData<Data>,
}
impl<Data, ReportError> ProgressAndErrorReporter<Data, ReportError>
where
Data: Size + Send + Sync,
ReportError: Fn(ErrorReport) + Sync,
u64: Into<Data>,
{
pub fn new<ReportProgress>(
report_progress: ReportProgress,
progress_report_interval: Duration,
report_error: ReportError,
) -> Self
where
ProgressReport<Data>: Default + 'static,
ReportProgress: Fn(ProgressReport<Data>) + Send + Sync + 'static,
{
let progress = Arc::new(ProgressReportState::default());
let progress_thread = progress.clone();
let progress_reporter_handle = spawn(move || loop {
sleep(progress_report_interval);
match progress_thread.to_progress_report() {
ControlFlow::Continue(progress) => report_progress(progress),
ControlFlow::Break(()) => break,
};
});
ProgressAndErrorReporter {
progress,
report_error,
progress_reporter_handle,
_phantom: PhantomData,
}
}
pub fn stop_progress_reporter(&self) {
self.progress.stopped.store(true, Relaxed);
}
}
impl<Data, ReportError> Reporter<Data> for ProgressAndErrorReporter<Data, ReportError>
where
Data: Size + Into<u64> + Send + Sync,
ReportError: Fn(ErrorReport) + Sync,
u64: Into<Data>,
{
fn report(&self, event: Event<Data>) {
use Event::*;
let ProgressAndErrorReporter {
progress,
report_error,
..
} = self;
macro_rules! bump {
($field:ident += $delta:expr) => {
progress.$field.fetch_add($delta, Relaxed)
};
}
match event {
ReceiveData(data) => {
bump!(items += 1);
bump!(total += data.into());
}
EncounterError(error_report) => {
report_error(error_report);
bump!(errors += 1);
}
}
}
}
impl<Data, ReportError> ParallelReporter<Data> for ProgressAndErrorReporter<Data, ReportError>
where
Data: Size + Into<u64> + Send + Sync,
ReportError: Fn(ErrorReport) + Sync,
u64: Into<Data>,
{
type DestructionError = Box<dyn Any + Send + 'static>;
fn destroy(self) -> Result<(), Self::DestructionError> {
self.stop_progress_reporter();
self.progress_reporter_handle.join()
}
}
mod progress_report_state;