Skip to main content

qubit_lock/monitor/
arc_monitor.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! # Arc Monitor
//!
//! Provides [`ArcMonitor`], an Arc-wrapped synchronous monitor for
//! condition-based state coordination across threads.
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use super::{
20    Monitor,
21    MonitorGuard,
22    WaitTimeoutResult,
23    WaitTimeoutStatus,
24};
25
/// Arc-wrapped monitor for shared condition-based state coordination.
///
/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
/// Every clone refers to the same underlying monitor, so a state change or
/// notification made through one handle is visible through all of them.
/// It preserves the same guard-based waiting, predicate-based waiting, and
/// poison recovery semantics as [`Monitor`].
///
/// # Type Parameters
///
/// * `T` - The state protected by this monitor.
///
/// # Example
///
/// ```rust
/// use std::thread;
///
/// use qubit_lock::lock::ArcMonitor;
///
/// let monitor = ArcMonitor::new(false);
/// let waiter_monitor = monitor.clone();
///
/// // The waiter blocks until the flag is set, then resets it.
/// let waiter = thread::spawn(move || {
///     waiter_monitor.wait_until(
///         |ready| *ready,
///         |ready| {
///             *ready = false;
///         },
///     );
/// });
///
/// // Change the state first, then notify so the waiter rechecks it.
/// monitor.write(|ready| {
///     *ready = true;
/// });
/// monitor.notify_all();
///
/// waiter.join().expect("waiter should finish");
/// assert!(!monitor.read(|ready| *ready));
/// ```
pub struct ArcMonitor<T> {
    /// Shared monitor instance; all clones of this handle point at it.
    inner: Arc<Monitor<T>>,
}
69
70impl<T> ArcMonitor<T> {
71    /// Creates an Arc-wrapped monitor protecting the supplied state value.
72    ///
73    /// # Arguments
74    ///
75    /// * `state` - Initial state protected by the monitor.
76    ///
77    /// # Returns
78    ///
79    /// A cloneable monitor handle initialized with the supplied state.
80    #[inline]
81    pub fn new(state: T) -> Self {
82        Self {
83            inner: Arc::new(Monitor::new(state)),
84        }
85    }
86
87    /// Acquires the shared monitor and returns a guard.
88    ///
89    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
90    /// keeps the monitor mutex locked until it is dropped. It can also wait on
91    /// the monitor's condition variable through [`MonitorGuard::wait`] or
92    /// [`MonitorGuard::wait_timeout`].
93    ///
94    /// If the underlying mutex is poisoned, this method recovers the inner
95    /// state and still returns a guard.
96    ///
97    /// # Returns
98    ///
99    /// A guard that provides read and write access to the protected state.
100    ///
101    /// # Example
102    ///
103    /// ```rust
104    /// use qubit_lock::lock::ArcMonitor;
105    ///
106    /// let monitor = ArcMonitor::new(1);
107    /// {
108    ///     let mut value = monitor.lock();
109    ///     *value += 1;
110    /// }
111    ///
112    /// assert_eq!(monitor.read(|value| *value), 2);
113    /// ```
114    #[inline]
115    pub fn lock(&self) -> MonitorGuard<'_, T> {
116        self.inner.lock()
117    }
118
119    /// Acquires the monitor and reads the protected state.
120    ///
121    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
122    /// mutex is held, so keep it short and avoid long blocking work.
123    ///
124    /// # Arguments
125    ///
126    /// * `f` - Closure that receives an immutable reference to the state.
127    ///
128    /// # Returns
129    ///
130    /// The value returned by `f`.
131    #[inline]
132    pub fn read<R, F>(&self, f: F) -> R
133    where
134        F: FnOnce(&T) -> R,
135    {
136        self.inner.read(f)
137    }
138
139    /// Acquires the monitor and mutates the protected state.
140    ///
141    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
142    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
143    /// waiting thread may observe.
144    ///
145    /// # Arguments
146    ///
147    /// * `f` - Closure that receives a mutable reference to the state.
148    ///
149    /// # Returns
150    ///
151    /// The value returned by `f`.
152    #[inline]
153    pub fn write<R, F>(&self, f: F) -> R
154    where
155        F: FnOnce(&mut T) -> R,
156    {
157        self.inner.write(f)
158    }
159
160    /// Waits for a notification or timeout without checking state.
161    ///
162    /// This delegates to [`Monitor::wait_notify`]. Most
163    /// coordination code should prefer [`Self::wait_while`],
164    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
165    ///
166    /// Condition variables may wake spuriously, so
167    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
168    /// state.
169    ///
170    /// # Arguments
171    ///
172    /// * `timeout` - Maximum duration to wait for a notification.
173    ///
174    /// # Returns
175    ///
176    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
177    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
178    ///
179    /// # Example
180    ///
181    /// ```rust
182    /// use std::time::Duration;
183    ///
184    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
185    ///
186    /// let monitor = ArcMonitor::new(false);
187    /// let status = monitor.wait_notify(Duration::from_millis(1));
188    ///
189    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
190    /// ```
191    #[inline]
192    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
193        self.inner.wait_notify(timeout)
194    }
195
196    /// Waits while a predicate remains true, then mutates the protected state.
197    ///
198    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
199    /// while holding the monitor mutex, and the closure runs while the mutex is
200    /// still held after the predicate stops blocking.
201    ///
202    /// This method may block indefinitely if no thread changes the state so
203    /// that `waiting` becomes false and sends a notification.
204    ///
205    /// # Arguments
206    ///
207    /// * `waiting` - Predicate that returns `true` while the caller should
208    ///   keep waiting.
209    /// * `f` - Closure that receives mutable access after waiting is no longer
210    ///   required.
211    ///
212    /// # Returns
213    ///
214    /// The value returned by `f`.
215    ///
216    /// # Example
217    ///
218    /// ```rust
219    /// use std::thread;
220    ///
221    /// use qubit_lock::lock::ArcMonitor;
222    ///
223    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
224    /// let worker_monitor = monitor.clone();
225    ///
226    /// let worker = thread::spawn(move || {
227    ///     worker_monitor.wait_while(
228    ///         |items| items.is_empty(),
229    ///         |items| items.pop().expect("item should be ready"),
230    ///     )
231    /// });
232    ///
233    /// monitor.write(|items| items.push(7));
234    /// monitor.notify_one();
235    ///
236    /// assert_eq!(worker.join().expect("worker should finish"), 7);
237    /// ```
238    #[inline]
239    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
240    where
241        P: FnMut(&T) -> bool,
242        F: FnOnce(&mut T) -> R,
243    {
244        self.inner.wait_while(waiting, f)
245    }
246
247    /// Waits until the protected state satisfies a predicate, then mutates it.
248    ///
249    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
250    /// no thread changes the state to satisfy the predicate and sends a
251    /// notification.
252    ///
253    /// # Arguments
254    ///
255    /// * `ready` - Predicate that returns `true` when the state is ready.
256    /// * `f` - Closure that receives mutable access to the ready state.
257    ///
258    /// # Returns
259    ///
260    /// The value returned by `f`.
261    #[inline]
262    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
263    where
264        P: FnMut(&T) -> bool,
265        F: FnOnce(&mut T) -> R,
266    {
267        self.inner.wait_until(ready, f)
268    }
269
270    /// Waits while a predicate remains true, with an overall time limit.
271    ///
272    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
273    /// false before `timeout` expires, `f` runs while the monitor lock is still
274    /// held. If the timeout expires first, the closure is not called.
275    ///
276    /// # Arguments
277    ///
278    /// * `timeout` - Maximum total duration to wait.
279    /// * `waiting` - Predicate that returns `true` while the caller should
280    ///   continue waiting.
281    /// * `f` - Closure that receives mutable access when waiting is no longer
282    ///   required.
283    ///
284    /// # Returns
285    ///
286    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
287    /// predicate stops blocking before the timeout. Returns
288    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
289    ///
290    /// # Example
291    ///
292    /// ```rust
293    /// use std::time::Duration;
294    ///
295    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
296    ///
297    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
298    /// let result = monitor.wait_timeout_while(
299    ///     Duration::from_millis(1),
300    ///     |items| items.is_empty(),
301    ///     |items| items.pop(),
302    /// );
303    ///
304    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
305    /// ```
306    #[inline]
307    pub fn wait_timeout_while<R, P, F>(
308        &self,
309        timeout: Duration,
310        waiting: P,
311        f: F,
312    ) -> WaitTimeoutResult<R>
313    where
314        P: FnMut(&T) -> bool,
315        F: FnOnce(&mut T) -> R,
316    {
317        self.inner.wait_timeout_while(timeout, waiting, f)
318    }
319
320    /// Waits until a predicate becomes true, with an overall time limit.
321    ///
322    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
323    /// true before `timeout` expires, `f` runs while the monitor lock is still
324    /// held. If the timeout expires first, the closure is not called.
325    ///
326    /// # Arguments
327    ///
328    /// * `timeout` - Maximum total duration to wait.
329    /// * `ready` - Predicate that returns `true` when the caller may continue.
330    /// * `f` - Closure that receives mutable access to the ready state.
331    ///
332    /// # Returns
333    ///
334    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
335    /// predicate becomes true before the timeout. Returns
336    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
337    ///
338    /// # Example
339    ///
340    /// ```rust
341    /// use std::{
342    ///     thread,
343    ///     time::Duration,
344    /// };
345    ///
346    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
347    ///
348    /// let monitor = ArcMonitor::new(false);
349    /// let worker_monitor = monitor.clone();
350    ///
351    /// let worker = thread::spawn(move || {
352    ///     worker_monitor.wait_timeout_until(
353    ///         Duration::from_secs(1),
354    ///         |ready| *ready,
355    ///         |ready| {
356    ///             *ready = false;
357    ///             5
358    ///         },
359    ///     )
360    /// });
361    ///
362    /// monitor.write(|ready| *ready = true);
363    /// monitor.notify_one();
364    ///
365    /// assert_eq!(
366    ///     worker.join().expect("worker should finish"),
367    ///     WaitTimeoutResult::Ready(5),
368    /// );
369    /// ```
370    #[inline]
371    pub fn wait_timeout_until<R, P, F>(
372        &self,
373        timeout: Duration,
374        ready: P,
375        f: F,
376    ) -> WaitTimeoutResult<R>
377    where
378        P: FnMut(&T) -> bool,
379        F: FnOnce(&mut T) -> R,
380    {
381        self.inner.wait_timeout_until(timeout, ready, f)
382    }
383
384    /// Wakes one thread waiting on this monitor's condition variable.
385    ///
386    /// Notifications do not carry state by themselves. A waiting thread only
387    /// proceeds safely after rechecking the protected state. Call this after
388    /// changing state that may make one waiter able to continue.
389    #[inline]
390    pub fn notify_one(&self) {
391        self.inner.notify_one();
392    }
393
394    /// Wakes all threads waiting on this monitor's condition variable.
395    ///
396    /// Notifications do not carry state by themselves. Every awakened thread
397    /// must recheck the protected state before continuing. Call this after a
398    /// state change that may allow multiple waiters to make progress.
399    #[inline]
400    pub fn notify_all(&self) {
401        self.inner.notify_all();
402    }
403}
404
405impl<T: Default> Default for ArcMonitor<T> {
406    /// Creates an Arc-wrapped monitor containing `T::default()`.
407    ///
408    /// # Returns
409    ///
410    /// A cloneable monitor handle protecting the default value for `T`.
411    #[inline]
412    fn default() -> Self {
413        Self::new(T::default())
414    }
415}
416
417impl<T> Clone for ArcMonitor<T> {
418    /// Clones this monitor handle.
419    ///
420    /// The cloned handle shares the same protected state and condition
421    /// variable with the original.
422    ///
423    /// # Returns
424    ///
425    /// A new handle sharing the same monitor state.
426    #[inline]
427    fn clone(&self) -> Self {
428        Self {
429            inner: self.inner.clone(),
430        }
431    }
432}