Skip to main content

qubit_progress/running/
running_progress_loop.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::mpsc::{
12        self,
13        Receiver,
14        RecvTimeoutError,
15    },
16    thread,
17    time::Duration,
18};
19
20use crate::{
21    Progress,
22    model::ProgressCounters,
23};
24
25use super::{
26    running_progress_notifier::RunningProgressNotifier,
27    running_progress_signal::RunningProgressSignal,
28    scoped_running_progress::ScopedRunningProgress,
29};
30
31/// Runs periodic `running` progress reports for work tracked elsewhere.
32///
33/// `RunningProgressLoop` is useful when worker threads update shared state and
34/// a separate reporter thread should periodically emit `running` events. The
35/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36/// and a snapshot closure that converts their domain state into
37/// [`ProgressCounters`].
38///
39/// Use [`Self::spawn_scoped`] when the reporter thread can be scoped to the
40/// operation call. It returns a [`ScopedRunningProgress`] guard and cloneable
41/// [`crate::RunningProgressPoints`] handles for workers. Use [`Self::channel`]
42/// only when callers need to own the lower-level loop and notifier directly.
43///
44/// # Examples
45///
46/// ```
47/// use std::{
48///     sync::{
49///         Arc,
50///         atomic::{
51///             AtomicUsize,
52///             Ordering,
53///         },
54///     },
55///     thread,
56///     time::Duration,
57/// };
58///
59/// use qubit_progress::{
60///     NoOpProgressReporter,
61///     Progress,
62///     ProgressCounters,
63///     RunningProgressLoop,
64/// };
65///
66/// let reporter = NoOpProgressReporter;
67/// let completed = Arc::new(AtomicUsize::new(0));
68///
69/// thread::scope(|scope| {
70///     let loop_completed = Arc::clone(&completed);
71///     let progress = Progress::new(&reporter, Duration::ZERO);
72///     let running_progress =
73///         RunningProgressLoop::spawn_scoped(scope, progress, move || {
74///             // The background reporter thread does not own the operation
75///             // state. It only reads a fresh counter snapshot when the
76///             // interval is due or a worker sends a running point.
77///             ProgressCounters::new(Some(3))
78///                 .with_completed_count(loop_completed.load(Ordering::Acquire))
79///         });
80///     let progress_points = running_progress.points();
81///
82///     // Worker code updates domain state first, then wakes the loop. With a
83///     // zero interval, each running point may emit a `running` event.
84///     for _ in 0..3 {
85///         completed.fetch_add(1, Ordering::AcqRel);
86///         assert!(progress_points.running_point());
87///     }
88///
89///     // Stop the loop before leaving the scope. Reporter panics are
90///     // propagated by `stop_and_join`.
91///     running_progress.stop_and_join();
92/// });
93/// ```
94///
95/// # Author
96///
97/// Haixing Hu
98pub struct RunningProgressLoop {
99    /// Signal receiver owned by the reporter loop.
100    signal_receiver: Receiver<RunningProgressSignal>,
101}
102
/// Result of waiting for a running progress loop signal.
///
/// Produced by `receive_running_progress_signal` and inspected by
/// `RunningProgressLoop::run` to decide whether to report again or to exit.
enum RunningProgressWait {
    /// A worker or stop signal was received.
    Signal(RunningProgressSignal),
    /// No signal arrived before the positive report interval elapsed.
    ///
    /// Not produced for a zero interval, because the wait then blocks without
    /// a timeout.
    Timeout,
    /// All senders were dropped.
    ///
    /// The reporter loop treats this as a reason to stop.
    Disconnected,
}
112
113impl RunningProgressLoop {
114    /// Creates a paired running progress loop and notifier.
115    ///
116    /// # Returns
117    ///
118    /// A loop that owns the signal receiver and a notifier that sends wakeup or
119    /// stop signals to that loop.
120    #[inline]
121    pub fn channel() -> (Self, RunningProgressNotifier) {
122        let (signal_sender, signal_receiver) = mpsc::channel();
123        (
124            Self { signal_receiver },
125            RunningProgressNotifier { signal_sender },
126        )
127    }
128
129    /// Spawns a scoped reporter thread for running progress events.
130    ///
131    /// # Parameters
132    ///
133    /// * `scope` - Thread scope that owns the reporter thread.
134    /// * `progress` - Progress run used by the reporter thread.
135    /// * `snapshot` - Closure that returns current counters whenever a
136    ///   `running` event may be due.
137    ///
138    /// # Returns
139    ///
140    /// A guard that can create worker point handles and stop the scoped
141    /// reporter thread.
142    #[inline]
143    pub fn spawn_scoped<'scope, 'env, 'progress, F>(
144        scope: &'scope thread::Scope<'scope, 'env>,
145        progress: Progress<'progress>,
146        snapshot: F,
147    ) -> ScopedRunningProgress<'scope>
148    where
149        'progress: 'scope,
150        F: FnMut() -> ProgressCounters + Send + 'scope,
151    {
152        let report_points = progress.report_interval().is_zero();
153        let (progress_loop, notifier) = Self::channel();
154        let progress_thread = scope.spawn(move || {
155            progress_loop.run(progress, snapshot);
156        });
157        ScopedRunningProgress::new(notifier, progress_thread, report_points)
158    }
159
160    /// Runs until a stop signal is received or every notifier is dropped.
161    ///
162    /// # Parameters
163    ///
164    /// * `progress` - Progress run used to emit `running` events.
165    /// * `snapshot` - Closure that returns the current counters whenever a
166    ///   `running` event may be due.
167    ///
168    /// # Panics
169    ///
170    /// Propagates panics from the configured reporter when a `running` event is
171    /// due.
172    pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
173    where
174        F: FnMut() -> ProgressCounters,
175    {
176        let report_interval = progress.report_interval();
177        while let RunningProgressWait::Signal(RunningProgressSignal::RunningPoint)
178        | RunningProgressWait::Timeout =
179            receive_running_progress_signal(&self.signal_receiver, report_interval)
180        {
181            progress.report_running_if_due(snapshot());
182        }
183    }
184}
185
186/// Receives one running progress loop signal.
187///
188/// # Parameters
189///
190/// * `signal_receiver` - Signal receiver shared with notifiers.
191/// * `report_interval` - Configured progress-report interval.
192///
193/// # Returns
194///
195/// A worker or stop signal, a timeout marker for positive intervals, or a
196/// disconnected marker when all notifiers have disconnected.
197fn receive_running_progress_signal(
198    signal_receiver: &Receiver<RunningProgressSignal>,
199    report_interval: Duration,
200) -> RunningProgressWait {
201    if report_interval.is_zero() {
202        return match signal_receiver.recv() {
203            Ok(signal) => RunningProgressWait::Signal(signal),
204            Err(_) => RunningProgressWait::Disconnected,
205        };
206    }
207    match signal_receiver.recv_timeout(report_interval) {
208        Ok(signal) => RunningProgressWait::Signal(signal),
209        Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
210        Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
211    }
212}