qubit_lock/monitor/arc_std_monitor.rs
1/*******************************************************************************
2 *
3 * Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 *
7 * Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! # Arc StdMonitor
11//!
12//! Provides an Arc-wrapped synchronous monitor for condition-based state
13//! coordination across threads.
14//!
15
16use std::{
17 ops::Deref,
18 sync::Arc,
19 time::Duration,
20};
21
22use super::{
23 ConditionWaiter,
24 NotificationWaiter,
25 Notifier,
26 StdMonitor,
27 StdMonitorGuard,
28 TimeoutConditionWaiter,
29 TimeoutNotificationWaiter,
30 WaitTimeoutResult,
31 WaitTimeoutStatus,
32};
33
34/// Arc-wrapped monitor for shared condition-based state coordination.
35///
36/// `ArcStdMonitor` stores a [`StdMonitor`] behind an [`Arc`], so callers can clone
37/// the monitor handle directly without writing `Arc::new(StdMonitor::new(...))`.
38/// It preserves the same guard-based waiting, predicate-based waiting, and
39/// poison recovery semantics as [`StdMonitor`]. It implements [`Deref`] and
40/// [`AsRef`] so callers can pass it to APIs that expect a [`StdMonitor`]
41/// reference.
42///
43/// # Type Parameters
44///
45/// * `T` - The state protected by this monitor.
46///
47/// # Example
48///
49/// ```rust
50/// use std::thread;
51///
52/// use qubit_lock::ArcStdMonitor;
53///
54/// let monitor = ArcStdMonitor::new(false);
55/// let waiter_monitor = monitor.clone();
56///
57/// let waiter = thread::spawn(move || {
58/// waiter_monitor.wait_until(
59/// |ready| *ready,
60/// |ready| {
61/// *ready = false;
62/// },
63/// );
64/// });
65///
66/// monitor.write(|ready| {
67/// *ready = true;
68/// });
69/// monitor.notify_all();
70///
71/// waiter.join().expect("waiter should finish");
72/// assert!(!monitor.read(|ready| *ready));
73/// ```
74///
75pub struct ArcStdMonitor<T> {
76 /// Shared monitor instance.
77 inner: Arc<StdMonitor<T>>,
78}
79
80impl<T> ArcStdMonitor<T> {
81 /// Creates an Arc-wrapped monitor protecting the supplied state value.
82 ///
83 /// # Arguments
84 ///
85 /// * `state` - Initial state protected by the monitor.
86 ///
87 /// # Returns
88 ///
89 /// A cloneable monitor handle initialized with the supplied state.
90 #[inline]
91 pub fn new(state: T) -> Self {
92 Self {
93 inner: Arc::new(StdMonitor::new(state)),
94 }
95 }
96
97 /// Acquires the shared monitor and returns a guard.
98 ///
99 /// This delegates to [`StdMonitor::lock`]. The returned [`StdMonitorGuard`]
100 /// keeps the monitor mutex locked until it is dropped. It can also wait on
101 /// the monitor's condition variable through [`StdMonitorGuard::wait`] or
102 /// [`StdMonitorGuard::wait_timeout`].
103 ///
104 /// If the underlying mutex is poisoned, this method recovers the inner
105 /// state and still returns a guard.
106 ///
107 /// # Returns
108 ///
109 /// A guard that provides read and write access to the protected state.
110 ///
111 /// # Example
112 ///
113 /// ```rust
114 /// use qubit_lock::ArcStdMonitor;
115 ///
116 /// let monitor = ArcStdMonitor::new(1);
117 /// {
118 /// let mut value = monitor.lock();
119 /// *value += 1;
120 /// }
121 ///
122 /// assert_eq!(monitor.read(|value| *value), 2);
123 /// ```
124 #[inline]
125 pub fn lock(&self) -> StdMonitorGuard<'_, T> {
126 self.inner.lock()
127 }
128
129 /// Acquires the monitor and reads the protected state.
130 ///
131 /// This delegates to [`StdMonitor::read`]. The closure runs while the monitor
132 /// mutex is held, so keep it short and avoid long blocking work.
133 ///
134 /// # Arguments
135 ///
136 /// * `f` - Closure that receives an immutable reference to the state.
137 ///
138 /// # Returns
139 ///
140 /// The value returned by `f`.
141 #[inline]
142 pub fn read<R, F>(&self, f: F) -> R
143 where
144 F: FnOnce(&T) -> R,
145 {
146 self.inner.read(f)
147 }
148
149 /// Acquires the monitor and mutates the protected state.
150 ///
151 /// This delegates to [`StdMonitor::write`]. Callers should explicitly invoke
152 /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
153 /// waiting thread may observe.
154 ///
155 /// # Arguments
156 ///
157 /// * `f` - Closure that receives a mutable reference to the state.
158 ///
159 /// # Returns
160 ///
161 /// The value returned by `f`.
162 #[inline]
163 pub fn write<R, F>(&self, f: F) -> R
164 where
165 F: FnOnce(&mut T) -> R,
166 {
167 self.inner.write(f)
168 }
169
170 /// Mutates the protected state and wakes one waiter.
171 ///
172 /// This delegates to [`StdMonitor::write_notify_one`]. The closure runs
173 /// while the monitor mutex is held; after it returns, the lock is released
174 /// and one waiter is notified. If `f` panics, the panic is propagated and no
175 /// notification is sent.
176 ///
177 /// # Arguments
178 ///
179 /// * `f` - Closure that receives a mutable reference to the state.
180 ///
181 /// # Returns
182 ///
183 /// The value returned by `f`.
184 #[inline]
185 pub fn write_notify_one<R, F>(&self, f: F) -> R
186 where
187 F: FnOnce(&mut T) -> R,
188 {
189 self.inner.write_notify_one(f)
190 }
191
192 /// Mutates the protected state and wakes all waiters.
193 ///
194 /// This delegates to [`StdMonitor::write_notify_all`]. The closure runs
195 /// while the monitor mutex is held; after it returns, the lock is released
196 /// and all waiters are notified. If `f` panics, the panic is propagated and
197 /// no notification is sent.
198 ///
199 /// # Arguments
200 ///
201 /// * `f` - Closure that receives a mutable reference to the state.
202 ///
203 /// # Returns
204 ///
205 /// The value returned by `f`.
206 #[inline]
207 pub fn write_notify_all<R, F>(&self, f: F) -> R
208 where
209 F: FnOnce(&mut T) -> R,
210 {
211 self.inner.write_notify_all(f)
212 }
213
214 /// Waits for a notification without checking state.
215 ///
216 /// This delegates to [`StdMonitor::wait`].
217 #[inline]
218 pub fn wait(&self) {
219 self.inner.wait();
220 }
221
222 /// Waits for a notification or timeout without checking state.
223 ///
224 /// This delegates to [`StdMonitor::wait_for`]. Most
225 /// coordination code should prefer [`Self::wait_while`],
226 /// [`Self::wait_until`], or an explicit [`StdMonitorGuard`] loop.
227 ///
228 /// Condition variables may wake spuriously, so
229 /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
230 /// state.
231 ///
232 /// # Arguments
233 ///
234 /// * `timeout` - Maximum duration to wait for a notification.
235 ///
236 /// # Returns
237 ///
238 /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
239 /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
240 ///
241 /// # Example
242 ///
243 /// ```rust
244 /// use std::time::Duration;
245 ///
246 /// use qubit_lock::{ArcStdMonitor, WaitTimeoutStatus};
247 ///
248 /// let monitor = ArcStdMonitor::new(false);
249 /// let status = monitor.wait_for(Duration::from_millis(1));
250 ///
251 /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
252 /// ```
253 #[inline]
254 pub fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
255 self.inner.wait_for(timeout)
256 }
257
258 /// Waits while a predicate remains true, then mutates the protected state.
259 ///
260 /// This delegates to [`StdMonitor::wait_while`]. The predicate is evaluated
261 /// while holding the monitor mutex, and the closure runs while the mutex is
262 /// still held after the predicate stops blocking.
263 ///
264 /// This method may block indefinitely if no thread changes the state so
265 /// that `waiting` becomes false and sends a notification.
266 ///
267 /// # Arguments
268 ///
269 /// * `waiting` - Predicate that returns `true` while the caller should
270 /// keep waiting.
271 /// * `f` - Closure that receives mutable access after waiting is no longer
272 /// required.
273 ///
274 /// # Returns
275 ///
276 /// The value returned by `f`.
277 ///
278 /// # Example
279 ///
280 /// ```rust
281 /// use std::thread;
282 ///
283 /// use qubit_lock::ArcStdMonitor;
284 ///
285 /// let monitor = ArcStdMonitor::new(Vec::<i32>::new());
286 /// let worker_monitor = monitor.clone();
287 ///
288 /// let worker = thread::spawn(move || {
289 /// worker_monitor.wait_while(
290 /// |items| items.is_empty(),
291 /// |items| items.pop().expect("item should be ready"),
292 /// )
293 /// });
294 ///
295 /// monitor.write(|items| items.push(7));
296 /// monitor.notify_one();
297 ///
298 /// assert_eq!(worker.join().expect("worker should finish"), 7);
299 /// ```
300 #[inline]
301 pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
302 where
303 P: FnMut(&T) -> bool,
304 F: FnOnce(&mut T) -> R,
305 {
306 self.inner.wait_while(waiting, f)
307 }
308
309 /// Waits until the protected state satisfies a predicate, then mutates it.
310 ///
311 /// This delegates to [`StdMonitor::wait_until`]. It may block indefinitely if
312 /// no thread changes the state to satisfy the predicate and sends a
313 /// notification.
314 ///
315 /// # Arguments
316 ///
317 /// * `ready` - Predicate that returns `true` when the state is ready.
318 /// * `f` - Closure that receives mutable access to the ready state.
319 ///
320 /// # Returns
321 ///
322 /// The value returned by `f`.
323 #[inline]
324 pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
325 where
326 P: FnMut(&T) -> bool,
327 F: FnOnce(&mut T) -> R,
328 {
329 self.inner.wait_until(ready, f)
330 }
331
332 /// Waits while a predicate remains true, with an overall time limit.
333 ///
334 /// This delegates to [`StdMonitor::wait_while_for`]. If `waiting` becomes
335 /// false before `timeout` expires, `f` runs while the monitor lock is still
336 /// held. If the timeout expires first, the closure is not called.
337 ///
338 /// # Arguments
339 ///
340 /// * `timeout` - Maximum total duration to wait.
341 /// * `waiting` - Predicate that returns `true` while the caller should
342 /// continue waiting.
343 /// * `f` - Closure that receives mutable access when waiting is no longer
344 /// required.
345 ///
346 /// # Returns
347 ///
348 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
349 /// predicate stops blocking before the timeout. Returns
350 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
351 ///
352 /// # Example
353 ///
354 /// ```rust
355 /// use std::time::Duration;
356 ///
357 /// use qubit_lock::{ArcStdMonitor, WaitTimeoutResult};
358 ///
359 /// let monitor = ArcStdMonitor::new(Vec::<i32>::new());
360 /// let result = monitor.wait_while_for(
361 /// Duration::from_millis(1),
362 /// |items| items.is_empty(),
363 /// |items| items.pop(),
364 /// );
365 ///
366 /// assert_eq!(result, WaitTimeoutResult::TimedOut);
367 /// ```
368 #[inline]
369 pub fn wait_while_for<R, P, F>(
370 &self,
371 timeout: Duration,
372 waiting: P,
373 f: F,
374 ) -> WaitTimeoutResult<R>
375 where
376 P: FnMut(&T) -> bool,
377 F: FnOnce(&mut T) -> R,
378 {
379 self.inner.wait_while_for(timeout, waiting, f)
380 }
381
382 /// Waits until a predicate becomes true, with an overall time limit.
383 ///
384 /// This delegates to [`StdMonitor::wait_until_for`]. If `ready` becomes
385 /// true before `timeout` expires, `f` runs while the monitor lock is still
386 /// held. If the timeout expires first, the closure is not called.
387 ///
388 /// # Arguments
389 ///
390 /// * `timeout` - Maximum total duration to wait.
391 /// * `ready` - Predicate that returns `true` when the caller may continue.
392 /// * `f` - Closure that receives mutable access to the ready state.
393 ///
394 /// # Returns
395 ///
396 /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
397 /// predicate becomes true before the timeout. Returns
398 /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
399 ///
400 /// # Example
401 ///
402 /// ```rust
403 /// use std::{
404 /// thread,
405 /// time::Duration,
406 /// };
407 ///
408 /// use qubit_lock::{ArcStdMonitor, WaitTimeoutResult};
409 ///
410 /// let monitor = ArcStdMonitor::new(false);
411 /// let worker_monitor = monitor.clone();
412 ///
413 /// let worker = thread::spawn(move || {
414 /// worker_monitor.wait_until_for(
415 /// Duration::from_secs(1),
416 /// |ready| *ready,
417 /// |ready| {
418 /// *ready = false;
419 /// 5
420 /// },
421 /// )
422 /// });
423 ///
424 /// monitor.write(|ready| *ready = true);
425 /// monitor.notify_one();
426 ///
427 /// assert_eq!(
428 /// worker.join().expect("worker should finish"),
429 /// WaitTimeoutResult::Ready(5),
430 /// );
431 /// ```
432 #[inline]
433 pub fn wait_until_for<R, P, F>(&self, timeout: Duration, ready: P, f: F) -> WaitTimeoutResult<R>
434 where
435 P: FnMut(&T) -> bool,
436 F: FnOnce(&mut T) -> R,
437 {
438 self.inner.wait_until_for(timeout, ready, f)
439 }
440
441 /// Wakes one thread waiting on this monitor's condition variable.
442 ///
443 /// Notifications do not carry state by themselves. A waiting thread only
444 /// proceeds safely after rechecking the protected state. Call this after
445 /// changing state that may make one waiter able to continue.
446 #[inline]
447 pub fn notify_one(&self) {
448 self.inner.notify_one();
449 }
450
451 /// Wakes all threads waiting on this monitor's condition variable.
452 ///
453 /// Notifications do not carry state by themselves. Every awakened thread
454 /// must recheck the protected state before continuing. Call this after a
455 /// state change that may allow multiple waiters to make progress.
456 #[inline]
457 pub fn notify_all(&self) {
458 self.inner.notify_all();
459 }
460}
461
462impl<T> AsRef<StdMonitor<T>> for ArcStdMonitor<T> {
463 /// Returns a reference to the underlying standard monitor.
464 ///
465 /// This is useful when callers need an explicit [`StdMonitor`] reference
466 /// while keeping the cloneable [`ArcStdMonitor`] handle.
467 #[inline]
468 fn as_ref(&self) -> &StdMonitor<T> {
469 self.inner.as_ref()
470 }
471}
472
473impl<T> Notifier for ArcStdMonitor<T> {
474 /// Wakes one thread waiting on this monitor.
475 #[inline]
476 fn notify_one(&self) {
477 Self::notify_one(self);
478 }
479
480 /// Wakes all threads waiting on this monitor.
481 #[inline]
482 fn notify_all(&self) {
483 Self::notify_all(self);
484 }
485}
486
487impl<T> NotificationWaiter for ArcStdMonitor<T> {
488 /// Blocks until a notification wakes this waiter.
489 #[inline]
490 fn wait(&self) {
491 Self::wait(self);
492 }
493}
494
495impl<T> TimeoutNotificationWaiter for ArcStdMonitor<T> {
496 /// Blocks until a notification wakes this waiter or the timeout expires.
497 #[inline]
498 fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
499 Self::wait_for(self, timeout)
500 }
501}
502
503impl<T> ConditionWaiter for ArcStdMonitor<T> {
504 type State = T;
505
506 /// Blocks until the predicate becomes true, then runs the action.
507 #[inline]
508 fn wait_until<R, P, F>(&self, predicate: P, action: F) -> R
509 where
510 P: FnMut(&Self::State) -> bool,
511 F: FnOnce(&mut Self::State) -> R,
512 {
513 Self::wait_until(self, predicate, action)
514 }
515
516 /// Blocks while the predicate remains true, then runs the action.
517 #[inline]
518 fn wait_while<R, P, F>(&self, predicate: P, action: F) -> R
519 where
520 P: FnMut(&Self::State) -> bool,
521 F: FnOnce(&mut Self::State) -> R,
522 {
523 Self::wait_while(self, predicate, action)
524 }
525}
526
527impl<T> TimeoutConditionWaiter for ArcStdMonitor<T> {
528 /// Blocks until the predicate becomes true or the timeout expires.
529 #[inline]
530 fn wait_until_for<R, P, F>(
531 &self,
532 timeout: Duration,
533 predicate: P,
534 action: F,
535 ) -> WaitTimeoutResult<R>
536 where
537 P: FnMut(&Self::State) -> bool,
538 F: FnOnce(&mut Self::State) -> R,
539 {
540 Self::wait_until_for(self, timeout, predicate, action)
541 }
542
543 /// Blocks while the predicate remains true or until the timeout expires.
544 #[inline]
545 fn wait_while_for<R, P, F>(
546 &self,
547 timeout: Duration,
548 predicate: P,
549 action: F,
550 ) -> WaitTimeoutResult<R>
551 where
552 P: FnMut(&Self::State) -> bool,
553 F: FnOnce(&mut Self::State) -> R,
554 {
555 Self::wait_while_for(self, timeout, predicate, action)
556 }
557}
558
559impl<T> Deref for ArcStdMonitor<T> {
560 type Target = StdMonitor<T>;
561
562 /// Dereferences this wrapper to the underlying standard monitor.
563 ///
564 /// Method-call dereferencing lets callers use native [`StdMonitor`] APIs
565 /// directly, while this wrapper still provides cloneable ownership.
566 #[inline]
567 fn deref(&self) -> &Self::Target {
568 self.inner.as_ref()
569 }
570}
571
572impl<T> From<T> for ArcStdMonitor<T> {
573 /// Creates an Arc-wrapped standard monitor from an initial state value.
574 ///
575 /// # Arguments
576 ///
577 /// * `value` - Initial state protected by the monitor.
578 ///
579 /// # Returns
580 ///
581 /// A cloneable standard monitor handle protecting `value`.
582 #[inline]
583 fn from(value: T) -> Self {
584 Self::new(value)
585 }
586}
587
588impl<T: Default> Default for ArcStdMonitor<T> {
589 /// Creates an Arc-wrapped monitor containing `T::default()`.
590 ///
591 /// # Returns
592 ///
593 /// A cloneable monitor handle protecting the default value for `T`.
594 #[inline]
595 fn default() -> Self {
596 Self::new(T::default())
597 }
598}
599
600impl<T> Clone for ArcStdMonitor<T> {
601 /// Clones this monitor handle.
602 ///
603 /// The cloned handle shares the same protected state and condition
604 /// variable with the original.
605 ///
606 /// # Returns
607 ///
608 /// A new handle sharing the same monitor state.
609 #[inline]
610 fn clone(&self) -> Self {
611 Self {
612 inner: self.inner.clone(),
613 }
614 }
615}