Skip to main content

qubit_progress/running/
running_progress_loop.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::mpsc::{
12        self,
13        Receiver,
14        RecvTimeoutError,
15    },
16    thread,
17    time::Duration,
18};
19
20use crate::{
21    Progress,
22    model::ProgressCounters,
23};
24
25use super::{
26    running_progress_notifier::RunningProgressNotifier,
27    running_progress_signal::RunningProgressSignal,
28    scoped_running_progress::ScopedRunningProgress,
29};
30
31/// Runs periodic `running` progress reports for work tracked elsewhere.
32///
33/// `RunningProgressLoop` is useful when worker threads update shared state and
34/// a separate reporter thread should periodically emit `running` events. The
35/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36/// and a snapshot closure that converts their domain state into
37/// [`ProgressCounters`].
38///
39/// Use [`Self::spawn_scoped`] when the reporter thread can be scoped to the
40/// operation call. It returns a [`ScopedRunningProgress`] guard and cloneable
41/// [`crate::RunningProgressPointHandle`] handles for workers. Use [`Self::channel`]
42/// only when callers need to own the lower-level loop and notifier directly.
43///
44/// # Examples
45///
46/// ```
47/// use std::{
48///     sync::{
49///         Arc,
50///         atomic::{
51///             AtomicUsize,
52///             Ordering,
53///         },
54///     },
55///     thread,
56///     time::Duration,
57/// };
58///
59/// use qubit_progress::{
60///     NoOpProgressReporter,
61///     Progress,
62///     ProgressCounters,
63///     RunningProgressLoop,
64/// };
65///
66/// let reporter = NoOpProgressReporter;
67/// let completed = Arc::new(AtomicUsize::new(0));
68///
69/// thread::scope(|scope| {
70///     let loop_completed = Arc::clone(&completed);
71///     let progress = Progress::new(&reporter, Duration::ZERO);
72///     let running_progress =
73///         RunningProgressLoop::spawn_scoped(scope, progress, move || {
74///             // The background reporter thread does not own the operation
75///             // state. It only reads a fresh counter snapshot when the
76///             // interval is due or a worker sends a running point.
77///             ProgressCounters::new(Some(3))
78///                 .with_completed_count(loop_completed.load(Ordering::Acquire))
79///         });
80///     let progress_point_handle = running_progress.point_handle();
81///
82///     // Worker code updates domain state first, then wakes the loop. With a
83///     // zero interval, each running point may emit a `running` event.
84///     for _ in 0..3 {
85///         completed.fetch_add(1, Ordering::AcqRel);
86///         assert!(progress_point_handle.report());
87///     }
88///
89///     // Stop the loop before leaving the scope. Reporter panics are
90///     // propagated by `stop_and_join`.
91///     running_progress.stop_and_join();
92/// });
93/// ```
94///
95/// # Author
96///
97/// Haixing Hu
98pub struct RunningProgressLoop {
99    /// Signal receiver owned by the reporter loop.
100    signal_receiver: Receiver<RunningProgressSignal>,
101}
102
103/// Result of waiting for a running progress loop signal.
104enum RunningProgressWait {
105    /// A worker or stop signal was received.
106    Signal(RunningProgressSignal),
107    /// No signal arrived before the positive report interval elapsed.
108    Timeout,
109    /// All senders were dropped.
110    Disconnected,
111}
112
113impl RunningProgressWait {
114    /// Returns `true` when the running progress loop should call
115    /// [`Progress::report_running_if_due`] after this wait result.
116    #[inline]
117    fn should_report(self) -> bool {
118        match self {
119            Self::Timeout => true,
120            Self::Disconnected => false,
121            Self::Signal(signal) => match signal {
122                RunningProgressSignal::RunningPoint => true,
123                RunningProgressSignal::Stop => false,
124            },
125        }
126    }
127}
128
129impl RunningProgressLoop {
130    /// Spawns a scoped reporter thread for running progress events.
131    ///
132    /// # Parameters
133    ///
134    /// * `scope` - Thread scope that owns the reporter thread.
135    /// * `progress` - Progress run used by the reporter thread.
136    /// * `snapshot` - Closure that returns current counters whenever a
137    ///   `running` event may be due.
138    ///
139    /// # Returns
140    ///
141    /// A guard that can create worker point handles and stop the scoped
142    /// reporter thread.
143    pub fn spawn_scoped<'scope, 'env, 'progress, F>(
144        scope: &'scope thread::Scope<'scope, 'env>,
145        progress: Progress<'progress>,
146        snapshot: F,
147    ) -> ScopedRunningProgress<'scope>
148    where
149        'progress: 'scope,
150        F: FnMut() -> ProgressCounters + Send + 'scope,
151    {
152        let report_points = progress.report_interval().is_zero();
153        let (progress_loop, notifier) = Self::channel();
154        let progress_thread = scope.spawn(move || {
155            progress_loop.run(progress, snapshot);
156        });
157        ScopedRunningProgress::new(notifier, progress_thread, report_points)
158    }
159
160    /// Creates a paired running progress loop and notifier.
161    ///
162    /// # Returns
163    ///
164    /// A loop that owns the signal receiver and a notifier that sends wakeup or
165    /// stop signals to that loop.
166    pub fn channel() -> (Self, RunningProgressNotifier) {
167        let (signal_sender, signal_receiver) = mpsc::channel();
168        (
169            Self { signal_receiver },
170            RunningProgressNotifier { signal_sender },
171        )
172    }
173
174    /// Runs until a stop signal is received or every notifier is dropped.
175    ///
176    /// # Parameters
177    ///
178    /// * `progress` - Progress run used to emit `running` events.
179    /// * `snapshot` - Closure that returns the current counters whenever a
180    ///   `running` event may be due.
181    ///
182    /// # Panics
183    ///
184    /// Propagates panics from the configured reporter when a `running` event is
185    /// due.
186    pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
187    where
188        F: FnMut() -> ProgressCounters,
189    {
190        let report_interval = progress.report_interval();
191        while self.receive_wait(report_interval).should_report() {
192            progress.report_running_if_due(snapshot());
193        }
194    }
195
196    /// Waits once on the signal channel and maps the outcome to [`RunningProgressWait`].
197    ///
198    /// The calling thread blocks until this wait completes.
199    ///
200    /// When `report_interval` is [`Duration::ZERO`], uses [`Receiver::recv`]: the call returns when a
201    /// [`RunningProgressSignal`] is received or when every notifier sender has been dropped. In this
202    /// mode no [`RunningProgressWait::Timeout`] is produced.
203    ///
204    /// When `report_interval` is positive, uses [`Receiver::recv_timeout`]: if no message arrives
205    /// before the deadline, returns [`RunningProgressWait::Timeout`] so [`Self::run`] can drive periodic
206    /// `running` progress; if a message arrives first, returns that signal wrapped in
207    /// [`RunningProgressWait::Signal`], or [`RunningProgressWait::Disconnected`] if the channel closes.
208    ///
209    /// # Parameters
210    ///
211    /// * `report_interval` - Configured report interval from the [`Progress`] run passed to [`Self::run`];
212    ///   [`Duration::ZERO`] selects unbounded waits (event-driven only), otherwise each wait is capped
213    ///   by this duration and may time out.
214    ///
215    /// # Returns
216    ///
217    /// * [`RunningProgressWait::Signal`] - The next [`RunningProgressSignal`] from a notifier.
218    /// * [`RunningProgressWait::Timeout`] - Only when `report_interval` is positive and the wait reached
219    ///   the deadline without a message.
220    /// * [`RunningProgressWait::Disconnected`] - The MPSC channel has no senders left.
221    fn receive_wait(&self, report_interval: Duration) -> RunningProgressWait {
222        if report_interval.is_zero() {
223            return match self.signal_receiver.recv() {
224                Ok(signal) => RunningProgressWait::Signal(signal),
225                Err(_) => RunningProgressWait::Disconnected,
226            };
227        }
228        match self.signal_receiver.recv_timeout(report_interval) {
229            Ok(signal) => RunningProgressWait::Signal(signal),
230            Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
231            Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
232        }
233    }
234}