qubit_progress/running/running_progress_loop.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11 sync::mpsc::{
12 self,
13 Receiver,
14 RecvTimeoutError,
15 },
16 thread,
17 time::Duration,
18};
19
20use crate::{
21 Progress,
22 model::ProgressCounters,
23};
24
25use super::{
26 running_progress_notifier::RunningProgressNotifier,
27 running_progress_signal::RunningProgressSignal,
28 scoped_running_progress::ScopedRunningProgress,
29};
30
31/// Runs periodic `running` progress reports for work tracked elsewhere.
32///
33/// `RunningProgressLoop` is useful when worker threads update shared state and
34/// a separate reporter thread should periodically emit `running` events. The
35/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36/// and a snapshot closure that converts their domain state into
37/// [`ProgressCounters`].
38///
39/// Use [`Self::spawn_scoped`] when the reporter thread can be scoped to the
40/// operation call. It returns a [`ScopedRunningProgress`] guard and cloneable
41/// [`crate::RunningProgressPoints`] handles for workers. Use [`Self::channel`]
42/// only when callers need to own the lower-level loop and notifier directly.
43///
44/// # Examples
45///
46/// ```
47/// use std::{
48/// sync::{
49/// Arc,
50/// atomic::{
51/// AtomicUsize,
52/// Ordering,
53/// },
54/// },
55/// thread,
56/// time::Duration,
57/// };
58///
59/// use qubit_progress::{
60/// NoOpProgressReporter,
61/// Progress,
62/// ProgressCounters,
63/// RunningProgressLoop,
64/// };
65///
66/// let reporter = NoOpProgressReporter;
67/// let completed = Arc::new(AtomicUsize::new(0));
68///
69/// thread::scope(|scope| {
70/// let loop_completed = Arc::clone(&completed);
71/// let progress = Progress::new(&reporter, Duration::ZERO);
72/// let running_progress =
73/// RunningProgressLoop::spawn_scoped(scope, progress, move || {
74/// // The background reporter thread does not own the operation
75/// // state. It only reads a fresh counter snapshot when the
76/// // interval is due or a worker sends a running point.
77/// ProgressCounters::new(Some(3))
78/// .with_completed_count(loop_completed.load(Ordering::Acquire))
79/// });
80/// let progress_points = running_progress.points();
81///
82/// // Worker code updates domain state first, then wakes the loop. With a
83/// // zero interval, each running point may emit a `running` event.
84/// for _ in 0..3 {
85/// completed.fetch_add(1, Ordering::AcqRel);
86/// assert!(progress_points.running_point());
87/// }
88///
89/// // Stop the loop before leaving the scope. Reporter panics are
90/// // propagated by `stop_and_join`.
91/// running_progress.stop_and_join();
92/// });
93/// ```
94///
95/// # Author
96///
97/// Haixing Hu
98pub struct RunningProgressLoop {
99 /// Signal receiver owned by the reporter loop.
100 signal_receiver: Receiver<RunningProgressSignal>,
101}
102
103/// Result of waiting for a running progress loop signal.
104enum RunningProgressWait {
105 /// A worker or stop signal was received.
106 Signal(RunningProgressSignal),
107 /// No signal arrived before the positive report interval elapsed.
108 Timeout,
109 /// All senders were dropped.
110 Disconnected,
111}
112
113impl RunningProgressLoop {
114 /// Creates a paired running progress loop and notifier.
115 ///
116 /// # Returns
117 ///
118 /// A loop that owns the signal receiver and a notifier that sends wakeup or
119 /// stop signals to that loop.
120 #[inline]
121 pub fn channel() -> (Self, RunningProgressNotifier) {
122 let (signal_sender, signal_receiver) = mpsc::channel();
123 (
124 Self { signal_receiver },
125 RunningProgressNotifier { signal_sender },
126 )
127 }
128
129 /// Spawns a scoped reporter thread for running progress events.
130 ///
131 /// # Parameters
132 ///
133 /// * `scope` - Thread scope that owns the reporter thread.
134 /// * `progress` - Progress run used by the reporter thread.
135 /// * `snapshot` - Closure that returns current counters whenever a
136 /// `running` event may be due.
137 ///
138 /// # Returns
139 ///
140 /// A guard that can create worker point handles and stop the scoped
141 /// reporter thread.
142 #[inline]
143 pub fn spawn_scoped<'scope, 'env, 'progress, F>(
144 scope: &'scope thread::Scope<'scope, 'env>,
145 progress: Progress<'progress>,
146 snapshot: F,
147 ) -> ScopedRunningProgress<'scope>
148 where
149 'progress: 'scope,
150 F: FnMut() -> ProgressCounters + Send + 'scope,
151 {
152 let report_points = progress.report_interval().is_zero();
153 let (progress_loop, notifier) = Self::channel();
154 let progress_thread = scope.spawn(move || {
155 progress_loop.run(progress, snapshot);
156 });
157 ScopedRunningProgress::new(notifier, progress_thread, report_points)
158 }
159
160 /// Runs until a stop signal is received or every notifier is dropped.
161 ///
162 /// # Parameters
163 ///
164 /// * `progress` - Progress run used to emit `running` events.
165 /// * `snapshot` - Closure that returns the current counters whenever a
166 /// `running` event may be due.
167 ///
168 /// # Panics
169 ///
170 /// Propagates panics from the configured reporter when a `running` event is
171 /// due.
172 pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
173 where
174 F: FnMut() -> ProgressCounters,
175 {
176 let report_interval = progress.report_interval();
177 while let RunningProgressWait::Signal(RunningProgressSignal::RunningPoint)
178 | RunningProgressWait::Timeout =
179 receive_running_progress_signal(&self.signal_receiver, report_interval)
180 {
181 progress.report_running_if_due(snapshot());
182 }
183 }
184}
185
186/// Receives one running progress loop signal.
187///
188/// # Parameters
189///
190/// * `signal_receiver` - Signal receiver shared with notifiers.
191/// * `report_interval` - Configured progress-report interval.
192///
193/// # Returns
194///
195/// A worker or stop signal, a timeout marker for positive intervals, or a
196/// disconnected marker when all notifiers have disconnected.
197fn receive_running_progress_signal(
198 signal_receiver: &Receiver<RunningProgressSignal>,
199 report_interval: Duration,
200) -> RunningProgressWait {
201 if report_interval.is_zero() {
202 return match signal_receiver.recv() {
203 Ok(signal) => RunningProgressWait::Signal(signal),
204 Err(_) => RunningProgressWait::Disconnected,
205 };
206 }
207 match signal_receiver.recv_timeout(report_interval) {
208 Ok(signal) => RunningProgressWait::Signal(signal),
209 Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
210 Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
211 }
212}