Skip to main content

qubit_progress/running/
running_progress_loop.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10use std::{
11    sync::mpsc::{
12        self,
13        Receiver,
14        RecvTimeoutError,
15    },
16    time::Duration,
17};
18
19use crate::{
20    Progress,
21    model::ProgressCounters,
22};
23
24use super::{
25    running_progress_notifier::RunningProgressNotifier,
26    running_progress_signal::RunningProgressSignal,
27};
28
/// Runs periodic `running` progress reports for work tracked elsewhere.
///
/// `RunningProgressLoop` is useful when worker threads update shared state and
/// a separate reporter thread should periodically emit `running` events. The
/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
/// and a snapshot closure that converts their domain state into
/// [`ProgressCounters`].
///
/// Use [`Self::channel`] to create a loop and its matching
/// [`RunningProgressNotifier`]. Move the loop into a reporter thread, clone the
/// notifier into workers when zero-interval wakeups are needed, and send
/// [`RunningProgressNotifier::stop`] when the operation is complete.
///
/// # Wake-up and termination
///
/// [`Self::run`] reads the report interval from the supplied [`Progress`]
/// once, before it starts waiting:
///
/// * With a zero interval the loop blocks until a signal arrives, so it only
///   takes a snapshot when a notifier sends a running point.
/// * With a positive interval the loop additionally wakes up whenever the
///   interval elapses without any signal.
///
/// The loop returns as soon as it receives any signal other than a running
/// point (for example a stop signal) or when every notifier has been dropped.
///
/// # Examples
///
/// ```
/// use std::{
///     sync::{
///         Arc,
///         atomic::{
///             AtomicUsize,
///             Ordering,
///         },
///     },
///     thread,
///     time::Duration,
/// };
///
/// use qubit_progress::{
///     NoOpProgressReporter,
///     Progress,
///     ProgressCounters,
///     RunningProgressLoop,
/// };
///
/// let reporter = NoOpProgressReporter;
/// let completed = Arc::new(AtomicUsize::new(0));
/// let (progress_loop, notifier) = RunningProgressLoop::channel();
///
/// thread::scope(|scope| {
///     let loop_completed = Arc::clone(&completed);
///     let reporter_ref = &reporter;
///     let progress_thread = scope.spawn(move || {
///         // This background reporter thread owns the loop. It does not own
///         // the operation state; it only reads a fresh counter snapshot when
///         // the interval is due or a worker sends a running point.
///         let progress = Progress::new(reporter_ref, Duration::ZERO);
///         progress_loop.run(progress, || {
///             ProgressCounters::new(Some(3))
///                 .with_completed_count(loop_completed.load(Ordering::Acquire))
///         });
///     });
///
///     // Worker code updates domain state first, then wakes the loop. With a
///     // zero interval, each running point may emit a `running` event.
///     for _ in 0..3 {
///         completed.fetch_add(1, Ordering::AcqRel);
///         assert!(notifier.running_point());
///     }
///
///     // Stop the loop before leaving the scope so reporter panics can be
///     // propagated through the join handle.
///     assert!(notifier.stop());
///     progress_thread.join().expect("progress loop should stop");
/// });
/// ```
///
/// # Author
///
/// Haixing Hu
pub struct RunningProgressLoop {
    /// Receiving half of the `std::sync::mpsc` channel created by
    /// [`Self::channel`]; the matching sender lives in the paired
    /// [`RunningProgressNotifier`].
    signal_receiver: Receiver<RunningProgressSignal>,
}
103
/// Result of waiting for a running progress loop signal.
///
/// Produced by `receive_running_progress_signal` and consumed by
/// `RunningProgressLoop::run` to decide whether to report or to stop.
enum RunningProgressWait {
    /// A signal sent by a notifier was received from the channel.
    Signal(RunningProgressSignal),
    /// No signal arrived before the positive report interval elapsed. This
    /// variant is never produced when the report interval is zero, because
    /// that path waits without a timeout.
    Timeout,
    /// The channel is closed because all senders were dropped.
    Disconnected,
}
113
114impl RunningProgressLoop {
115    /// Creates a paired running progress loop and notifier.
116    ///
117    /// # Returns
118    ///
119    /// A loop that owns the signal receiver and a notifier that sends wakeup or
120    /// stop signals to that loop.
121    #[inline]
122    pub fn channel() -> (Self, RunningProgressNotifier) {
123        let (signal_sender, signal_receiver) = mpsc::channel();
124        (
125            Self { signal_receiver },
126            RunningProgressNotifier { signal_sender },
127        )
128    }
129
130    /// Runs until a stop signal is received or every notifier is dropped.
131    ///
132    /// # Parameters
133    ///
134    /// * `progress` - Progress run used to emit `running` events.
135    /// * `snapshot` - Closure that returns the current counters whenever a
136    ///   `running` event may be due.
137    ///
138    /// # Panics
139    ///
140    /// Propagates panics from the configured reporter when a `running` event is
141    /// due.
142    pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
143    where
144        F: FnMut() -> ProgressCounters,
145    {
146        let report_interval = progress.report_interval();
147        while let RunningProgressWait::Signal(RunningProgressSignal::RunningPoint)
148        | RunningProgressWait::Timeout =
149            receive_running_progress_signal(&self.signal_receiver, report_interval)
150        {
151            progress.report_running_if_due(snapshot());
152        }
153    }
154}
155
156/// Receives one running progress loop signal.
157///
158/// # Parameters
159///
160/// * `signal_receiver` - Signal receiver shared with notifiers.
161/// * `report_interval` - Configured progress-report interval.
162///
163/// # Returns
164///
165/// A worker or stop signal, a timeout marker for positive intervals, or a
166/// disconnected marker when all notifiers have disconnected.
167fn receive_running_progress_signal(
168    signal_receiver: &Receiver<RunningProgressSignal>,
169    report_interval: Duration,
170) -> RunningProgressWait {
171    if report_interval.is_zero() {
172        return match signal_receiver.recv() {
173            Ok(signal) => RunningProgressWait::Signal(signal),
174            Err(_) => RunningProgressWait::Disconnected,
175        };
176    }
177    match signal_receiver.recv_timeout(report_interval) {
178        Ok(signal) => RunningProgressWait::Signal(signal),
179        Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
180        Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
181    }
182}