use std::{
sync::mpsc::{
self,
Receiver,
RecvTimeoutError,
},
thread,
time::Duration,
};
use crate::{
Progress,
model::ProgressCounters,
};
use super::{
running_progress_guard::RunningProgressGuard,
running_progress_notifier::RunningProgressNotifier,
running_progress_signal::RunningProgressSignal,
};
/// Receiving half of the running-progress signal channel.
///
/// Created by `RunningProgressLoop::channel` together with the matching
/// `RunningProgressNotifier`; consumed by `RunningProgressLoop::run`, which
/// blocks on the receiver and decides after each wake-up whether to report.
pub(crate) struct RunningProgressLoop {
/// Source of `RunningProgressSignal` values sent by the notifier side.
signal_receiver: Receiver<RunningProgressSignal>,
}
/// Outcome of a single wait on the signal channel, as produced by
/// `RunningProgressLoop::receive_wait`.
enum RunningProgressWait {
/// A signal was received from the channel.
Signal(RunningProgressSignal),
/// The timed wait elapsed without any signal arriving.
Timeout,
/// The channel reported an error because the sending side is gone.
Disconnected,
}
impl RunningProgressWait {
    /// Tells the loop whether this wake-up should trigger a report.
    ///
    /// Returns `true` for a timeout or a `RunningPoint` signal, and `false`
    /// for a `Stop` signal or a disconnected channel. The loop in
    /// `RunningProgressLoop::run` exits on the first `false`.
    #[inline]
    fn should_report(self) -> bool {
        // Every variant is listed explicitly (no wildcard arm) so that adding
        // a new signal or wait outcome forces this decision to be revisited.
        match self {
            Self::Timeout | Self::Signal(RunningProgressSignal::RunningPoint) => true,
            Self::Disconnected | Self::Signal(RunningProgressSignal::Stop) => false,
        }
    }
}
impl RunningProgressLoop {
    /// Starts the progress loop on a thread spawned inside `scope`.
    ///
    /// A fresh signal channel is created. The receiving half, `progress` and
    /// `snapshot` move into the spawned thread, which executes
    /// [`RunningProgressLoop::run`]. The sending half and the thread handle
    /// are handed to the returned `RunningProgressGuard`, together with a
    /// flag saying whether the report interval is zero.
    ///
    /// `'progress: 'scope` requires whatever `progress` borrows to outlive
    /// the thread scope, and `snapshot` must be `Send` because it is called
    /// on the spawned thread.
    pub(crate) fn spawn_scoped<'scope, 'env, 'progress, F>(
        scope: &'scope thread::Scope<'scope, 'env>,
        progress: Progress<'progress>,
        snapshot: F,
    ) -> RunningProgressGuard<'scope>
    where
        'progress: 'scope,
        F: FnMut() -> ProgressCounters + Send + 'scope,
    {
        let (worker, notifier) = Self::channel();
        // Read the interval before `progress` is moved into the thread closure.
        let zero_interval = progress.report_interval().is_zero();
        let handle = scope.spawn(move || worker.run(progress, snapshot));
        RunningProgressGuard::new(notifier, handle, zero_interval)
    }

    /// Builds a connected pair: the loop (receiver side) and its notifier
    /// (sender side), both backed by one `std::sync::mpsc` channel.
    pub(crate) fn channel() -> (Self, RunningProgressNotifier) {
        let (signal_sender, signal_receiver) = mpsc::channel();
        let progress_loop = Self { signal_receiver };
        let notifier = RunningProgressNotifier { signal_sender };
        (progress_loop, notifier)
    }

    /// Runs the loop on the current thread until it is told to stop.
    ///
    /// Each iteration waits for a signal (or a timeout, when the report
    /// interval is non-zero). While the wait outcome says a report is wanted,
    /// a fresh value from `snapshot` is passed to
    /// `progress.report_running_if_due`. The loop ends on a `Stop` signal or
    /// once the channel is disconnected.
    pub(crate) fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
    where
        F: FnMut() -> ProgressCounters,
    {
        // The interval is read once up front and reused for every wait.
        let interval = progress.report_interval();
        loop {
            if !self.receive_wait(interval).should_report() {
                break;
            }
            progress.report_running_if_due(snapshot());
        }
    }

    /// Performs one wait on the signal channel and classifies the result.
    ///
    /// With a zero `report_interval` the wait blocks indefinitely, so the
    /// only possible outcomes are a signal or a disconnect. Otherwise the
    /// wait is bounded by `report_interval` and may also end in a timeout.
    fn receive_wait(&self, report_interval: Duration) -> RunningProgressWait {
        let received = if report_interval.is_zero() {
            // A blocking receive can only fail because the channel is
            // disconnected; express that in the timed-receive error type so
            // both paths share one match below.
            self.signal_receiver
                .recv()
                .map_err(|_| RecvTimeoutError::Disconnected)
        } else {
            self.signal_receiver.recv_timeout(report_interval)
        };

        match received {
            Ok(signal) => RunningProgressWait::Signal(signal),
            Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
            Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
        }
    }
}