qubit_lock/monitor/arc_monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::sync::Arc;
17use std::time::Duration;
18
19use super::{
20 Monitor,
21 MonitorGuard,
22 WaitTimeoutResult,
23 WaitTimeoutStatus,
24};
25
/// Arc-wrapped monitor for shared condition-based state coordination.
///
/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
/// Every clone refers to the same underlying [`Monitor`], so a state change
/// or notification issued through one handle is visible through all others.
/// It preserves the same guard-based waiting, predicate-based waiting, and
/// poison recovery semantics as [`Monitor`].
///
/// # Type Parameters
///
/// * `T` - The state protected by this monitor.
///
/// # Example
///
/// ```rust
/// use std::thread;
///
/// use qubit_lock::lock::ArcMonitor;
///
/// let monitor = ArcMonitor::new(false);
/// // The clone shares the same protected flag and condition variable.
/// let waiter_monitor = monitor.clone();
///
/// let waiter = thread::spawn(move || {
///     // Block until the flag is set, then reset it while still holding
///     // the monitor lock.
///     waiter_monitor.wait_until(
///         |ready| *ready,
///         |ready| {
///             *ready = false;
///         },
///     );
/// });
///
/// // Change the state first, then wake the waiter so it rechecks it.
/// monitor.write(|ready| {
///     *ready = true;
/// });
/// monitor.notify_all();
///
/// waiter.join().expect("waiter should finish");
/// assert!(!monitor.read(|ready| *ready));
/// ```
pub struct ArcMonitor<T> {
    /// Shared monitor instance; cloning the handle clones this `Arc`, not
    /// the monitor it points to.
    inner: Arc<Monitor<T>>,
}
69
70impl<T> ArcMonitor<T> {
71 /// Creates an Arc-wrapped monitor protecting the supplied state value.
72 ///
73 /// # Arguments
74 ///
75 /// * `state` - Initial state protected by the monitor.
76 ///
77 /// # Returns
78 ///
79 /// A cloneable monitor handle initialized with the supplied state.
80 #[inline]
81 pub fn new(state: T) -> Self {
82 Self {
83 inner: Arc::new(Monitor::new(state)),
84 }
85 }
86
87 /// Acquires the shared monitor and returns a guard.
88 ///
89 /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
90 /// keeps the monitor mutex locked until it is dropped. It can also wait on
91 /// the monitor's condition variable through [`MonitorGuard::wait`] or
92 /// [`MonitorGuard::wait_timeout`].
93 ///
94 /// If the underlying mutex is poisoned, this method recovers the inner
95 /// state and still returns a guard.
96 ///
97 /// # Returns
98 ///
99 /// A guard that provides read and write access to the protected state.
100 ///
101 /// # Example
102 ///
103 /// ```rust
104 /// use qubit_lock::lock::ArcMonitor;
105 ///
106 /// let monitor = ArcMonitor::new(1);
107 /// {
108 /// let mut value = monitor.lock();
109 /// *value += 1;
110 /// }
111 ///
112 /// assert_eq!(monitor.read(|value| *value), 2);
113 /// ```
114 #[inline]
115 pub fn lock(&self) -> MonitorGuard<'_, T> {
116 self.inner.lock()
117 }
118
119 /// Acquires the monitor and reads the protected state.
120 ///
121 /// This delegates to [`Monitor::read`]. The closure runs while the monitor
122 /// mutex is held, so keep it short and avoid long blocking work.
123 ///
124 /// # Arguments
125 ///
126 /// * `f` - Closure that receives an immutable reference to the state.
127 ///
128 /// # Returns
129 ///
130 /// The value returned by `f`.
131 #[inline]
132 pub fn read<R, F>(&self, f: F) -> R
133 where
134 F: FnOnce(&T) -> R,
135 {
136 self.inner.read(f)
137 }
138
139 /// Acquires the monitor and mutates the protected state.
140 ///
141 /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
142 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
143 /// waiting thread may observe.
144 ///
145 /// # Arguments
146 ///
147 /// * `f` - Closure that receives a mutable reference to the state.
148 ///
149 /// # Returns
150 ///
151 /// The value returned by `f`.
152 #[inline]
153 pub fn write<R, F>(&self, f: F) -> R
154 where
155 F: FnOnce(&mut T) -> R,
156 {
157 self.inner.write(f)
158 }
159
160 /// Waits for a notification or timeout without checking state.
161 ///
162 /// This delegates to [`Monitor::wait_notify`]. Most
163 /// coordination code should prefer [`Self::wait_while`],
164 /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
165 ///
166 /// Condition variables may wake spuriously, so
167 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
168 /// state.
169 ///
170 /// # Arguments
171 ///
172 /// * `timeout` - Maximum duration to wait for a notification.
173 ///
174 /// # Returns
175 ///
176 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
177 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
178 ///
179 /// # Example
180 ///
181 /// ```rust
182 /// use std::time::Duration;
183 ///
184 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
185 ///
186 /// let monitor = ArcMonitor::new(false);
187 /// let status = monitor.wait_notify(Duration::from_millis(1));
188 ///
189 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
190 /// ```
191 #[inline]
192 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
193 self.inner.wait_notify(timeout)
194 }
195
196 /// Waits while a predicate remains true, then mutates the protected state.
197 ///
198 /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
199 /// while holding the monitor mutex, and the closure runs while the mutex is
200 /// still held after the predicate stops blocking.
201 ///
202 /// This method may block indefinitely if no thread changes the state so
203 /// that `waiting` becomes false and sends a notification.
204 ///
205 /// # Arguments
206 ///
207 /// * `waiting` - Predicate that returns `true` while the caller should
208 /// keep waiting.
209 /// * `f` - Closure that receives mutable access after waiting is no longer
210 /// required.
211 ///
212 /// # Returns
213 ///
214 /// The value returned by `f`.
215 ///
216 /// # Example
217 ///
218 /// ```rust
219 /// use std::thread;
220 ///
221 /// use qubit_lock::lock::ArcMonitor;
222 ///
223 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
224 /// let worker_monitor = monitor.clone();
225 ///
226 /// let worker = thread::spawn(move || {
227 /// worker_monitor.wait_while(
228 /// |items| items.is_empty(),
229 /// |items| items.pop().expect("item should be ready"),
230 /// )
231 /// });
232 ///
233 /// monitor.write(|items| items.push(7));
234 /// monitor.notify_one();
235 ///
236 /// assert_eq!(worker.join().expect("worker should finish"), 7);
237 /// ```
238 #[inline]
239 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
240 where
241 P: FnMut(&T) -> bool,
242 F: FnOnce(&mut T) -> R,
243 {
244 self.inner.wait_while(waiting, f)
245 }
246
247 /// Waits until the protected state satisfies a predicate, then mutates it.
248 ///
249 /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
250 /// no thread changes the state to satisfy the predicate and sends a
251 /// notification.
252 ///
253 /// # Arguments
254 ///
255 /// * `ready` - Predicate that returns `true` when the state is ready.
256 /// * `f` - Closure that receives mutable access to the ready state.
257 ///
258 /// # Returns
259 ///
260 /// The value returned by `f`.
261 #[inline]
262 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
263 where
264 P: FnMut(&T) -> bool,
265 F: FnOnce(&mut T) -> R,
266 {
267 self.inner.wait_until(ready, f)
268 }
269
270 /// Waits while a predicate remains true, with an overall time limit.
271 ///
272 /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
273 /// false before `timeout` expires, `f` runs while the monitor lock is still
274 /// held. If the timeout expires first, the closure is not called.
275 ///
276 /// # Arguments
277 ///
278 /// * `timeout` - Maximum total duration to wait.
279 /// * `waiting` - Predicate that returns `true` while the caller should
280 /// continue waiting.
281 /// * `f` - Closure that receives mutable access when waiting is no longer
282 /// required.
283 ///
284 /// # Returns
285 ///
286 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
287 /// predicate stops blocking before the timeout. Returns
288 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
289 ///
290 /// # Example
291 ///
292 /// ```rust
293 /// use std::time::Duration;
294 ///
295 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
296 ///
297 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
298 /// let result = monitor.wait_timeout_while(
299 /// Duration::from_millis(1),
300 /// |items| items.is_empty(),
301 /// |items| items.pop(),
302 /// );
303 ///
304 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
305 /// ```
306 #[inline]
307 pub fn wait_timeout_while<R, P, F>(
308 &self,
309 timeout: Duration,
310 waiting: P,
311 f: F,
312 ) -> WaitTimeoutResult<R>
313 where
314 P: FnMut(&T) -> bool,
315 F: FnOnce(&mut T) -> R,
316 {
317 self.inner.wait_timeout_while(timeout, waiting, f)
318 }
319
320 /// Waits until a predicate becomes true, with an overall time limit.
321 ///
322 /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
323 /// true before `timeout` expires, `f` runs while the monitor lock is still
324 /// held. If the timeout expires first, the closure is not called.
325 ///
326 /// # Arguments
327 ///
328 /// * `timeout` - Maximum total duration to wait.
329 /// * `ready` - Predicate that returns `true` when the caller may continue.
330 /// * `f` - Closure that receives mutable access to the ready state.
331 ///
332 /// # Returns
333 ///
334 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
335 /// predicate becomes true before the timeout. Returns
336 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
337 ///
338 /// # Example
339 ///
340 /// ```rust
341 /// use std::{
342 /// thread,
343 /// time::Duration,
344 /// };
345 ///
346 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
347 ///
348 /// let monitor = ArcMonitor::new(false);
349 /// let worker_monitor = monitor.clone();
350 ///
351 /// let worker = thread::spawn(move || {
352 /// worker_monitor.wait_timeout_until(
353 /// Duration::from_secs(1),
354 /// |ready| *ready,
355 /// |ready| {
356 /// *ready = false;
357 /// 5
358 /// },
359 /// )
360 /// });
361 ///
362 /// monitor.write(|ready| *ready = true);
363 /// monitor.notify_one();
364 ///
365 /// assert_eq!(
366 /// worker.join().expect("worker should finish"),
367 /// WaitTimeoutResult::Ready(5),
368 /// );
369 /// ```
370 #[inline]
371 pub fn wait_timeout_until<R, P, F>(
372 &self,
373 timeout: Duration,
374 ready: P,
375 f: F,
376 ) -> WaitTimeoutResult<R>
377 where
378 P: FnMut(&T) -> bool,
379 F: FnOnce(&mut T) -> R,
380 {
381 self.inner.wait_timeout_until(timeout, ready, f)
382 }
383
384 /// Wakes one thread waiting on this monitor's condition variable.
385 ///
386 /// Notifications do not carry state by themselves. A waiting thread only
387 /// proceeds safely after rechecking the protected state. Call this after
388 /// changing state that may make one waiter able to continue.
389 #[inline]
390 pub fn notify_one(&self) {
391 self.inner.notify_one();
392 }
393
394 /// Wakes all threads waiting on this monitor's condition variable.
395 ///
396 /// Notifications do not carry state by themselves. Every awakened thread
397 /// must recheck the protected state before continuing. Call this after a
398 /// state change that may allow multiple waiters to make progress.
399 #[inline]
400 pub fn notify_all(&self) {
401 self.inner.notify_all();
402 }
403}
404
405impl<T: Default> Default for ArcMonitor<T> {
406 /// Creates an Arc-wrapped monitor containing `T::default()`.
407 ///
408 /// # Returns
409 ///
410 /// A cloneable monitor handle protecting the default value for `T`.
411 #[inline]
412 fn default() -> Self {
413 Self::new(T::default())
414 }
415}
416
417impl<T> Clone for ArcMonitor<T> {
418 /// Clones this monitor handle.
419 ///
420 /// The cloned handle shares the same protected state and condition
421 /// variable with the original.
422 ///
423 /// # Returns
424 ///
425 /// A new handle sharing the same monitor state.
426 #[inline]
427 fn clone(&self) -> Self {
428 Self {
429 inner: self.inner.clone(),
430 }
431 }
432}