Skip to main content

qubit_lock/monitor/
arc_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::{
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use super::{
23    Monitor,
24    MonitorGuard,
25    WaitTimeoutResult,
26    WaitTimeoutStatus,
27};
28
29/// Arc-wrapped monitor for shared condition-based state coordination.
30///
31/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
32/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
33/// It preserves the same guard-based waiting and predicate-based waiting
34/// semantics as [`Monitor`]. It implements [`Deref`] and [`AsRef`] so callers
35/// can pass it to APIs that expect a [`Monitor`] reference.
36///
37/// # Type Parameters
38///
39/// * `T` - The state protected by this monitor.
40///
41/// # Example
42///
43/// ```rust
44/// use std::thread;
45///
46/// use qubit_lock::lock::ArcMonitor;
47///
48/// let monitor = ArcMonitor::new(false);
49/// let waiter_monitor = monitor.clone();
50///
51/// let waiter = thread::spawn(move || {
52///     waiter_monitor.wait_until(
53///         |ready| *ready,
54///         |ready| {
55///             *ready = false;
56///         },
57///     );
58/// });
59///
60/// monitor.write(|ready| {
61///     *ready = true;
62/// });
63/// monitor.notify_all();
64///
65/// waiter.join().expect("waiter should finish");
66/// assert!(!monitor.read(|ready| *ready));
67/// ```
68///
69pub struct ArcMonitor<T> {
70    /// Shared monitor instance.
71    inner: Arc<Monitor<T>>,
72}
73
74impl<T> ArcMonitor<T> {
75    /// Creates an Arc-wrapped monitor protecting the supplied state value.
76    ///
77    /// # Arguments
78    ///
79    /// * `state` - Initial state protected by the monitor.
80    ///
81    /// # Returns
82    ///
83    /// A cloneable monitor handle initialized with the supplied state.
84    #[inline]
85    pub fn new(state: T) -> Self {
86        Self {
87            inner: Arc::new(Monitor::new(state)),
88        }
89    }
90
91    /// Acquires the shared monitor and returns a guard.
92    ///
93    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
94    /// keeps the monitor mutex locked until it is dropped. It can also wait on
95    /// the monitor's condition variable through [`MonitorGuard::wait`] or
96    /// [`MonitorGuard::wait_timeout`].
97    ///
98    /// # Returns
99    ///
100    /// A guard that provides read and write access to the protected state.
101    ///
102    /// # Example
103    ///
104    /// ```rust
105    /// use qubit_lock::lock::ArcMonitor;
106    ///
107    /// let monitor = ArcMonitor::new(1);
108    /// {
109    ///     let mut value = monitor.lock();
110    ///     *value += 1;
111    /// }
112    ///
113    /// assert_eq!(monitor.read(|value| *value), 2);
114    /// ```
115    #[inline]
116    pub fn lock(&self) -> MonitorGuard<'_, T> {
117        self.inner.lock()
118    }
119
120    /// Acquires the monitor and reads the protected state.
121    ///
122    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
123    /// mutex is held, so keep it short and avoid long blocking work.
124    ///
125    /// # Arguments
126    ///
127    /// * `f` - Closure that receives an immutable reference to the state.
128    ///
129    /// # Returns
130    ///
131    /// The value returned by `f`.
132    #[inline]
133    pub fn read<R, F>(&self, f: F) -> R
134    where
135        F: FnOnce(&T) -> R,
136    {
137        self.inner.read(f)
138    }
139
140    /// Acquires the monitor and mutates the protected state.
141    ///
142    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
143    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
144    /// waiting thread may observe.
145    ///
146    /// # Arguments
147    ///
148    /// * `f` - Closure that receives a mutable reference to the state.
149    ///
150    /// # Returns
151    ///
152    /// The value returned by `f`.
153    #[inline]
154    pub fn write<R, F>(&self, f: F) -> R
155    where
156        F: FnOnce(&mut T) -> R,
157    {
158        self.inner.write(f)
159    }
160
161    /// Waits for a notification or timeout without checking state.
162    ///
163    /// This delegates to [`Monitor::wait_notify`]. Most
164    /// coordination code should prefer [`Self::wait_while`],
165    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
166    ///
167    /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
168    /// but it does not prove that the protected state changed in a useful way.
169    ///
170    /// # Arguments
171    ///
172    /// * `timeout` - Maximum duration to wait for a notification.
173    ///
174    /// # Returns
175    ///
176    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
177    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
178    ///
179    /// # Example
180    ///
181    /// ```rust
182    /// use std::time::Duration;
183    ///
184    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
185    ///
186    /// let monitor = ArcMonitor::new(false);
187    /// let status = monitor.wait_notify(Duration::from_millis(1));
188    ///
189    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
190    /// ```
191    #[inline]
192    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
193        self.inner.wait_notify(timeout)
194    }
195
196    /// Waits while a predicate remains true, then mutates the protected state.
197    ///
198    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
199    /// while holding the monitor mutex, and the closure runs while the mutex is
200    /// still held after the predicate stops blocking.
201    ///
202    /// This method may block indefinitely if no thread changes the state so
203    /// that `waiting` becomes false and sends a notification.
204    ///
205    /// # Arguments
206    ///
207    /// * `waiting` - Predicate that returns `true` while the caller should
208    ///   keep waiting.
209    /// * `f` - Closure that receives mutable access after waiting is no longer
210    ///   required.
211    ///
212    /// # Returns
213    ///
214    /// The value returned by `f`.
215    ///
216    /// # Example
217    ///
218    /// ```rust
219    /// use std::thread;
220    ///
221    /// use qubit_lock::lock::ArcMonitor;
222    ///
223    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
224    /// let worker_monitor = monitor.clone();
225    ///
226    /// let worker = thread::spawn(move || {
227    ///     worker_monitor.wait_while(
228    ///         |items| items.is_empty(),
229    ///         |items| items.pop().expect("item should be ready"),
230    ///     )
231    /// });
232    ///
233    /// monitor.write(|items| items.push(7));
234    /// monitor.notify_one();
235    ///
236    /// assert_eq!(worker.join().expect("worker should finish"), 7);
237    /// ```
238    #[inline]
239    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
240    where
241        P: FnMut(&T) -> bool,
242        F: FnOnce(&mut T) -> R,
243    {
244        self.inner.wait_while(waiting, f)
245    }
246
247    /// Waits until the protected state satisfies a predicate, then mutates it.
248    ///
249    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
250    /// no thread changes the state to satisfy the predicate and sends a
251    /// notification.
252    ///
253    /// # Arguments
254    ///
255    /// * `ready` - Predicate that returns `true` when the state is ready.
256    /// * `f` - Closure that receives mutable access to the ready state.
257    ///
258    /// # Returns
259    ///
260    /// The value returned by `f`.
261    #[inline]
262    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
263    where
264        P: FnMut(&T) -> bool,
265        F: FnOnce(&mut T) -> R,
266    {
267        self.inner.wait_until(ready, f)
268    }
269
270    /// Waits while a predicate remains true, with an overall time limit.
271    ///
272    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
273    /// false before `timeout` expires, `f` runs while the monitor lock is still
274    /// held. If the timeout expires first, the closure is not called.
275    ///
276    /// # Arguments
277    ///
278    /// * `timeout` - Maximum total duration to wait.
279    /// * `waiting` - Predicate that returns `true` while the caller should
280    ///   continue waiting.
281    /// * `f` - Closure that receives mutable access when waiting is no longer
282    ///   required.
283    ///
284    /// # Returns
285    ///
286    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
287    /// predicate stops blocking before the timeout. Returns
288    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
289    ///
290    /// # Example
291    ///
292    /// ```rust
293    /// use std::time::Duration;
294    ///
295    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
296    ///
297    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
298    /// let result = monitor.wait_timeout_while(
299    ///     Duration::from_millis(1),
300    ///     |items| items.is_empty(),
301    ///     |items| items.pop(),
302    /// );
303    ///
304    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
305    /// ```
306    #[inline]
307    pub fn wait_timeout_while<R, P, F>(
308        &self,
309        timeout: Duration,
310        waiting: P,
311        f: F,
312    ) -> WaitTimeoutResult<R>
313    where
314        P: FnMut(&T) -> bool,
315        F: FnOnce(&mut T) -> R,
316    {
317        self.inner.wait_timeout_while(timeout, waiting, f)
318    }
319
320    /// Waits until a predicate becomes true, with an overall time limit.
321    ///
322    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
323    /// true before `timeout` expires, `f` runs while the monitor lock is still
324    /// held. If the timeout expires first, the closure is not called.
325    ///
326    /// # Arguments
327    ///
328    /// * `timeout` - Maximum total duration to wait.
329    /// * `ready` - Predicate that returns `true` when the caller may continue.
330    /// * `f` - Closure that receives mutable access to the ready state.
331    ///
332    /// # Returns
333    ///
334    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
335    /// predicate becomes true before the timeout. Returns
336    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
337    ///
338    /// # Example
339    ///
340    /// ```rust
341    /// use std::{
342    ///     thread,
343    ///     time::Duration,
344    /// };
345    ///
346    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
347    ///
348    /// let monitor = ArcMonitor::new(false);
349    /// let worker_monitor = monitor.clone();
350    ///
351    /// let worker = thread::spawn(move || {
352    ///     worker_monitor.wait_timeout_until(
353    ///         Duration::from_secs(1),
354    ///         |ready| *ready,
355    ///         |ready| {
356    ///             *ready = false;
357    ///             5
358    ///         },
359    ///     )
360    /// });
361    ///
362    /// monitor.write(|ready| *ready = true);
363    /// monitor.notify_one();
364    ///
365    /// assert_eq!(
366    ///     worker.join().expect("worker should finish"),
367    ///     WaitTimeoutResult::Ready(5),
368    /// );
369    /// ```
370    #[inline]
371    pub fn wait_timeout_until<R, P, F>(
372        &self,
373        timeout: Duration,
374        ready: P,
375        f: F,
376    ) -> WaitTimeoutResult<R>
377    where
378        P: FnMut(&T) -> bool,
379        F: FnOnce(&mut T) -> R,
380    {
381        self.inner.wait_timeout_until(timeout, ready, f)
382    }
383
384    /// Wakes one thread waiting on this monitor's condition variable.
385    ///
386    /// Notifications do not carry state by themselves. A waiting thread only
387    /// proceeds safely after rechecking the protected state. Call this after
388    /// changing state that may make one waiter able to continue.
389    #[inline]
390    pub fn notify_one(&self) {
391        self.inner.notify_one();
392    }
393
394    /// Wakes all threads waiting on this monitor's condition variable.
395    ///
396    /// Notifications do not carry state by themselves. Every awakened thread
397    /// must recheck the protected state before continuing. Call this after a
398    /// state change that may allow multiple waiters to make progress.
399    #[inline]
400    pub fn notify_all(&self) {
401        self.inner.notify_all();
402    }
403}
404
405impl<T> AsRef<Monitor<T>> for ArcMonitor<T> {
406    /// Returns a reference to the underlying monitor.
407    ///
408    /// This is useful when callers need an explicit [`Monitor`] reference while
409    /// keeping the cloneable [`ArcMonitor`] handle.
410    #[inline]
411    fn as_ref(&self) -> &Monitor<T> {
412        self.inner.as_ref()
413    }
414}
415
416impl<T> Deref for ArcMonitor<T> {
417    type Target = Monitor<T>;
418
419    /// Dereferences this wrapper to the underlying monitor.
420    ///
421    /// Method-call dereferencing lets callers use native [`Monitor`] APIs
422    /// directly, while this wrapper still provides cloneable ownership.
423    #[inline]
424    fn deref(&self) -> &Self::Target {
425        self.inner.as_ref()
426    }
427}
428
429impl<T: Default> Default for ArcMonitor<T> {
430    /// Creates an Arc-wrapped monitor containing `T::default()`.
431    ///
432    /// # Returns
433    ///
434    /// A cloneable monitor handle protecting the default value for `T`.
435    #[inline]
436    fn default() -> Self {
437        Self::new(T::default())
438    }
439}
440
441impl<T> Clone for ArcMonitor<T> {
442    /// Clones this monitor handle.
443    ///
444    /// The cloned handle shares the same protected state and condition
445    /// variable with the original.
446    ///
447    /// # Returns
448    ///
449    /// A new handle sharing the same monitor state.
450    #[inline]
451    fn clone(&self) -> Self {
452        Self {
453            inner: self.inner.clone(),
454        }
455    }
456}