qubit_lock/monitor/arc_monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc Monitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::{
17 ops::Deref,
18 sync::Arc,
19 time::Duration,
20};
21
22use super::{
23 Monitor,
24 MonitorGuard,
25 WaitTimeoutResult,
26 WaitTimeoutStatus,
27};
28
29/// Arc-wrapped monitor for shared condition-based state coordination.
30///
31/// `ArcMonitor` stores a [`Monitor`] behind an [`Arc`], so callers can clone
32/// the monitor handle directly without writing `Arc::new(Monitor::new(...))`.
33/// It preserves the same guard-based waiting and predicate-based waiting
34/// semantics as [`Monitor`]. It implements [`Deref`] and [`AsRef`] so callers
35/// can pass it to APIs that expect a [`Monitor`] reference.
36///
37/// # Type Parameters
38///
39/// * `T` - The state protected by this monitor.
40///
41/// # Example
42///
43/// ```rust
44/// use std::thread;
45///
46/// use qubit_lock::ArcMonitor;
47///
48/// let monitor = ArcMonitor::new(false);
49/// let waiter_monitor = monitor.clone();
50///
51/// let waiter = thread::spawn(move || {
52/// waiter_monitor.wait_until(
53/// |ready| *ready,
54/// |ready| {
55/// *ready = false;
56/// },
57/// );
58/// });
59///
60/// monitor.write(|ready| {
61/// *ready = true;
62/// });
63/// monitor.notify_all();
64///
65/// waiter.join().expect("waiter should finish");
66/// assert!(!monitor.read(|ready| *ready));
67/// ```
68///
69pub struct ArcMonitor<T> {
70 /// Shared monitor instance.
71 inner: Arc<Monitor<T>>,
72}
73
74impl<T> ArcMonitor<T> {
75 /// Creates an Arc-wrapped monitor protecting the supplied state value.
76 ///
77 /// # Arguments
78 ///
79 /// * `state` - Initial state protected by the monitor.
80 ///
81 /// # Returns
82 ///
83 /// A cloneable monitor handle initialized with the supplied state.
84 #[inline]
85 pub fn new(state: T) -> Self {
86 Self {
87 inner: Arc::new(Monitor::new(state)),
88 }
89 }
90
91 /// Acquires the shared monitor and returns a guard.
92 ///
93 /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
94 /// keeps the monitor mutex locked until it is dropped. It can also wait on
95 /// the monitor's condition variable through [`MonitorGuard::wait`] or
96 /// [`MonitorGuard::wait_timeout`].
97 ///
98 /// # Returns
99 ///
100 /// A guard that provides read and write access to the protected state.
101 ///
102 /// # Example
103 ///
104 /// ```rust
105 /// use qubit_lock::ArcMonitor;
106 ///
107 /// let monitor = ArcMonitor::new(1);
108 /// {
109 /// let mut value = monitor.lock();
110 /// *value += 1;
111 /// }
112 ///
113 /// assert_eq!(monitor.read(|value| *value), 2);
114 /// ```
115 #[inline]
116 pub fn lock(&self) -> MonitorGuard<'_, T> {
117 self.inner.lock()
118 }
119
120 /// Acquires the monitor and reads the protected state.
121 ///
122 /// This delegates to [`Monitor::read`]. The closure runs while the monitor
123 /// mutex is held, so keep it short and avoid long blocking work.
124 ///
125 /// # Arguments
126 ///
127 /// * `f` - Closure that receives an immutable reference to the state.
128 ///
129 /// # Returns
130 ///
131 /// The value returned by `f`.
132 #[inline]
133 pub fn read<R, F>(&self, f: F) -> R
134 where
135 F: FnOnce(&T) -> R,
136 {
137 self.inner.read(f)
138 }
139
140 /// Acquires the monitor and mutates the protected state.
141 ///
142 /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
143 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
144 /// waiting thread may observe.
145 ///
146 /// # Arguments
147 ///
148 /// * `f` - Closure that receives a mutable reference to the state.
149 ///
150 /// # Returns
151 ///
152 /// The value returned by `f`.
153 #[inline]
154 pub fn write<R, F>(&self, f: F) -> R
155 where
156 F: FnOnce(&mut T) -> R,
157 {
158 self.inner.write(f)
159 }
160
161 /// Mutates the protected state and wakes one waiter.
162 ///
163 /// This delegates to [`Monitor::write_notify_one`]. The closure runs while
164 /// the monitor mutex is held; after it returns, the lock is released and one
165 /// waiter is notified. If `f` panics, the panic is propagated and no
166 /// notification is sent.
167 ///
168 /// # Arguments
169 ///
170 /// * `f` - Closure that receives a mutable reference to the state.
171 ///
172 /// # Returns
173 ///
174 /// The value returned by `f`.
175 #[inline]
176 pub fn write_notify_one<R, F>(&self, f: F) -> R
177 where
178 F: FnOnce(&mut T) -> R,
179 {
180 self.inner.write_notify_one(f)
181 }
182
183 /// Mutates the protected state and wakes all waiters.
184 ///
185 /// This delegates to [`Monitor::write_notify_all`]. The closure runs while
186 /// the monitor mutex is held; after it returns, the lock is released and all
187 /// waiters are notified. If `f` panics, the panic is propagated and no
188 /// notification is sent.
189 ///
190 /// # Arguments
191 ///
192 /// * `f` - Closure that receives a mutable reference to the state.
193 ///
194 /// # Returns
195 ///
196 /// The value returned by `f`.
197 #[inline]
198 pub fn write_notify_all<R, F>(&self, f: F) -> R
199 where
200 F: FnOnce(&mut T) -> R,
201 {
202 self.inner.write_notify_all(f)
203 }
204
205 /// Waits for a notification or timeout without checking state.
206 ///
207 /// This delegates to [`Monitor::wait_notify`]. Most
208 /// coordination code should prefer [`Self::wait_while`],
209 /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
210 ///
211 /// [`WaitTimeoutStatus::Woken`] means the condition variable was notified,
212 /// but it does not prove that the protected state changed in a useful way.
213 ///
214 /// # Arguments
215 ///
216 /// * `timeout` - Maximum duration to wait for a notification.
217 ///
218 /// # Returns
219 ///
220 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
221 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
222 ///
223 /// # Example
224 ///
225 /// ```rust
226 /// use std::time::Duration;
227 ///
228 /// use qubit_lock::{ArcMonitor, WaitTimeoutStatus};
229 ///
230 /// let monitor = ArcMonitor::new(false);
231 /// let status = monitor.wait_notify(Duration::from_millis(1));
232 ///
233 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
234 /// ```
235 #[inline]
236 pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
237 self.inner.wait_notify(timeout)
238 }
239
240 /// Waits while a predicate remains true, then mutates the protected state.
241 ///
242 /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
243 /// while holding the monitor mutex, and the closure runs while the mutex is
244 /// still held after the predicate stops blocking.
245 ///
246 /// This method may block indefinitely if no thread changes the state so
247 /// that `waiting` becomes false and sends a notification.
248 ///
249 /// # Arguments
250 ///
251 /// * `waiting` - Predicate that returns `true` while the caller should
252 /// keep waiting.
253 /// * `f` - Closure that receives mutable access after waiting is no longer
254 /// required.
255 ///
256 /// # Returns
257 ///
258 /// The value returned by `f`.
259 ///
260 /// # Example
261 ///
262 /// ```rust
263 /// use std::thread;
264 ///
265 /// use qubit_lock::ArcMonitor;
266 ///
267 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
268 /// let worker_monitor = monitor.clone();
269 ///
270 /// let worker = thread::spawn(move || {
271 /// worker_monitor.wait_while(
272 /// |items| items.is_empty(),
273 /// |items| items.pop().expect("item should be ready"),
274 /// )
275 /// });
276 ///
277 /// monitor.write(|items| items.push(7));
278 /// monitor.notify_one();
279 ///
280 /// assert_eq!(worker.join().expect("worker should finish"), 7);
281 /// ```
282 #[inline]
283 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
284 where
285 P: FnMut(&T) -> bool,
286 F: FnOnce(&mut T) -> R,
287 {
288 self.inner.wait_while(waiting, f)
289 }
290
291 /// Waits until the protected state satisfies a predicate, then mutates it.
292 ///
293 /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
294 /// no thread changes the state to satisfy the predicate and sends a
295 /// notification.
296 ///
297 /// # Arguments
298 ///
299 /// * `ready` - Predicate that returns `true` when the state is ready.
300 /// * `f` - Closure that receives mutable access to the ready state.
301 ///
302 /// # Returns
303 ///
304 /// The value returned by `f`.
305 #[inline]
306 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
307 where
308 P: FnMut(&T) -> bool,
309 F: FnOnce(&mut T) -> R,
310 {
311 self.inner.wait_until(ready, f)
312 }
313
314 /// Waits while a predicate remains true, with an overall time limit.
315 ///
316 /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
317 /// false before `timeout` expires, `f` runs while the monitor lock is still
318 /// held. If the timeout expires first, the closure is not called.
319 ///
320 /// # Arguments
321 ///
322 /// * `timeout` - Maximum total duration to wait.
323 /// * `waiting` - Predicate that returns `true` while the caller should
324 /// continue waiting.
325 /// * `f` - Closure that receives mutable access when waiting is no longer
326 /// required.
327 ///
328 /// # Returns
329 ///
330 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
331 /// predicate stops blocking before the timeout. Returns
332 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
333 ///
334 /// # Example
335 ///
336 /// ```rust
337 /// use std::time::Duration;
338 ///
339 /// use qubit_lock::{ArcMonitor, WaitTimeoutResult};
340 ///
341 /// let monitor = ArcMonitor::new(Vec::<i32>::new());
342 /// let result = monitor.wait_timeout_while(
343 /// Duration::from_millis(1),
344 /// |items| items.is_empty(),
345 /// |items| items.pop(),
346 /// );
347 ///
348 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
349 /// ```
350 #[inline]
351 pub fn wait_timeout_while<R, P, F>(
352 &self,
353 timeout: Duration,
354 waiting: P,
355 f: F,
356 ) -> WaitTimeoutResult<R>
357 where
358 P: FnMut(&T) -> bool,
359 F: FnOnce(&mut T) -> R,
360 {
361 self.inner.wait_timeout_while(timeout, waiting, f)
362 }
363
364 /// Waits until a predicate becomes true, with an overall time limit.
365 ///
366 /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
367 /// true before `timeout` expires, `f` runs while the monitor lock is still
368 /// held. If the timeout expires first, the closure is not called.
369 ///
370 /// # Arguments
371 ///
372 /// * `timeout` - Maximum total duration to wait.
373 /// * `ready` - Predicate that returns `true` when the caller may continue.
374 /// * `f` - Closure that receives mutable access to the ready state.
375 ///
376 /// # Returns
377 ///
378 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
379 /// predicate becomes true before the timeout. Returns
380 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
381 ///
382 /// # Example
383 ///
384 /// ```rust
385 /// use std::{
386 /// thread,
387 /// time::Duration,
388 /// };
389 ///
390 /// use qubit_lock::{ArcMonitor, WaitTimeoutResult};
391 ///
392 /// let monitor = ArcMonitor::new(false);
393 /// let worker_monitor = monitor.clone();
394 ///
395 /// let worker = thread::spawn(move || {
396 /// worker_monitor.wait_timeout_until(
397 /// Duration::from_secs(1),
398 /// |ready| *ready,
399 /// |ready| {
400 /// *ready = false;
401 /// 5
402 /// },
403 /// )
404 /// });
405 ///
406 /// monitor.write(|ready| *ready = true);
407 /// monitor.notify_one();
408 ///
409 /// assert_eq!(
410 /// worker.join().expect("worker should finish"),
411 /// WaitTimeoutResult::Ready(5),
412 /// );
413 /// ```
414 #[inline]
415 pub fn wait_timeout_until<R, P, F>(
416 &self,
417 timeout: Duration,
418 ready: P,
419 f: F,
420 ) -> WaitTimeoutResult<R>
421 where
422 P: FnMut(&T) -> bool,
423 F: FnOnce(&mut T) -> R,
424 {
425 self.inner.wait_timeout_until(timeout, ready, f)
426 }
427
428 /// Wakes one thread waiting on this monitor's condition variable.
429 ///
430 /// Notifications do not carry state by themselves. A waiting thread only
431 /// proceeds safely after rechecking the protected state. Call this after
432 /// changing state that may make one waiter able to continue.
433 #[inline]
434 pub fn notify_one(&self) {
435 self.inner.notify_one();
436 }
437
438 /// Wakes all threads waiting on this monitor's condition variable.
439 ///
440 /// Notifications do not carry state by themselves. Every awakened thread
441 /// must recheck the protected state before continuing. Call this after a
442 /// state change that may allow multiple waiters to make progress.
443 #[inline]
444 pub fn notify_all(&self) {
445 self.inner.notify_all();
446 }
447}
448
449impl<T> AsRef<Monitor<T>> for ArcMonitor<T> {
450 /// Returns a reference to the underlying monitor.
451 ///
452 /// This is useful when callers need an explicit [`Monitor`] reference while
453 /// keeping the cloneable [`ArcMonitor`] handle.
454 #[inline]
455 fn as_ref(&self) -> &Monitor<T> {
456 self.inner.as_ref()
457 }
458}
459
460impl<T> Deref for ArcMonitor<T> {
461 type Target = Monitor<T>;
462
463 /// Dereferences this wrapper to the underlying monitor.
464 ///
465 /// Method-call dereferencing lets callers use native [`Monitor`] APIs
466 /// directly, while this wrapper still provides cloneable ownership.
467 #[inline]
468 fn deref(&self) -> &Self::Target {
469 self.inner.as_ref()
470 }
471}
472
473impl<T> From<T> for ArcMonitor<T> {
474 /// Creates an Arc-wrapped monitor from an initial state value.
475 ///
476 /// # Arguments
477 ///
478 /// * `value` - Initial state protected by the monitor.
479 ///
480 /// # Returns
481 ///
482 /// A cloneable monitor handle protecting `value`.
483 #[inline]
484 fn from(value: T) -> Self {
485 Self::new(value)
486 }
487}
488
489impl<T: Default> Default for ArcMonitor<T> {
490 /// Creates an Arc-wrapped monitor containing `T::default()`.
491 ///
492 /// # Returns
493 ///
494 /// A cloneable monitor handle protecting the default value for `T`.
495 #[inline]
496 fn default() -> Self {
497 Self::new(T::default())
498 }
499}
500
501impl<T> Clone for ArcMonitor<T> {
502 /// Clones this monitor handle.
503 ///
504 /// The cloned handle shares the same protected state and condition
505 /// variable with the original.
506 ///
507 /// # Returns
508 ///
509 /// A new handle sharing the same monitor state.
510 #[inline]
511 fn clone(&self) -> Self {
512 Self {
513 inner: self.inner.clone(),
514 }
515 }
516}