Skip to main content

qubit_lock/monitor/
arc_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use super::{
20    Monitor,
21    MonitorGuard,
22    WaitTimeoutResult,
23    WaitTimeoutStatus,
24};
25
26/// Arc-wrapped monitor for shared condition-based state coordination.
27///
28/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
29/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
30/// It preserves the same guard-based waiting and predicate-based waiting
31/// semantics as [`Monitor`].
32///
33/// # Type Parameters
34///
35/// * `T` - The state protected by this monitor.
36///
37/// # Example
38///
39/// ```rust
40/// use std::thread;
41///
42/// use qubit_lock::lock::ArcMonitor;
43///
44/// let monitor = ArcMonitor::new(false);
45/// let waiter_monitor = monitor.clone();
46///
47/// let waiter = thread::spawn(move || {
48///     waiter_monitor.wait_until(
49///         |ready| *ready,
50///         |ready| {
51///             *ready = false;
52///         },
53///     );
54/// });
55///
56/// monitor.write(|ready| {
57///     *ready = true;
58/// });
59/// monitor.notify_all();
60///
61/// waiter.join().expect("waiter should finish");
62/// assert!(!monitor.read(|ready| *ready));
63/// ```
64///
65pub struct ArcMonitor<T> {
66    /// Shared monitor instance.
67    inner: Arc<Monitor<T>>,
68}
69
70impl<T> ArcMonitor<T> {
71    /// Creates an Arc-wrapped monitor protecting the supplied state value.
72    ///
73    /// # Arguments
74    ///
75    /// * `state` - Initial state protected by the monitor.
76    ///
77    /// # Returns
78    ///
79    /// A cloneable monitor handle initialized with the supplied state.
80    #[inline]
81    pub fn new(state: T) -> Self {
82        Self {
83            inner: Arc::new(Monitor::new(state)),
84        }
85    }
86
87    /// Acquires the shared monitor and returns a guard.
88    ///
89    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
90    /// keeps the monitor mutex locked until it is dropped. It can also wait on
91    /// the monitor's condition variable through [`MonitorGuard::wait`] or
92    /// [`MonitorGuard::wait_timeout`].
93    ///
94    /// # Returns
95    ///
96    /// A guard that provides read and write access to the protected state.
97    ///
98    /// # Example
99    ///
100    /// ```rust
101    /// use qubit_lock::lock::ArcMonitor;
102    ///
103    /// let monitor = ArcMonitor::new(1);
104    /// {
105    ///     let mut value = monitor.lock();
106    ///     *value += 1;
107    /// }
108    ///
109    /// assert_eq!(monitor.read(|value| *value), 2);
110    /// ```
111    #[inline]
112    pub fn lock(&self) -> MonitorGuard<'_, T> {
113        self.inner.lock()
114    }
115
116    /// Acquires the monitor and reads the protected state.
117    ///
118    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
119    /// mutex is held, so keep it short and avoid long blocking work.
120    ///
121    /// # Arguments
122    ///
123    /// * `f` - Closure that receives an immutable reference to the state.
124    ///
125    /// # Returns
126    ///
127    /// The value returned by `f`.
128    #[inline]
129    pub fn read<R, F>(&self, f: F) -> R
130    where
131        F: FnOnce(&T) -> R,
132    {
133        self.inner.read(f)
134    }
135
136    /// Acquires the monitor and mutates the protected state.
137    ///
138    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
139    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
140    /// waiting thread may observe.
141    ///
142    /// # Arguments
143    ///
144    /// * `f` - Closure that receives a mutable reference to the state.
145    ///
146    /// # Returns
147    ///
148    /// The value returned by `f`.
149    #[inline]
150    pub fn write<R, F>(&self, f: F) -> R
151    where
152        F: FnOnce(&mut T) -> R,
153    {
154        self.inner.write(f)
155    }
156
157    /// Waits for a notification or timeout without checking state.
158    ///
159    /// This delegates to [`Monitor::wait_notify`]. Most
160    /// coordination code should prefer [`Self::wait_while`],
161    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
162    ///
163    /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
164    /// but it does not prove that the protected state changed in a useful way.
165    ///
166    /// # Arguments
167    ///
168    /// * `timeout` - Maximum duration to wait for a notification.
169    ///
170    /// # Returns
171    ///
172    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
173    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
174    ///
175    /// # Example
176    ///
177    /// ```rust
178    /// use std::time::Duration;
179    ///
180    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
181    ///
182    /// let monitor = ArcMonitor::new(false);
183    /// let status = monitor.wait_notify(Duration::from_millis(1));
184    ///
185    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
186    /// ```
187    #[inline]
188    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
189        self.inner.wait_notify(timeout)
190    }
191
192    /// Waits while a predicate remains true, then mutates the protected state.
193    ///
194    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
195    /// while holding the monitor mutex, and the closure runs while the mutex is
196    /// still held after the predicate stops blocking.
197    ///
198    /// This method may block indefinitely if no thread changes the state so
199    /// that `waiting` becomes false and sends a notification.
200    ///
201    /// # Arguments
202    ///
203    /// * `waiting` - Predicate that returns `true` while the caller should
204    ///   keep waiting.
205    /// * `f` - Closure that receives mutable access after waiting is no longer
206    ///   required.
207    ///
208    /// # Returns
209    ///
210    /// The value returned by `f`.
211    ///
212    /// # Example
213    ///
214    /// ```rust
215    /// use std::thread;
216    ///
217    /// use qubit_lock::lock::ArcMonitor;
218    ///
219    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
220    /// let worker_monitor = monitor.clone();
221    ///
222    /// let worker = thread::spawn(move || {
223    ///     worker_monitor.wait_while(
224    ///         |items| items.is_empty(),
225    ///         |items| items.pop().expect("item should be ready"),
226    ///     )
227    /// });
228    ///
229    /// monitor.write(|items| items.push(7));
230    /// monitor.notify_one();
231    ///
232    /// assert_eq!(worker.join().expect("worker should finish"), 7);
233    /// ```
234    #[inline]
235    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
236    where
237        P: FnMut(&T) -> bool,
238        F: FnOnce(&mut T) -> R,
239    {
240        self.inner.wait_while(waiting, f)
241    }
242
243    /// Waits until the protected state satisfies a predicate, then mutates it.
244    ///
245    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
246    /// no thread changes the state to satisfy the predicate and sends a
247    /// notification.
248    ///
249    /// # Arguments
250    ///
251    /// * `ready` - Predicate that returns `true` when the state is ready.
252    /// * `f` - Closure that receives mutable access to the ready state.
253    ///
254    /// # Returns
255    ///
256    /// The value returned by `f`.
257    #[inline]
258    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
259    where
260        P: FnMut(&T) -> bool,
261        F: FnOnce(&mut T) -> R,
262    {
263        self.inner.wait_until(ready, f)
264    }
265
266    /// Waits while a predicate remains true, with an overall time limit.
267    ///
268    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
269    /// false before `timeout` expires, `f` runs while the monitor lock is still
270    /// held. If the timeout expires first, the closure is not called.
271    ///
272    /// # Arguments
273    ///
274    /// * `timeout` - Maximum total duration to wait.
275    /// * `waiting` - Predicate that returns `true` while the caller should
276    ///   continue waiting.
277    /// * `f` - Closure that receives mutable access when waiting is no longer
278    ///   required.
279    ///
280    /// # Returns
281    ///
282    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
283    /// predicate stops blocking before the timeout. Returns
284    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
285    ///
286    /// # Example
287    ///
288    /// ```rust
289    /// use std::time::Duration;
290    ///
291    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
292    ///
293    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
294    /// let result = monitor.wait_timeout_while(
295    ///     Duration::from_millis(1),
296    ///     |items| items.is_empty(),
297    ///     |items| items.pop(),
298    /// );
299    ///
300    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
301    /// ```
302    #[inline]
303    pub fn wait_timeout_while<R, P, F>(
304        &self,
305        timeout: Duration,
306        waiting: P,
307        f: F,
308    ) -> WaitTimeoutResult<R>
309    where
310        P: FnMut(&T) -> bool,
311        F: FnOnce(&mut T) -> R,
312    {
313        self.inner.wait_timeout_while(timeout, waiting, f)
314    }
315
316    /// Waits until a predicate becomes true, with an overall time limit.
317    ///
318    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
319    /// true before `timeout` expires, `f` runs while the monitor lock is still
320    /// held. If the timeout expires first, the closure is not called.
321    ///
322    /// # Arguments
323    ///
324    /// * `timeout` - Maximum total duration to wait.
325    /// * `ready` - Predicate that returns `true` when the caller may continue.
326    /// * `f` - Closure that receives mutable access to the ready state.
327    ///
328    /// # Returns
329    ///
330    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
331    /// predicate becomes true before the timeout. Returns
332    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
333    ///
334    /// # Example
335    ///
336    /// ```rust
337    /// use std::{
338    ///     thread,
339    ///     time::Duration,
340    /// };
341    ///
342    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
343    ///
344    /// let monitor = ArcMonitor::new(false);
345    /// let worker_monitor = monitor.clone();
346    ///
347    /// let worker = thread::spawn(move || {
348    ///     worker_monitor.wait_timeout_until(
349    ///         Duration::from_secs(1),
350    ///         |ready| *ready,
351    ///         |ready| {
352    ///             *ready = false;
353    ///             5
354    ///         },
355    ///     )
356    /// });
357    ///
358    /// monitor.write(|ready| *ready = true);
359    /// monitor.notify_one();
360    ///
361    /// assert_eq!(
362    ///     worker.join().expect("worker should finish"),
363    ///     WaitTimeoutResult::Ready(5),
364    /// );
365    /// ```
366    #[inline]
367    pub fn wait_timeout_until<R, P, F>(
368        &self,
369        timeout: Duration,
370        ready: P,
371        f: F,
372    ) -> WaitTimeoutResult<R>
373    where
374        P: FnMut(&T) -> bool,
375        F: FnOnce(&mut T) -> R,
376    {
377        self.inner.wait_timeout_until(timeout, ready, f)
378    }
379
380    /// Wakes one thread waiting on this monitor's condition variable.
381    ///
382    /// Notifications do not carry state by themselves. A waiting thread only
383    /// proceeds safely after rechecking the protected state. Call this after
384    /// changing state that may make one waiter able to continue.
385    #[inline]
386    pub fn notify_one(&self) {
387        self.inner.notify_one();
388    }
389
390    /// Wakes all threads waiting on this monitor's condition variable.
391    ///
392    /// Notifications do not carry state by themselves. Every awakened thread
393    /// must recheck the protected state before continuing. Call this after a
394    /// state change that may allow multiple waiters to make progress.
395    #[inline]
396    pub fn notify_all(&self) {
397        self.inner.notify_all();
398    }
399}
400
401impl<T: Default> Default for ArcMonitor<T> {
402    /// Creates an Arc-wrapped monitor containing `T::default()`.
403    ///
404    /// # Returns
405    ///
406    /// A cloneable monitor handle protecting the default value for `T`.
407    #[inline]
408    fn default() -> Self {
409        Self::new(T::default())
410    }
411}
412
413impl<T> Clone for ArcMonitor<T> {
414    /// Clones this monitor handle.
415    ///
416    /// The cloned handle shares the same protected state and condition
417    /// variable with the original.
418    ///
419    /// # Returns
420    ///
421    /// A new handle sharing the same monitor state.
422    #[inline]
423    fn clone(&self) -> Self {
424        Self {
425            inner: self.inner.clone(),
426        }
427    }
428}