qubit_progress/running/running_progress_loop.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::mpsc::{
12 self,
13 Receiver,
14 RecvTimeoutError,
15 },
16 time::Duration,
17};
18
19use crate::{
20 Progress,
21 model::ProgressCounters,
22};
23
24use super::{
25 running_progress_notifier::RunningProgressNotifier,
26 running_progress_signal::RunningProgressSignal,
27};
28
29/// Runs periodic `running` progress reports for work tracked elsewhere.
30///
31/// `RunningProgressLoop` is useful when worker threads update shared state and
32/// a separate reporter thread should periodically emit `running` events. The
33/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
34/// and a snapshot closure that converts their domain state into
35/// [`ProgressCounters`].
36///
37/// Use [`Self::channel`] to create a loop and its matching
38/// [`RunningProgressNotifier`]. Move the loop into a reporter thread, clone the
39/// notifier into workers when zero-interval wakeups are needed, and send
40/// [`RunningProgressNotifier::stop`] when the operation is complete.
41///
42/// # Examples
43///
44/// ```
45/// use std::{
46/// sync::{
47/// Arc,
48/// atomic::{
49/// AtomicUsize,
50/// Ordering,
51/// },
52/// },
53/// thread,
54/// time::Duration,
55/// };
56///
57/// use qubit_progress::{
58/// NoOpProgressReporter,
59/// Progress,
60/// ProgressCounters,
61/// RunningProgressLoop,
62/// };
63///
64/// let reporter = NoOpProgressReporter;
65/// let completed = Arc::new(AtomicUsize::new(0));
66/// let (progress_loop, notifier) = RunningProgressLoop::channel();
67///
68/// thread::scope(|scope| {
69/// let loop_completed = Arc::clone(&completed);
70/// let reporter_ref = &reporter;
71/// let progress_thread = scope.spawn(move || {
72/// // This background reporter thread owns the loop. It does not own
73/// // the operation state; it only reads a fresh counter snapshot when
74/// // the interval is due or a worker sends a running point.
75/// let progress = Progress::new(reporter_ref, Duration::ZERO);
76/// progress_loop.run(progress, || {
77/// ProgressCounters::new(Some(3))
78/// .with_completed_count(loop_completed.load(Ordering::Acquire))
79/// });
80/// });
81///
82/// // Worker code updates domain state first, then wakes the loop. With a
83/// // zero interval, each running point may emit a `running` event.
84/// for _ in 0..3 {
85/// completed.fetch_add(1, Ordering::AcqRel);
86/// assert!(notifier.running_point());
87/// }
88///
89/// // Stop the loop before leaving the scope so reporter panics can be
90/// // propagated through the join handle.
91/// assert!(notifier.stop());
92/// progress_thread.join().expect("progress loop should stop");
93/// });
94/// ```
95///
96/// # Author
97///
98/// Haixing Hu
99pub struct RunningProgressLoop {
100 /// Signal receiver owned by the reporter loop.
101 signal_receiver: Receiver<RunningProgressSignal>,
102}
103
104/// Result of waiting for a running progress loop signal.
105enum RunningProgressWait {
106 /// A worker or stop signal was received.
107 Signal(RunningProgressSignal),
108 /// No signal arrived before the positive report interval elapsed.
109 Timeout,
110 /// All senders were dropped.
111 Disconnected,
112}
113
114impl RunningProgressLoop {
115 /// Creates a paired running progress loop and notifier.
116 ///
117 /// # Returns
118 ///
119 /// A loop that owns the signal receiver and a notifier that sends wakeup or
120 /// stop signals to that loop.
121 #[inline]
122 pub fn channel() -> (Self, RunningProgressNotifier) {
123 let (signal_sender, signal_receiver) = mpsc::channel();
124 (
125 Self { signal_receiver },
126 RunningProgressNotifier { signal_sender },
127 )
128 }
129
130 /// Runs until a stop signal is received or every notifier is dropped.
131 ///
132 /// # Parameters
133 ///
134 /// * `progress` - Progress run used to emit `running` events.
135 /// * `snapshot` - Closure that returns the current counters whenever a
136 /// `running` event may be due.
137 ///
138 /// # Panics
139 ///
140 /// Propagates panics from the configured reporter when a `running` event is
141 /// due.
142 pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
143 where
144 F: FnMut() -> ProgressCounters,
145 {
146 let report_interval = progress.report_interval();
147 while let RunningProgressWait::Signal(RunningProgressSignal::RunningPoint)
148 | RunningProgressWait::Timeout =
149 receive_running_progress_signal(&self.signal_receiver, report_interval)
150 {
151 progress.report_running_if_due(snapshot());
152 }
153 }
154}
155
156/// Receives one running progress loop signal.
157///
158/// # Parameters
159///
160/// * `signal_receiver` - Signal receiver shared with notifiers.
161/// * `report_interval` - Configured progress-report interval.
162///
163/// # Returns
164///
165/// A worker or stop signal, a timeout marker for positive intervals, or a
166/// disconnected marker when all notifiers have disconnected.
167fn receive_running_progress_signal(
168 signal_receiver: &Receiver<RunningProgressSignal>,
169 report_interval: Duration,
170) -> RunningProgressWait {
171 if report_interval.is_zero() {
172 return match signal_receiver.recv() {
173 Ok(signal) => RunningProgressWait::Signal(signal),
174 Err(_) => RunningProgressWait::Disconnected,
175 };
176 }
177 match signal_receiver.recv_timeout(report_interval) {
178 Ok(signal) => RunningProgressWait::Signal(signal),
179 Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
180 Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
181 }
182}