Skip to main content

qubit_lock/monitor/
arc_monitor.rs

/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! # Arc Monitor
//!
//! Provides an Arc-wrapped synchronous monitor for condition-based state
//! coordination across threads.
//!
15
16use std::{
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use super::{
23    Monitor,
24    MonitorGuard,
25    WaitTimeoutResult,
26    WaitTimeoutStatus,
27};
28
29/// Arc-wrapped monitor for shared condition-based state coordination.
30///
31/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
32/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
33/// It preserves the same guard-based waiting and predicate-based waiting
34/// semantics as [`Monitor`]. It implements [`Deref`] and [`AsRef`] so callers
35/// can pass it to APIs that expect a [`Monitor`] reference.
36///
37/// # Type Parameters
38///
39/// * `T` - The state protected by this monitor.
40///
41/// # Example
42///
43/// ```rust
44/// use std::thread;
45///
46/// use qubit_lock::ArcMonitor;
47///
48/// let monitor = ArcMonitor::new(false);
49/// let waiter_monitor = monitor.clone();
50///
51/// let waiter = thread::spawn(move || {
52///     waiter_monitor.wait_until(
53///         |ready| *ready,
54///         |ready| {
55///             *ready = false;
56///         },
57///     );
58/// });
59///
60/// monitor.write(|ready| {
61///     *ready = true;
62/// });
63/// monitor.notify_all();
64///
65/// waiter.join().expect("waiter should finish");
66/// assert!(!monitor.read(|ready| *ready));
67/// ```
68///
69pub struct ArcMonitor<T> {
70    /// Shared monitor instance.
71    inner: Arc<Monitor<T>>,
72}
73
74impl<T> ArcMonitor<T> {
75    /// Creates an Arc-wrapped monitor protecting the supplied state value.
76    ///
77    /// # Arguments
78    ///
79    /// * `state` - Initial state protected by the monitor.
80    ///
81    /// # Returns
82    ///
83    /// A cloneable monitor handle initialized with the supplied state.
84    #[inline]
85    pub fn new(state: T) -> Self {
86        Self {
87            inner: Arc::new(Monitor::new(state)),
88        }
89    }
90
91    /// Acquires the shared monitor and returns a guard.
92    ///
93    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
94    /// keeps the monitor mutex locked until it is dropped. It can also wait on
95    /// the monitor's condition variable through [`MonitorGuard::wait`] or
96    /// [`MonitorGuard::wait_timeout`].
97    ///
98    /// # Returns
99    ///
100    /// A guard that provides read and write access to the protected state.
101    ///
102    /// # Example
103    ///
104    /// ```rust
105    /// use qubit_lock::ArcMonitor;
106    ///
107    /// let monitor = ArcMonitor::new(1);
108    /// {
109    ///     let mut value = monitor.lock();
110    ///     *value += 1;
111    /// }
112    ///
113    /// assert_eq!(monitor.read(|value| *value), 2);
114    /// ```
115    #[inline]
116    pub fn lock(&self) -> MonitorGuard<'_, T> {
117        self.inner.lock()
118    }
119
120    /// Acquires the monitor and reads the protected state.
121    ///
122    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
123    /// mutex is held, so keep it short and avoid long blocking work.
124    ///
125    /// # Arguments
126    ///
127    /// * `f` - Closure that receives an immutable reference to the state.
128    ///
129    /// # Returns
130    ///
131    /// The value returned by `f`.
132    #[inline]
133    pub fn read<R, F>(&self, f: F) -> R
134    where
135        F: FnOnce(&T) -> R,
136    {
137        self.inner.read(f)
138    }
139
140    /// Acquires the monitor and mutates the protected state.
141    ///
142    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
143    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
144    /// waiting thread may observe.
145    ///
146    /// # Arguments
147    ///
148    /// * `f` - Closure that receives a mutable reference to the state.
149    ///
150    /// # Returns
151    ///
152    /// The value returned by `f`.
153    #[inline]
154    pub fn write<R, F>(&self, f: F) -> R
155    where
156        F: FnOnce(&mut T) -> R,
157    {
158        self.inner.write(f)
159    }
160
161    /// Mutates the protected state and wakes one waiter.
162    ///
163    /// This delegates to [`Monitor::write_notify_one`]. The closure runs while
164    /// the monitor mutex is held; after it returns, the lock is released and one
165    /// waiter is notified. If `f` panics, the panic is propagated and no
166    /// notification is sent.
167    ///
168    /// # Arguments
169    ///
170    /// * `f` - Closure that receives a mutable reference to the state.
171    ///
172    /// # Returns
173    ///
174    /// The value returned by `f`.
175    #[inline]
176    pub fn write_notify_one<R, F>(&self, f: F) -> R
177    where
178        F: FnOnce(&mut T) -> R,
179    {
180        self.inner.write_notify_one(f)
181    }
182
183    /// Mutates the protected state and wakes all waiters.
184    ///
185    /// This delegates to [`Monitor::write_notify_all`]. The closure runs while
186    /// the monitor mutex is held; after it returns, the lock is released and all
187    /// waiters are notified. If `f` panics, the panic is propagated and no
188    /// notification is sent.
189    ///
190    /// # Arguments
191    ///
192    /// * `f` - Closure that receives a mutable reference to the state.
193    ///
194    /// # Returns
195    ///
196    /// The value returned by `f`.
197    #[inline]
198    pub fn write_notify_all<R, F>(&self, f: F) -> R
199    where
200        F: FnOnce(&mut T) -> R,
201    {
202        self.inner.write_notify_all(f)
203    }
204
205    /// Waits for a notification or timeout without checking state.
206    ///
207    /// This delegates to [`Monitor::wait_notify`]. Most
208    /// coordination code should prefer [`Self::wait_while`],
209    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
210    ///
211    /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
212    /// but it does not prove that the protected state changed in a useful way.
213    ///
214    /// # Arguments
215    ///
216    /// * `timeout` - Maximum duration to wait for a notification.
217    ///
218    /// # Returns
219    ///
220    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
221    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
222    ///
223    /// # Example
224    ///
225    /// ```rust
226    /// use std::time::Duration;
227    ///
228    /// use qubit_lock::{ArcMonitor, WaitTimeoutStatus};
229    ///
230    /// let monitor = ArcMonitor::new(false);
231    /// let status = monitor.wait_notify(Duration::from_millis(1));
232    ///
233    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
234    /// ```
235    #[inline]
236    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
237        self.inner.wait_notify(timeout)
238    }
239
240    /// Waits while a predicate remains true, then mutates the protected state.
241    ///
242    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
243    /// while holding the monitor mutex, and the closure runs while the mutex is
244    /// still held after the predicate stops blocking.
245    ///
246    /// This method may block indefinitely if no thread changes the state so
247    /// that `waiting` becomes false and sends a notification.
248    ///
249    /// # Arguments
250    ///
251    /// * `waiting` - Predicate that returns `true` while the caller should
252    ///   keep waiting.
253    /// * `f` - Closure that receives mutable access after waiting is no longer
254    ///   required.
255    ///
256    /// # Returns
257    ///
258    /// The value returned by `f`.
259    ///
260    /// # Example
261    ///
262    /// ```rust
263    /// use std::thread;
264    ///
265    /// use qubit_lock::ArcMonitor;
266    ///
267    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
268    /// let worker_monitor = monitor.clone();
269    ///
270    /// let worker = thread::spawn(move || {
271    ///     worker_monitor.wait_while(
272    ///         |items| items.is_empty(),
273    ///         |items| items.pop().expect("item should be ready"),
274    ///     )
275    /// });
276    ///
277    /// monitor.write(|items| items.push(7));
278    /// monitor.notify_one();
279    ///
280    /// assert_eq!(worker.join().expect("worker should finish"), 7);
281    /// ```
282    #[inline]
283    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
284    where
285        P: FnMut(&T) -> bool,
286        F: FnOnce(&mut T) -> R,
287    {
288        self.inner.wait_while(waiting, f)
289    }
290
291    /// Waits until the protected state satisfies a predicate, then mutates it.
292    ///
293    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
294    /// no thread changes the state to satisfy the predicate and sends a
295    /// notification.
296    ///
297    /// # Arguments
298    ///
299    /// * `ready` - Predicate that returns `true` when the state is ready.
300    /// * `f` - Closure that receives mutable access to the ready state.
301    ///
302    /// # Returns
303    ///
304    /// The value returned by `f`.
305    #[inline]
306    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
307    where
308        P: FnMut(&T) -> bool,
309        F: FnOnce(&mut T) -> R,
310    {
311        self.inner.wait_until(ready, f)
312    }
313
314    /// Waits while a predicate remains true, with an overall time limit.
315    ///
316    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
317    /// false before `timeout` expires, `f` runs while the monitor lock is still
318    /// held. If the timeout expires first, the closure is not called.
319    ///
320    /// # Arguments
321    ///
322    /// * `timeout` - Maximum total duration to wait.
323    /// * `waiting` - Predicate that returns `true` while the caller should
324    ///   continue waiting.
325    /// * `f` - Closure that receives mutable access when waiting is no longer
326    ///   required.
327    ///
328    /// # Returns
329    ///
330    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
331    /// predicate stops blocking before the timeout. Returns
332    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
333    ///
334    /// # Example
335    ///
336    /// ```rust
337    /// use std::time::Duration;
338    ///
339    /// use qubit_lock::{ArcMonitor, WaitTimeoutResult};
340    ///
341    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
342    /// let result = monitor.wait_timeout_while(
343    ///     Duration::from_millis(1),
344    ///     |items| items.is_empty(),
345    ///     |items| items.pop(),
346    /// );
347    ///
348    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
349    /// ```
350    #[inline]
351    pub fn wait_timeout_while<R, P, F>(
352        &self,
353        timeout: Duration,
354        waiting: P,
355        f: F,
356    ) -> WaitTimeoutResult<R>
357    where
358        P: FnMut(&T) -> bool,
359        F: FnOnce(&mut T) -> R,
360    {
361        self.inner.wait_timeout_while(timeout, waiting, f)
362    }
363
364    /// Waits until a predicate becomes true, with an overall time limit.
365    ///
366    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
367    /// true before `timeout` expires, `f` runs while the monitor lock is still
368    /// held. If the timeout expires first, the closure is not called.
369    ///
370    /// # Arguments
371    ///
372    /// * `timeout` - Maximum total duration to wait.
373    /// * `ready` - Predicate that returns `true` when the caller may continue.
374    /// * `f` - Closure that receives mutable access to the ready state.
375    ///
376    /// # Returns
377    ///
378    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
379    /// predicate becomes true before the timeout. Returns
380    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
381    ///
382    /// # Example
383    ///
384    /// ```rust
385    /// use std::{
386    ///     thread,
387    ///     time::Duration,
388    /// };
389    ///
390    /// use qubit_lock::{ArcMonitor, WaitTimeoutResult};
391    ///
392    /// let monitor = ArcMonitor::new(false);
393    /// let worker_monitor = monitor.clone();
394    ///
395    /// let worker = thread::spawn(move || {
396    ///     worker_monitor.wait_timeout_until(
397    ///         Duration::from_secs(1),
398    ///         |ready| *ready,
399    ///         |ready| {
400    ///             *ready = false;
401    ///             5
402    ///         },
403    ///     )
404    /// });
405    ///
406    /// monitor.write(|ready| *ready = true);
407    /// monitor.notify_one();
408    ///
409    /// assert_eq!(
410    ///     worker.join().expect("worker should finish"),
411    ///     WaitTimeoutResult::Ready(5),
412    /// );
413    /// ```
414    #[inline]
415    pub fn wait_timeout_until<R, P, F>(
416        &self,
417        timeout: Duration,
418        ready: P,
419        f: F,
420    ) -> WaitTimeoutResult<R>
421    where
422        P: FnMut(&T) -> bool,
423        F: FnOnce(&mut T) -> R,
424    {
425        self.inner.wait_timeout_until(timeout, ready, f)
426    }
427
428    /// Wakes one thread waiting on this monitor's condition variable.
429    ///
430    /// Notifications do not carry state by themselves. A waiting thread only
431    /// proceeds safely after rechecking the protected state. Call this after
432    /// changing state that may make one waiter able to continue.
433    #[inline]
434    pub fn notify_one(&self) {
435        self.inner.notify_one();
436    }
437
438    /// Wakes all threads waiting on this monitor's condition variable.
439    ///
440    /// Notifications do not carry state by themselves. Every awakened thread
441    /// must recheck the protected state before continuing. Call this after a
442    /// state change that may allow multiple waiters to make progress.
443    #[inline]
444    pub fn notify_all(&self) {
445        self.inner.notify_all();
446    }
447}
448
449impl<T> AsRef<Monitor<T>> for ArcMonitor<T> {
450    /// Returns a reference to the underlying monitor.
451    ///
452    /// This is useful when callers need an explicit [`Monitor`] reference while
453    /// keeping the cloneable [`ArcMonitor`] handle.
454    #[inline]
455    fn as_ref(&self) -> &Monitor<T> {
456        self.inner.as_ref()
457    }
458}
459
460impl<T> Deref for ArcMonitor<T> {
461    type Target = Monitor<T>;
462
463    /// Dereferences this wrapper to the underlying monitor.
464    ///
465    /// Method-call dereferencing lets callers use native [`Monitor`] APIs
466    /// directly, while this wrapper still provides cloneable ownership.
467    #[inline]
468    fn deref(&self) -> &Self::Target {
469        self.inner.as_ref()
470    }
471}
472
473impl<T> From<T> for ArcMonitor<T> {
474    /// Creates an Arc-wrapped monitor from an initial state value.
475    ///
476    /// # Arguments
477    ///
478    /// * `value` - Initial state protected by the monitor.
479    ///
480    /// # Returns
481    ///
482    /// A cloneable monitor handle protecting `value`.
483    #[inline]
484    fn from(value: T) -> Self {
485        Self::new(value)
486    }
487}
488
489impl<T: Default> Default for ArcMonitor<T> {
490    /// Creates an Arc-wrapped monitor containing `T::default()`.
491    ///
492    /// # Returns
493    ///
494    /// A cloneable monitor handle protecting the default value for `T`.
495    #[inline]
496    fn default() -> Self {
497        Self::new(T::default())
498    }
499}
500
501impl<T> Clone for ArcMonitor<T> {
502    /// Clones this monitor handle.
503    ///
504    /// The cloned handle shares the same protected state and condition
505    /// variable with the original.
506    ///
507    /// # Returns
508    ///
509    /// A new handle sharing the same monitor state.
510    #[inline]
511    fn clone(&self) -> Self {
512        Self {
513            inner: self.inner.clone(),
514        }
515    }
516}