Skip to main content

qubit_lock/monitor/
arc_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9//! # Arc Monitor
10//!
11//! Provides an Arc-wrapped synchronous monitor for condition-based state
12//! coordination across threads.
13//!
14//! # Author
15//!
16//! Haixing Hu
17
18use std::sync::Arc;
19use std::time::Duration;
20
21use super::{
22    Monitor,
23    MonitorGuard,
24    WaitTimeoutResult,
25    WaitTimeoutStatus,
26};
27
28/// Arc-wrapped monitor for shared condition-based state coordination.
29///
30/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
31/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
32/// It preserves the same guard-based waiting, predicate-based waiting, and
33/// poison recovery semantics as [`Monitor`].
34///
35/// # Type Parameters
36///
37/// * `T` - The state protected by this monitor.
38///
39/// # Example
40///
41/// ```rust
42/// use std::thread;
43///
44/// use qubit_lock::lock::ArcMonitor;
45///
46/// let monitor = ArcMonitor::new(false);
47/// let waiter_monitor = monitor.clone();
48///
49/// let waiter = thread::spawn(move || {
50///     waiter_monitor.wait_until(
51///         |ready| *ready,
52///         |ready| {
53///             *ready = false;
54///         },
55///     );
56/// });
57///
58/// monitor.write(|ready| {
59///     *ready = true;
60/// });
61/// monitor.notify_all();
62///
63/// waiter.join().expect("waiter should finish");
64/// assert!(!monitor.read(|ready| *ready));
65/// ```
66///
67/// # Author
68///
69/// Haixing Hu
70pub struct ArcMonitor<T> {
71    /// Shared monitor instance.
72    inner: Arc<Monitor<T>>,
73}
74
75impl<T> ArcMonitor<T> {
76    /// Creates an Arc-wrapped monitor protecting the supplied state value.
77    ///
78    /// # Arguments
79    ///
80    /// * `state` - Initial state protected by the monitor.
81    ///
82    /// # Returns
83    ///
84    /// A cloneable monitor handle initialized with the supplied state.
85    #[inline]
86    pub fn new(state: T) -> Self {
87        Self {
88            inner: Arc::new(Monitor::new(state)),
89        }
90    }
91
92    /// Acquires the shared monitor and returns a guard.
93    ///
94    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
95    /// keeps the monitor mutex locked until it is dropped. It can also wait on
96    /// the monitor's condition variable through [`MonitorGuard::wait`] or
97    /// [`MonitorGuard::wait_timeout`].
98    ///
99    /// If the underlying mutex is poisoned, this method recovers the inner
100    /// state and still returns a guard.
101    ///
102    /// # Returns
103    ///
104    /// A guard that provides read and write access to the protected state.
105    ///
106    /// # Example
107    ///
108    /// ```rust
109    /// use qubit_lock::lock::ArcMonitor;
110    ///
111    /// let monitor = ArcMonitor::new(1);
112    /// {
113    ///     let mut value = monitor.lock();
114    ///     *value += 1;
115    /// }
116    ///
117    /// assert_eq!(monitor.read(|value| *value), 2);
118    /// ```
119    #[inline]
120    pub fn lock(&self) -> MonitorGuard<'_, T> {
121        self.inner.lock()
122    }
123
124    /// Acquires the monitor and reads the protected state.
125    ///
126    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
127    /// mutex is held, so keep it short and avoid long blocking work.
128    ///
129    /// # Arguments
130    ///
131    /// * `f` - Closure that receives an immutable reference to the state.
132    ///
133    /// # Returns
134    ///
135    /// The value returned by `f`.
136    #[inline]
137    pub fn read<R, F>(&self, f: F) -> R
138    where
139        F: FnOnce(&T) -> R,
140    {
141        self.inner.read(f)
142    }
143
144    /// Acquires the monitor and mutates the protected state.
145    ///
146    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
147    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
148    /// waiting thread may observe.
149    ///
150    /// # Arguments
151    ///
152    /// * `f` - Closure that receives a mutable reference to the state.
153    ///
154    /// # Returns
155    ///
156    /// The value returned by `f`.
157    #[inline]
158    pub fn write<R, F>(&self, f: F) -> R
159    where
160        F: FnOnce(&mut T) -> R,
161    {
162        self.inner.write(f)
163    }
164
165    /// Waits for a notification or timeout without checking state.
166    ///
167    /// This delegates to [`Monitor::wait_notify`]. Most
168    /// coordination code should prefer [`Self::wait_while`],
169    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
170    ///
171    /// Condition variables may wake spuriously, so
172    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
173    /// state.
174    ///
175    /// # Arguments
176    ///
177    /// * `timeout` - Maximum duration to wait for a notification.
178    ///
179    /// # Returns
180    ///
181    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
182    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
183    ///
184    /// # Example
185    ///
186    /// ```rust
187    /// use std::time::Duration;
188    ///
189    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
190    ///
191    /// let monitor = ArcMonitor::new(false);
192    /// let status = monitor.wait_notify(Duration::from_millis(1));
193    ///
194    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
195    /// ```
196    #[inline]
197    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
198        self.inner.wait_notify(timeout)
199    }
200
201    /// Waits while a predicate remains true, then mutates the protected state.
202    ///
203    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
204    /// while holding the monitor mutex, and the closure runs while the mutex is
205    /// still held after the predicate stops blocking.
206    ///
207    /// This method may block indefinitely if no thread changes the state so
208    /// that `waiting` becomes false and sends a notification.
209    ///
210    /// # Arguments
211    ///
212    /// * `waiting` - Predicate that returns `true` while the caller should
213    ///   keep waiting.
214    /// * `f` - Closure that receives mutable access after waiting is no longer
215    ///   required.
216    ///
217    /// # Returns
218    ///
219    /// The value returned by `f`.
220    ///
221    /// # Example
222    ///
223    /// ```rust
224    /// use std::thread;
225    ///
226    /// use qubit_lock::lock::ArcMonitor;
227    ///
228    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
229    /// let worker_monitor = monitor.clone();
230    ///
231    /// let worker = thread::spawn(move || {
232    ///     worker_monitor.wait_while(
233    ///         |items| items.is_empty(),
234    ///         |items| items.pop().expect("item should be ready"),
235    ///     )
236    /// });
237    ///
238    /// monitor.write(|items| items.push(7));
239    /// monitor.notify_one();
240    ///
241    /// assert_eq!(worker.join().expect("worker should finish"), 7);
242    /// ```
243    #[inline]
244    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
245    where
246        P: FnMut(&T) -> bool,
247        F: FnOnce(&mut T) -> R,
248    {
249        self.inner.wait_while(waiting, f)
250    }
251
252    /// Waits until the protected state satisfies a predicate, then mutates it.
253    ///
254    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
255    /// no thread changes the state to satisfy the predicate and sends a
256    /// notification.
257    ///
258    /// # Arguments
259    ///
260    /// * `ready` - Predicate that returns `true` when the state is ready.
261    /// * `f` - Closure that receives mutable access to the ready state.
262    ///
263    /// # Returns
264    ///
265    /// The value returned by `f`.
266    #[inline]
267    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
268    where
269        P: FnMut(&T) -> bool,
270        F: FnOnce(&mut T) -> R,
271    {
272        self.inner.wait_until(ready, f)
273    }
274
275    /// Waits while a predicate remains true, with an overall time limit.
276    ///
277    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
278    /// false before `timeout` expires, `f` runs while the monitor lock is still
279    /// held. If the timeout expires first, the closure is not called.
280    ///
281    /// # Arguments
282    ///
283    /// * `timeout` - Maximum total duration to wait.
284    /// * `waiting` - Predicate that returns `true` while the caller should
285    ///   continue waiting.
286    /// * `f` - Closure that receives mutable access when waiting is no longer
287    ///   required.
288    ///
289    /// # Returns
290    ///
291    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
292    /// predicate stops blocking before the timeout. Returns
293    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
294    ///
295    /// # Example
296    ///
297    /// ```rust
298    /// use std::time::Duration;
299    ///
300    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
301    ///
302    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
303    /// let result = monitor.wait_timeout_while(
304    ///     Duration::from_millis(1),
305    ///     |items| items.is_empty(),
306    ///     |items| items.pop(),
307    /// );
308    ///
309    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
310    /// ```
311    #[inline]
312    pub fn wait_timeout_while<R, P, F>(
313        &self,
314        timeout: Duration,
315        waiting: P,
316        f: F,
317    ) -> WaitTimeoutResult<R>
318    where
319        P: FnMut(&T) -> bool,
320        F: FnOnce(&mut T) -> R,
321    {
322        self.inner.wait_timeout_while(timeout, waiting, f)
323    }
324
325    /// Waits until a predicate becomes true, with an overall time limit.
326    ///
327    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
328    /// true before `timeout` expires, `f` runs while the monitor lock is still
329    /// held. If the timeout expires first, the closure is not called.
330    ///
331    /// # Arguments
332    ///
333    /// * `timeout` - Maximum total duration to wait.
334    /// * `ready` - Predicate that returns `true` when the caller may continue.
335    /// * `f` - Closure that receives mutable access to the ready state.
336    ///
337    /// # Returns
338    ///
339    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
340    /// predicate becomes true before the timeout. Returns
341    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
342    ///
343    /// # Example
344    ///
345    /// ```rust
346    /// use std::{
347    ///     thread,
348    ///     time::Duration,
349    /// };
350    ///
351    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
352    ///
353    /// let monitor = ArcMonitor::new(false);
354    /// let worker_monitor = monitor.clone();
355    ///
356    /// let worker = thread::spawn(move || {
357    ///     worker_monitor.wait_timeout_until(
358    ///         Duration::from_secs(1),
359    ///         |ready| *ready,
360    ///         |ready| {
361    ///             *ready = false;
362    ///             5
363    ///         },
364    ///     )
365    /// });
366    ///
367    /// monitor.write(|ready| *ready = true);
368    /// monitor.notify_one();
369    ///
370    /// assert_eq!(
371    ///     worker.join().expect("worker should finish"),
372    ///     WaitTimeoutResult::Ready(5),
373    /// );
374    /// ```
375    #[inline]
376    pub fn wait_timeout_until<R, P, F>(
377        &self,
378        timeout: Duration,
379        ready: P,
380        f: F,
381    ) -> WaitTimeoutResult<R>
382    where
383        P: FnMut(&T) -> bool,
384        F: FnOnce(&mut T) -> R,
385    {
386        self.inner.wait_timeout_until(timeout, ready, f)
387    }
388
389    /// Wakes one thread waiting on this monitor's condition variable.
390    ///
391    /// Notifications do not carry state by themselves. A waiting thread only
392    /// proceeds safely after rechecking the protected state. Call this after
393    /// changing state that may make one waiter able to continue.
394    #[inline]
395    pub fn notify_one(&self) {
396        self.inner.notify_one();
397    }
398
399    /// Wakes all threads waiting on this monitor's condition variable.
400    ///
401    /// Notifications do not carry state by themselves. Every awakened thread
402    /// must recheck the protected state before continuing. Call this after a
403    /// state change that may allow multiple waiters to make progress.
404    #[inline]
405    pub fn notify_all(&self) {
406        self.inner.notify_all();
407    }
408}
409
410impl<T: Default> Default for ArcMonitor<T> {
411    /// Creates an Arc-wrapped monitor containing `T::default()`.
412    ///
413    /// # Returns
414    ///
415    /// A cloneable monitor handle protecting the default value for `T`.
416    #[inline]
417    fn default() -> Self {
418        Self::new(T::default())
419    }
420}
421
422impl<T> Clone for ArcMonitor<T> {
423    /// Clones this monitor handle.
424    ///
425    /// The cloned handle shares the same protected state and condition
426    /// variable with the original.
427    ///
428    /// # Returns
429    ///
430    /// A new handle sharing the same monitor state.
431    #[inline]
432    fn clone(&self) -> Self {
433        Self {
434            inner: self.inner.clone(),
435        }
436    }
437}