qubit_progress/running/running_progress_loop.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::mpsc::{
12 self,
13 Receiver,
14 RecvTimeoutError,
15 },
16 thread,
17 time::Duration,
18};
19
20use crate::{
21 Progress,
22 model::ProgressCounters,
23};
24
25use super::{
26 running_progress_notifier::RunningProgressNotifier,
27 running_progress_signal::RunningProgressSignal,
28 scoped_running_progress::ScopedRunningProgress,
29};
30
31/// Runs periodic `running` progress reports for work tracked elsewhere.
32///
33/// `RunningProgressLoop` is useful when worker threads update shared state and
34/// a separate reporter thread should periodically emit `running` events. The
35/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36/// and a snapshot closure that converts their domain state into
37/// [`ProgressCounters`].
38///
39/// Use [`Self::spawn_scoped`] when the reporter thread can be scoped to the
40/// operation call. It returns a [`ScopedRunningProgress`] guard and cloneable
41/// [`crate::RunningProgressPointHandle`] handles for workers. Use [`Self::channel`]
42/// only when callers need to own the lower-level loop and notifier directly.
43///
44/// # Examples
45///
46/// ```
47/// use std::{
48/// sync::{
49/// Arc,
50/// atomic::{
51/// AtomicUsize,
52/// Ordering,
53/// },
54/// },
55/// thread,
56/// time::Duration,
57/// };
58///
59/// use qubit_progress::{
60/// NoOpProgressReporter,
61/// Progress,
62/// ProgressCounters,
63/// RunningProgressLoop,
64/// };
65///
66/// let reporter = NoOpProgressReporter;
67/// let completed = Arc::new(AtomicUsize::new(0));
68///
69/// thread::scope(|scope| {
70/// let loop_completed = Arc::clone(&completed);
71/// let progress = Progress::new(&reporter, Duration::ZERO);
72/// let running_progress =
73/// RunningProgressLoop::spawn_scoped(scope, progress, move || {
74/// // The background reporter thread does not own the operation
75/// // state. It only reads a fresh counter snapshot when the
76/// // interval is due or a worker sends a running point.
77/// ProgressCounters::new(Some(3))
78/// .with_completed_count(loop_completed.load(Ordering::Acquire))
79/// });
80/// let progress_point_handle = running_progress.point_handle();
81///
82/// // Worker code updates domain state first, then wakes the loop. With a
83/// // zero interval, each running point may emit a `running` event.
84/// for _ in 0..3 {
85/// completed.fetch_add(1, Ordering::AcqRel);
86/// assert!(progress_point_handle.report());
87/// }
88///
89/// // Stop the loop before leaving the scope. Reporter panics are
90/// // propagated by `stop_and_join`.
91/// running_progress.stop_and_join();
92/// });
93/// ```
94///
95/// # Author
96///
97/// Haixing Hu
98pub struct RunningProgressLoop {
99 /// Signal receiver owned by the reporter loop.
100 signal_receiver: Receiver<RunningProgressSignal>,
101}
102
103/// Result of waiting for a running progress loop signal.
104enum RunningProgressWait {
105 /// A worker or stop signal was received.
106 Signal(RunningProgressSignal),
107 /// No signal arrived before the positive report interval elapsed.
108 Timeout,
109 /// All senders were dropped.
110 Disconnected,
111}
112
113impl RunningProgressWait {
114 /// Returns `true` when the running progress loop should call
115 /// [`Progress::report_running_if_due`] after this wait result.
116 #[inline]
117 fn should_report(self) -> bool {
118 match self {
119 Self::Timeout => true,
120 Self::Disconnected => false,
121 Self::Signal(signal) => match signal {
122 RunningProgressSignal::RunningPoint => true,
123 RunningProgressSignal::Stop => false,
124 },
125 }
126 }
127}
128
129impl RunningProgressLoop {
130 /// Spawns a scoped reporter thread for running progress events.
131 ///
132 /// # Parameters
133 ///
134 /// * `scope` - Thread scope that owns the reporter thread.
135 /// * `progress` - Progress run used by the reporter thread.
136 /// * `snapshot` - Closure that returns current counters whenever a
137 /// `running` event may be due.
138 ///
139 /// # Returns
140 ///
141 /// A guard that can create worker point handles and stop the scoped
142 /// reporter thread.
143 pub fn spawn_scoped<'scope, 'env, 'progress, F>(
144 scope: &'scope thread::Scope<'scope, 'env>,
145 progress: Progress<'progress>,
146 snapshot: F,
147 ) -> ScopedRunningProgress<'scope>
148 where
149 'progress: 'scope,
150 F: FnMut() -> ProgressCounters + Send + 'scope,
151 {
152 let report_points = progress.report_interval().is_zero();
153 let (progress_loop, notifier) = Self::channel();
154 let progress_thread = scope.spawn(move || {
155 progress_loop.run(progress, snapshot);
156 });
157 ScopedRunningProgress::new(notifier, progress_thread, report_points)
158 }
159
160 /// Creates a paired running progress loop and notifier.
161 ///
162 /// # Returns
163 ///
164 /// A loop that owns the signal receiver and a notifier that sends wakeup or
165 /// stop signals to that loop.
166 pub fn channel() -> (Self, RunningProgressNotifier) {
167 let (signal_sender, signal_receiver) = mpsc::channel();
168 (
169 Self { signal_receiver },
170 RunningProgressNotifier { signal_sender },
171 )
172 }
173
174 /// Runs until a stop signal is received or every notifier is dropped.
175 ///
176 /// # Parameters
177 ///
178 /// * `progress` - Progress run used to emit `running` events.
179 /// * `snapshot` - Closure that returns the current counters whenever a
180 /// `running` event may be due.
181 ///
182 /// # Panics
183 ///
184 /// Propagates panics from the configured reporter when a `running` event is
185 /// due.
186 pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
187 where
188 F: FnMut() -> ProgressCounters,
189 {
190 let report_interval = progress.report_interval();
191 while self.receive_wait(report_interval).should_report() {
192 progress.report_running_if_due(snapshot());
193 }
194 }
195
196 /// Waits once on the signal channel and maps the outcome to [`RunningProgressWait`].
197 ///
198 /// The calling thread blocks until this wait completes.
199 ///
200 /// When `report_interval` is [`Duration::ZERO`], uses [`Receiver::recv`]: the call returns when a
201 /// [`RunningProgressSignal`] is received or when every notifier sender has been dropped. In this
202 /// mode no [`RunningProgressWait::Timeout`] is produced.
203 ///
204 /// When `report_interval` is positive, uses [`Receiver::recv_timeout`]: if no message arrives
205 /// before the deadline, returns [`RunningProgressWait::Timeout`] so [`Self::run`] can drive periodic
206 /// `running` progress; if a message arrives first, returns that signal wrapped in
207 /// [`RunningProgressWait::Signal`], or [`RunningProgressWait::Disconnected`] if the channel closes.
208 ///
209 /// # Parameters
210 ///
211 /// * `report_interval` - Configured report interval from the [`Progress`] run passed to [`Self::run`];
212 /// [`Duration::ZERO`] selects unbounded waits (event-driven only), otherwise each wait is capped
213 /// by this duration and may time out.
214 ///
215 /// # Returns
216 ///
217 /// * [`RunningProgressWait::Signal`] - The next [`RunningProgressSignal`] from a notifier.
218 /// * [`RunningProgressWait::Timeout`] - Only when `report_interval` is positive and the wait reached
219 /// the deadline without a message.
220 /// * [`RunningProgressWait::Disconnected`] - The MPSC channel has no senders left.
221 fn receive_wait(&self, report_interval: Duration) -> RunningProgressWait {
222 if report_interval.is_zero() {
223 return match self.signal_receiver.recv() {
224 Ok(signal) => RunningProgressWait::Signal(signal),
225 Err(_) => RunningProgressWait::Disconnected,
226 };
227 }
228 match self.signal_receiver.recv_timeout(report_interval) {
229 Ok(signal) => RunningProgressWait::Signal(signal),
230 Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
231 Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
232 }
233 }
234}