Skip to main content

qubit_lock/monitor/
arc_std_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc StdMonitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::{
17    ops::Deref,
18    sync::Arc,
19    time::Duration,
20};
21
22use super::{
23    ConditionWaiter,
24    NotificationWaiter,
25    Notifier,
26    StdMonitor,
27    StdMonitorGuard,
28    TimeoutConditionWaiter,
29    TimeoutNotificationWaiter,
30    WaitTimeoutResult,
31    WaitTimeoutStatus,
32};
33
34/// Arc-wrapped monitor for shared condition-based state coordination.
35///
36/// `ArcStdMonitor` stores a [`StdMonitor`] behind an [`Arc`], so callers can clone
37/// the monitor handle directly without writing `Arc::new(StdMonitor::new(...))`.
38/// It preserves the same guard-based waiting, predicate-based waiting, and
39/// poison recovery semantics as [`StdMonitor`]. It implements [`Deref`] and
40/// [`AsRef`] so callers can pass it to APIs that expect a [`StdMonitor`]
41/// reference.
42///
43/// # Type Parameters
44///
45/// * `T` - The state protected by this monitor.
46///
47/// # Example
48///
49/// ```rust
50/// use std::thread;
51///
52/// use qubit_lock::ArcStdMonitor;
53///
54/// let monitor = ArcStdMonitor::new(false);
55/// let waiter_monitor = monitor.clone();
56///
57/// let waiter = thread::spawn(move || {
58///     waiter_monitor.wait_until(
59///         |ready| *ready,
60///         |ready| {
61///             *ready = false;
62///         },
63///     );
64/// });
65///
66/// monitor.write(|ready| {
67///     *ready = true;
68/// });
69/// monitor.notify_all();
70///
71/// waiter.join().expect("waiter should finish");
72/// assert!(!monitor.read(|ready| *ready));
73/// ```
74///
75pub struct ArcStdMonitor<T> {
76    /// Shared monitor instance.
77    inner: Arc<StdMonitor<T>>,
78}
79
80impl<T> ArcStdMonitor<T> {
81    /// Creates an Arc-wrapped monitor protecting the supplied state value.
82    ///
83    /// # Arguments
84    ///
85    /// * `state` - Initial state protected by the monitor.
86    ///
87    /// # Returns
88    ///
89    /// A cloneable monitor handle initialized with the supplied state.
90    #[inline]
91    pub fn new(state: T) -> Self {
92        Self {
93            inner: Arc::new(StdMonitor::new(state)),
94        }
95    }
96
97    /// Acquires the shared monitor and returns a guard.
98    ///
99    /// This delegates to [`StdMonitor::lock`]. The returned [`StdMonitorGuard`]
100    /// keeps the monitor mutex locked until it is dropped. It can also wait on
101    /// the monitor's condition variable through [`StdMonitorGuard::wait`] or
102    /// [`StdMonitorGuard::wait_timeout`].
103    ///
104    /// If the underlying mutex is poisoned, this method recovers the inner
105    /// state and still returns a guard.
106    ///
107    /// # Returns
108    ///
109    /// A guard that provides read and write access to the protected state.
110    ///
111    /// # Example
112    ///
113    /// ```rust
114    /// use qubit_lock::ArcStdMonitor;
115    ///
116    /// let monitor = ArcStdMonitor::new(1);
117    /// {
118    ///     let mut value = monitor.lock();
119    ///     *value += 1;
120    /// }
121    ///
122    /// assert_eq!(monitor.read(|value| *value), 2);
123    /// ```
124    #[inline]
125    pub fn lock(&self) -> StdMonitorGuard<'_, T> {
126        self.inner.lock()
127    }
128
129    /// Acquires the monitor and reads the protected state.
130    ///
131    /// This delegates to [`StdMonitor::read`]. The closure runs while the monitor
132    /// mutex is held, so keep it short and avoid long blocking work.
133    ///
134    /// # Arguments
135    ///
136    /// * `f` - Closure that receives an immutable reference to the state.
137    ///
138    /// # Returns
139    ///
140    /// The value returned by `f`.
141    #[inline]
142    pub fn read<R, F>(&self, f: F) -> R
143    where
144        F: FnOnce(&T) -> R,
145    {
146        self.inner.read(f)
147    }
148
149    /// Acquires the monitor and mutates the protected state.
150    ///
151    /// This delegates to [`StdMonitor::write`]. Callers should explicitly invoke
152    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
153    /// waiting thread may observe.
154    ///
155    /// # Arguments
156    ///
157    /// * `f` - Closure that receives a mutable reference to the state.
158    ///
159    /// # Returns
160    ///
161    /// The value returned by `f`.
162    #[inline]
163    pub fn write<R, F>(&self, f: F) -> R
164    where
165        F: FnOnce(&mut T) -> R,
166    {
167        self.inner.write(f)
168    }
169
170    /// Mutates the protected state and wakes one waiter.
171    ///
172    /// This delegates to [`StdMonitor::write_notify_one`]. The closure runs
173    /// while the monitor mutex is held; after it returns, the lock is released
174    /// and one waiter is notified. If `f` panics, the panic is propagated and no
175    /// notification is sent.
176    ///
177    /// # Arguments
178    ///
179    /// * `f` - Closure that receives a mutable reference to the state.
180    ///
181    /// # Returns
182    ///
183    /// The value returned by `f`.
184    #[inline]
185    pub fn write_notify_one<R, F>(&self, f: F) -> R
186    where
187        F: FnOnce(&mut T) -> R,
188    {
189        self.inner.write_notify_one(f)
190    }
191
192    /// Mutates the protected state and wakes all waiters.
193    ///
194    /// This delegates to [`StdMonitor::write_notify_all`]. The closure runs
195    /// while the monitor mutex is held; after it returns, the lock is released
196    /// and all waiters are notified. If `f` panics, the panic is propagated and
197    /// no notification is sent.
198    ///
199    /// # Arguments
200    ///
201    /// * `f` - Closure that receives a mutable reference to the state.
202    ///
203    /// # Returns
204    ///
205    /// The value returned by `f`.
206    #[inline]
207    pub fn write_notify_all<R, F>(&self, f: F) -> R
208    where
209        F: FnOnce(&mut T) -> R,
210    {
211        self.inner.write_notify_all(f)
212    }
213
214    /// Waits for a notification without checking state.
215    ///
216    /// This delegates to [`StdMonitor::wait`].
217    #[inline]
218    pub fn wait(&self) {
219        self.inner.wait();
220    }
221
222    /// Waits for a notification or timeout without checking state.
223    ///
224    /// This delegates to [`StdMonitor::wait_for`]. Most
225    /// coordination code should prefer [`Self::wait_while`],
226    /// [`Self::wait_until`], or an explicit [`StdMonitorGuard`] loop.
227    ///
228    /// Condition variables may wake spuriously, so
229    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
230    /// state.
231    ///
232    /// # Arguments
233    ///
234    /// * `timeout` - Maximum duration to wait for a notification.
235    ///
236    /// # Returns
237    ///
238    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
239    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
240    ///
241    /// # Example
242    ///
243    /// ```rust
244    /// use std::time::Duration;
245    ///
246    /// use qubit_lock::{ArcStdMonitor, WaitTimeoutStatus};
247    ///
248    /// let monitor = ArcStdMonitor::new(false);
249    /// let status = monitor.wait_for(Duration::from_millis(1));
250    ///
251    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
252    /// ```
253    #[inline]
254    pub fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
255        self.inner.wait_for(timeout)
256    }
257
258    /// Waits while a predicate remains true, then mutates the protected state.
259    ///
260    /// This delegates to [`StdMonitor::wait_while`]. The predicate is evaluated
261    /// while holding the monitor mutex, and the closure runs while the mutex is
262    /// still held after the predicate stops blocking.
263    ///
264    /// This method may block indefinitely if no thread changes the state so
265    /// that `waiting` becomes false and sends a notification.
266    ///
267    /// # Arguments
268    ///
269    /// * `waiting` - Predicate that returns `true` while the caller should
270    ///   keep waiting.
271    /// * `f` - Closure that receives mutable access after waiting is no longer
272    ///   required.
273    ///
274    /// # Returns
275    ///
276    /// The value returned by `f`.
277    ///
278    /// # Example
279    ///
280    /// ```rust
281    /// use std::thread;
282    ///
283    /// use qubit_lock::ArcStdMonitor;
284    ///
285    /// let monitor = ArcStdMonitor::new(Vec::<i32>::new());
286    /// let worker_monitor = monitor.clone();
287    ///
288    /// let worker = thread::spawn(move || {
289    ///     worker_monitor.wait_while(
290    ///         |items| items.is_empty(),
291    ///         |items| items.pop().expect("item should be ready"),
292    ///     )
293    /// });
294    ///
295    /// monitor.write(|items| items.push(7));
296    /// monitor.notify_one();
297    ///
298    /// assert_eq!(worker.join().expect("worker should finish"), 7);
299    /// ```
300    #[inline]
301    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
302    where
303        P: FnMut(&T) -> bool,
304        F: FnOnce(&mut T) -> R,
305    {
306        self.inner.wait_while(waiting, f)
307    }
308
309    /// Waits until the protected state satisfies a predicate, then mutates it.
310    ///
311    /// This delegates to [`StdMonitor::wait_until`]. It may block indefinitely if
312    /// no thread changes the state to satisfy the predicate and sends a
313    /// notification.
314    ///
315    /// # Arguments
316    ///
317    /// * `ready` - Predicate that returns `true` when the state is ready.
318    /// * `f` - Closure that receives mutable access to the ready state.
319    ///
320    /// # Returns
321    ///
322    /// The value returned by `f`.
323    #[inline]
324    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
325    where
326        P: FnMut(&T) -> bool,
327        F: FnOnce(&mut T) -> R,
328    {
329        self.inner.wait_until(ready, f)
330    }
331
332    /// Waits while a predicate remains true, with an overall time limit.
333    ///
334    /// This delegates to [`StdMonitor::wait_while_for`]. If `waiting` becomes
335    /// false before `timeout` expires, `f` runs while the monitor lock is still
336    /// held. If the timeout expires first, the closure is not called.
337    ///
338    /// # Arguments
339    ///
340    /// * `timeout` - Maximum total duration to wait.
341    /// * `waiting` - Predicate that returns `true` while the caller should
342    ///   continue waiting.
343    /// * `f` - Closure that receives mutable access when waiting is no longer
344    ///   required.
345    ///
346    /// # Returns
347    ///
348    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
349    /// predicate stops blocking before the timeout. Returns
350    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
351    ///
352    /// # Example
353    ///
354    /// ```rust
355    /// use std::time::Duration;
356    ///
357    /// use qubit_lock::{ArcStdMonitor, WaitTimeoutResult};
358    ///
359    /// let monitor = ArcStdMonitor::new(Vec::<i32>::new());
360    /// let result = monitor.wait_while_for(
361    ///     Duration::from_millis(1),
362    ///     |items| items.is_empty(),
363    ///     |items| items.pop(),
364    /// );
365    ///
366    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
367    /// ```
368    #[inline]
369    pub fn wait_while_for<R, P, F>(
370        &self,
371        timeout: Duration,
372        waiting: P,
373        f: F,
374    ) -> WaitTimeoutResult<R>
375    where
376        P: FnMut(&T) -> bool,
377        F: FnOnce(&mut T) -> R,
378    {
379        self.inner.wait_while_for(timeout, waiting, f)
380    }
381
382    /// Waits until a predicate becomes true, with an overall time limit.
383    ///
384    /// This delegates to [`StdMonitor::wait_until_for`]. If `ready` becomes
385    /// true before `timeout` expires, `f` runs while the monitor lock is still
386    /// held. If the timeout expires first, the closure is not called.
387    ///
388    /// # Arguments
389    ///
390    /// * `timeout` - Maximum total duration to wait.
391    /// * `ready` - Predicate that returns `true` when the caller may continue.
392    /// * `f` - Closure that receives mutable access to the ready state.
393    ///
394    /// # Returns
395    ///
396    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
397    /// predicate becomes true before the timeout. Returns
398    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
399    ///
400    /// # Example
401    ///
402    /// ```rust
403    /// use std::{
404    ///     thread,
405    ///     time::Duration,
406    /// };
407    ///
408    /// use qubit_lock::{ArcStdMonitor, WaitTimeoutResult};
409    ///
410    /// let monitor = ArcStdMonitor::new(false);
411    /// let worker_monitor = monitor.clone();
412    ///
413    /// let worker = thread::spawn(move || {
414    ///     worker_monitor.wait_until_for(
415    ///         Duration::from_secs(1),
416    ///         |ready| *ready,
417    ///         |ready| {
418    ///             *ready = false;
419    ///             5
420    ///         },
421    ///     )
422    /// });
423    ///
424    /// monitor.write(|ready| *ready = true);
425    /// monitor.notify_one();
426    ///
427    /// assert_eq!(
428    ///     worker.join().expect("worker should finish"),
429    ///     WaitTimeoutResult::Ready(5),
430    /// );
431    /// ```
432    #[inline]
433    pub fn wait_until_for<R, P, F>(&self, timeout: Duration, ready: P, f: F) -> WaitTimeoutResult<R>
434    where
435        P: FnMut(&T) -> bool,
436        F: FnOnce(&mut T) -> R,
437    {
438        self.inner.wait_until_for(timeout, ready, f)
439    }
440
441    /// Wakes one thread waiting on this monitor's condition variable.
442    ///
443    /// Notifications do not carry state by themselves. A waiting thread only
444    /// proceeds safely after rechecking the protected state. Call this after
445    /// changing state that may make one waiter able to continue.
446    #[inline]
447    pub fn notify_one(&self) {
448        self.inner.notify_one();
449    }
450
451    /// Wakes all threads waiting on this monitor's condition variable.
452    ///
453    /// Notifications do not carry state by themselves. Every awakened thread
454    /// must recheck the protected state before continuing. Call this after a
455    /// state change that may allow multiple waiters to make progress.
456    #[inline]
457    pub fn notify_all(&self) {
458        self.inner.notify_all();
459    }
460}
461
462impl<T> AsRef<StdMonitor<T>> for ArcStdMonitor<T> {
463    /// Returns a reference to the underlying standard monitor.
464    ///
465    /// This is useful when callers need an explicit [`StdMonitor`] reference
466    /// while keeping the cloneable [`ArcStdMonitor`] handle.
467    #[inline]
468    fn as_ref(&self) -> &StdMonitor<T> {
469        self.inner.as_ref()
470    }
471}
472
473impl<T> Notifier for ArcStdMonitor<T> {
474    /// Wakes one thread waiting on this monitor.
475    #[inline]
476    fn notify_one(&self) {
477        Self::notify_one(self);
478    }
479
480    /// Wakes all threads waiting on this monitor.
481    #[inline]
482    fn notify_all(&self) {
483        Self::notify_all(self);
484    }
485}
486
487impl<T> NotificationWaiter for ArcStdMonitor<T> {
488    /// Blocks until a notification wakes this waiter.
489    #[inline]
490    fn wait(&self) {
491        Self::wait(self);
492    }
493}
494
495impl<T> TimeoutNotificationWaiter for ArcStdMonitor<T> {
496    /// Blocks until a notification wakes this waiter or the timeout expires.
497    #[inline]
498    fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
499        Self::wait_for(self, timeout)
500    }
501}
502
503impl<T> ConditionWaiter for ArcStdMonitor<T> {
504    type State = T;
505
506    /// Blocks until the predicate becomes true, then runs the action.
507    #[inline]
508    fn wait_until<R, P, F>(&self, predicate: P, action: F) -> R
509    where
510        P: FnMut(&Self::State) -> bool,
511        F: FnOnce(&mut Self::State) -> R,
512    {
513        Self::wait_until(self, predicate, action)
514    }
515
516    /// Blocks while the predicate remains true, then runs the action.
517    #[inline]
518    fn wait_while<R, P, F>(&self, predicate: P, action: F) -> R
519    where
520        P: FnMut(&Self::State) -> bool,
521        F: FnOnce(&mut Self::State) -> R,
522    {
523        Self::wait_while(self, predicate, action)
524    }
525}
526
527impl<T> TimeoutConditionWaiter for ArcStdMonitor<T> {
528    /// Blocks until the predicate becomes true or the timeout expires.
529    #[inline]
530    fn wait_until_for<R, P, F>(
531        &self,
532        timeout: Duration,
533        predicate: P,
534        action: F,
535    ) -> WaitTimeoutResult<R>
536    where
537        P: FnMut(&Self::State) -> bool,
538        F: FnOnce(&mut Self::State) -> R,
539    {
540        Self::wait_until_for(self, timeout, predicate, action)
541    }
542
543    /// Blocks while the predicate remains true or until the timeout expires.
544    #[inline]
545    fn wait_while_for<R, P, F>(
546        &self,
547        timeout: Duration,
548        predicate: P,
549        action: F,
550    ) -> WaitTimeoutResult<R>
551    where
552        P: FnMut(&Self::State) -> bool,
553        F: FnOnce(&mut Self::State) -> R,
554    {
555        Self::wait_while_for(self, timeout, predicate, action)
556    }
557}
558
559impl<T> Deref for ArcStdMonitor<T> {
560    type Target = StdMonitor<T>;
561
562    /// Dereferences this wrapper to the underlying standard monitor.
563    ///
564    /// Method-call dereferencing lets callers use native [`StdMonitor`] APIs
565    /// directly, while this wrapper still provides cloneable ownership.
566    #[inline]
567    fn deref(&self) -> &Self::Target {
568        self.inner.as_ref()
569    }
570}
571
572impl<T> From<T> for ArcStdMonitor<T> {
573    /// Creates an Arc-wrapped standard monitor from an initial state value.
574    ///
575    /// # Arguments
576    ///
577    /// * `value` - Initial state protected by the monitor.
578    ///
579    /// # Returns
580    ///
581    /// A cloneable standard monitor handle protecting `value`.
582    #[inline]
583    fn from(value: T) -> Self {
584        Self::new(value)
585    }
586}
587
588impl<T: Default> Default for ArcStdMonitor<T> {
589    /// Creates an Arc-wrapped monitor containing `T::default()`.
590    ///
591    /// # Returns
592    ///
593    /// A cloneable monitor handle protecting the default value for `T`.
594    #[inline]
595    fn default() -> Self {
596        Self::new(T::default())
597    }
598}
599
600impl<T> Clone for ArcStdMonitor<T> {
601    /// Clones this monitor handle.
602    ///
603    /// The cloned handle shares the same protected state and condition
604    /// variable with the original.
605    ///
606    /// # Returns
607    ///
608    /// A new handle sharing the same monitor state.
609    #[inline]
610    fn clone(&self) -> Self {
611        Self {
612            inner: self.inner.clone(),
613        }
614    }
615}