// qubit_lock/monitor/arc_monitor.rs
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026.
 *    Haixing Hu, Qubit Co. Ltd.
 *
 *    All rights reserved.
 *
 ******************************************************************************/
//! # Arc Monitor
//!
//! Provides an Arc-wrapped synchronous monitor for condition-based state
//! coordination across threads.
//!
//! # Author
//!
//! Haixing Hu

use std::sync::Arc;
use std::time::Duration;

use super::{
    Monitor,
    MonitorGuard,
    WaitTimeoutResult,
    WaitTimeoutStatus,
};

28/// Arc-wrapped monitor for shared condition-based state coordination.
29///
30/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
31/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
32/// It preserves the same guard-based waiting, predicate-based waiting, and
33/// poison recovery semantics as [`Monitor`].
34///
35/// # Type Parameters
36///
37/// * `T` - The state protected by this monitor.
38///
39/// # Example
40///
41/// ```rust
42/// use std::thread;
43///
44/// use qubit_lock::lock::ArcMonitor;
45///
46/// let monitor = ArcMonitor::new(false);
47/// let waiter_monitor = monitor.clone();
48///
49/// let waiter = thread::spawn(move || {
50/// waiter_monitor.wait_until(
51/// |ready| *ready,
52/// |ready| {
53/// *ready = false;
54/// },
55/// );
56/// });
57///
58/// monitor.write(|ready| {
59/// *ready = true;
60/// });
61/// monitor.notify_all();
62///
63/// waiter.join().expect("waiter should finish");
64/// assert!(!monitor.read(|ready| *ready));
65/// ```
66///
67/// # Author
68///
69/// Haixing Hu
70pub struct ArcMonitor<T> {
71 /// Shared monitor instance.
72 inner: Arc<Monitor<T>>,
73}
74
75impl<T> ArcMonitor<T> {
76 /// Creates an Arc-wrapped monitor protecting the supplied state value.
77 ///
78 /// # Arguments
79 ///
80 /// * `state` - Initial state protected by the monitor.
81 ///
82 /// # Returns
83 ///
84 /// A cloneable monitor handle initialized with the supplied state.
85 #[inline]
86 pub fn new(state: T) -> Self {
87 Self {
88 inner: Arc::new(Monitor::new(state)),
89 }
90 }
91
92 /// Acquires the shared monitor and returns a guard.
93 ///
94 /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
95 /// keeps the monitor mutex locked until it is dropped. It can also wait on
96 /// the monitor's condition variable through [`MonitorGuard::wait`] or
97 /// [`MonitorGuard::wait_timeout`].
98 ///
99 /// If the underlying mutex is poisoned, this method recovers the inner
100 /// state and still returns a guard.
101 ///
102 /// # Returns
103 ///
104 /// A guard that provides read and write access to the protected state.
105 ///
106 /// # Example
107 ///
108 /// ```rust
109 /// use qubit_lock::lock::ArcMonitor;
110 ///
111 /// let monitor = ArcMonitor::new(1);
112 /// {
113 /// let mut value = monitor.lock();
114 /// *value += 1;
115 /// }
116 ///
117 /// assert_eq!(monitor.read(|value| *value), 2);
118 /// ```
119 #[inline]
120 pub fn lock(&self) -> MonitorGuard<'_, T> {
121 self.inner.lock()
122 }
123
124 /// Acquires the monitor and reads the protected state.
125 ///
126 /// This delegates to [`Monitor::read`]. The closure runs while the monitor
127 /// mutex is held, so keep it short and avoid long blocking work.
128 ///
129 /// # Arguments
130 ///
131 /// * `f` - Closure that receives an immutable reference to the state.
132 ///
133 /// # Returns
134 ///
135 /// The value returned by `f`.
136 #[inline]
137 pub fn read<R, F>(&self, f: F) -> R
138 where
139 F: FnOnce(&T) -> R,
140 {
141 self.inner.read(f)
142 }
143
144 /// Acquires the monitor and mutates the protected state.
145 ///
146 /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
147 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
148 /// waiting thread may observe.
149 ///
150 /// # Arguments
151 ///
152 /// * `f` - Closure that receives a mutable reference to the state.
153 ///
154 /// # Returns
155 ///
156 /// The value returned by `f`.
157 #[inline]
158 pub fn write<R, F>(&self, f: F) -> R
159 where
160 F: FnOnce(&mut T) -> R,
161 {
162 self.inner.write(f)
163 }
164
165 /// Waits for a notification or timeout without checking state.
166 ///
167 /// This delegates to [`Monitor::wait_notify`]. Most
168 /// coordination code should prefer [`Self::wait_while`],
169 /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
170 ///
171 /// Condition variables may wake spuriously, so
172 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
173 /// state.
174 ///
175 /// # Arguments
176 ///
177 /// * `timeout` - Maximum duration to wait for a notification.
178 ///
179 /// # Returns
180 ///
181 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
182 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
183 ///
184 /// # Example
185 ///
186 /// ```rust
187 /// use std::time::Duration;
188 ///
189 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
190 ///
191 /// let monitor = ArcMonitor::new(false);
192 /// let status = monitor.wait_notify(Duration::from_millis(1));
193 ///
194 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
195 /// ```
196 #[inline]
197 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
198 self.inner.wait_notify(timeout)
199 }
200
201 /// Waits while a predicate remains true, then mutates the protected state.
202 ///
203 /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
204 /// while holding the monitor mutex, and the closure runs while the mutex is
205 /// still held after the predicate stops blocking.
206 ///
207 /// This method may block indefinitely if no thread changes the state so
208 /// that `waiting` becomes false and sends a notification.
209 ///
210 /// # Arguments
211 ///
212 /// * `waiting` - Predicate that returns `true` while the caller should
213 /// keep waiting.
214 /// * `f` - Closure that receives mutable access after waiting is no longer
215 /// required.
216 ///
217 /// # Returns
218 ///
219 /// The value returned by `f`.
220 ///
221 /// # Example
222 ///
223 /// ```rust
224 /// use std::thread;
225 ///
226 /// use qubit_lock::lock::ArcMonitor;
227 ///
228 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
229 /// let worker_monitor = monitor.clone();
230 ///
231 /// let worker = thread::spawn(move || {
232 /// worker_monitor.wait_while(
233 /// |items| items.is_empty(),
234 /// |items| items.pop().expect("item should be ready"),
235 /// )
236 /// });
237 ///
238 /// monitor.write(|items| items.push(7));
239 /// monitor.notify_one();
240 ///
241 /// assert_eq!(worker.join().expect("worker should finish"), 7);
242 /// ```
243 #[inline]
244 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
245 where
246 P: FnMut(&T) -> bool,
247 F: FnOnce(&mut T) -> R,
248 {
249 self.inner.wait_while(waiting, f)
250 }
251
252 /// Waits until the protected state satisfies a predicate, then mutates it.
253 ///
254 /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
255 /// no thread changes the state to satisfy the predicate and sends a
256 /// notification.
257 ///
258 /// # Arguments
259 ///
260 /// * `ready` - Predicate that returns `true` when the state is ready.
261 /// * `f` - Closure that receives mutable access to the ready state.
262 ///
263 /// # Returns
264 ///
265 /// The value returned by `f`.
266 #[inline]
267 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
268 where
269 P: FnMut(&T) -> bool,
270 F: FnOnce(&mut T) -> R,
271 {
272 self.inner.wait_until(ready, f)
273 }
274
275 /// Waits while a predicate remains true, with an overall time limit.
276 ///
277 /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
278 /// false before `timeout` expires, `f` runs while the monitor lock is still
279 /// held. If the timeout expires first, the closure is not called.
280 ///
281 /// # Arguments
282 ///
283 /// * `timeout` - Maximum total duration to wait.
284 /// * `waiting` - Predicate that returns `true` while the caller should
285 /// continue waiting.
286 /// * `f` - Closure that receives mutable access when waiting is no longer
287 /// required.
288 ///
289 /// # Returns
290 ///
291 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
292 /// predicate stops blocking before the timeout. Returns
293 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
294 ///
295 /// # Example
296 ///
297 /// ```rust
298 /// use std::time::Duration;
299 ///
300 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
301 ///
302 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
303 /// let result = monitor.wait_timeout_while(
304 /// Duration::from_millis(1),
305 /// |items| items.is_empty(),
306 /// |items| items.pop(),
307 /// );
308 ///
309 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
310 /// ```
311 #[inline]
312 pub fn wait_timeout_while<R, P, F>(
313 &self,
314 timeout: Duration,
315 waiting: P,
316 f: F,
317 ) -> WaitTimeoutResult<R>
318 where
319 P: FnMut(&T) -> bool,
320 F: FnOnce(&mut T) -> R,
321 {
322 self.inner.wait_timeout_while(timeout, waiting, f)
323 }
324
325 /// Waits until a predicate becomes true, with an overall time limit.
326 ///
327 /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
328 /// true before `timeout` expires, `f` runs while the monitor lock is still
329 /// held. If the timeout expires first, the closure is not called.
330 ///
331 /// # Arguments
332 ///
333 /// * `timeout` - Maximum total duration to wait.
334 /// * `ready` - Predicate that returns `true` when the caller may continue.
335 /// * `f` - Closure that receives mutable access to the ready state.
336 ///
337 /// # Returns
338 ///
339 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
340 /// predicate becomes true before the timeout. Returns
341 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
342 ///
343 /// # Example
344 ///
345 /// ```rust
346 /// use std::{
347 /// thread,
348 /// time::Duration,
349 /// };
350 ///
351 /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
352 ///
353 /// let monitor = ArcMonitor::new(false);
354 /// let worker_monitor = monitor.clone();
355 ///
356 /// let worker = thread::spawn(move || {
357 /// worker_monitor.wait_timeout_until(
358 /// Duration::from_secs(1),
359 /// |ready| *ready,
360 /// |ready| {
361 /// *ready = false;
362 /// 5
363 /// },
364 /// )
365 /// });
366 ///
367 /// monitor.write(|ready| *ready = true);
368 /// monitor.notify_one();
369 ///
370 /// assert_eq!(
371 /// worker.join().expect("worker should finish"),
372 /// WaitTimeoutResult::Ready(5),
373 /// );
374 /// ```
375 #[inline]
376 pub fn wait_timeout_until<R, P, F>(
377 &self,
378 timeout: Duration,
379 ready: P,
380 f: F,
381 ) -> WaitTimeoutResult<R>
382 where
383 P: FnMut(&T) -> bool,
384 F: FnOnce(&mut T) -> R,
385 {
386 self.inner.wait_timeout_until(timeout, ready, f)
387 }
388
389 /// Wakes one thread waiting on this monitor's condition variable.
390 ///
391 /// Notifications do not carry state by themselves. A waiting thread only
392 /// proceeds safely after rechecking the protected state. Call this after
393 /// changing state that may make one waiter able to continue.
394 #[inline]
395 pub fn notify_one(&self) {
396 self.inner.notify_one();
397 }
398
399 /// Wakes all threads waiting on this monitor's condition variable.
400 ///
401 /// Notifications do not carry state by themselves. Every awakened thread
402 /// must recheck the protected state before continuing. Call this after a
403 /// state change that may allow multiple waiters to make progress.
404 #[inline]
405 pub fn notify_all(&self) {
406 self.inner.notify_all();
407 }
408}
409
410impl<T: Default> Default for ArcMonitor<T> {
411 /// Creates an Arc-wrapped monitor containing `T::default()`.
412 ///
413 /// # Returns
414 ///
415 /// A cloneable monitor handle protecting the default value for `T`.
416 #[inline]
417 fn default() -> Self {
418 Self::new(T::default())
419 }
420}
421
422impl<T> Clone for ArcMonitor<T> {
423 /// Clones this monitor handle.
424 ///
425 /// The cloned handle shares the same protected state and condition
426 /// variable with the original.
427 ///
428 /// # Returns
429 ///
430 /// A new handle sharing the same monitor state.
431 #[inline]
432 fn clone(&self) -> Self {
433 Self {
434 inner: self.inner.clone(),
435 }
436 }
437}