qubit-lock 0.3.2

Lock utilities library providing synchronous, asynchronous, and monitor-based locking primitives
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
//! # Arc Monitor
//!
//! Provides an Arc-wrapped synchronous monitor for condition-based state
//! coordination across threads.
//!

use std::sync::Arc;
use std::time::Duration;

use super::{
    Monitor,
    MonitorGuard,
    WaitTimeoutResult,
    WaitTimeoutStatus,
};

/// Cloneable handle to a [`Monitor`] that is shared through an [`Arc`].
///
/// `ArcMonitor` keeps a [`Monitor`] behind an [`Arc`], so callers get a
/// handle they can clone directly instead of writing
/// `Arc::new(Monitor::new(...))` themselves. Every clone refers to the same
/// protected state and the same condition variable. Guard-based waiting,
/// predicate-based waiting, and poison recovery keep the semantics of
/// [`Monitor`].
///
/// # Type Parameters
///
/// * `T` - The state protected by this monitor.
///
/// # Example
///
/// ```rust
/// use std::thread;
///
/// use qubit_lock::lock::ArcMonitor;
///
/// let monitor = ArcMonitor::new(false);
/// // The clone shares the same state as `monitor`.
/// let waiter_monitor = monitor.clone();
///
/// let waiter = thread::spawn(move || {
///     // Block until the flag is set, then reset it while still locked.
///     waiter_monitor.wait_until(
///         |ready| *ready,
///         |ready| {
///             *ready = false;
///         },
///     );
/// });
///
/// // Change the state first, then notify the waiter explicitly.
/// monitor.write(|ready| {
///     *ready = true;
/// });
/// monitor.notify_all();
///
/// waiter.join().expect("waiter should finish");
/// assert!(!monitor.read(|ready| *ready));
/// ```
pub struct ArcMonitor<T> {
    /// The monitor instance shared by every clone of this handle.
    inner: Arc<Monitor<T>>,
}

impl<T> ArcMonitor<T> {
    /// Creates an Arc-wrapped monitor protecting the supplied state value.
    ///
    /// # Arguments
    ///
    /// * `state` - Initial state protected by the monitor.
    ///
    /// # Returns
    ///
    /// A cloneable monitor handle initialized with the supplied state.
    #[inline]
    pub fn new(state: T) -> Self {
        Self {
            inner: Arc::new(Monitor::new(state)),
        }
    }

    /// Acquires the shared monitor and returns a guard.
    ///
    /// This delegates to [`Monitor::lock`]. The returned [`MonitorGuard`]
    /// keeps the monitor mutex locked until it is dropped. It can also wait on
    /// the monitor's condition variable through [`MonitorGuard::wait`] or
    /// [`MonitorGuard::wait_timeout`].
    ///
    /// If the underlying mutex is poisoned, this method recovers the inner
    /// state and still returns a guard.
    ///
    /// # Returns
    ///
    /// A guard that provides read and write access to the protected state.
    ///
    /// # Example
    ///
    /// ```rust
    /// use qubit_lock::lock::ArcMonitor;
    ///
    /// let monitor = ArcMonitor::new(1);
    /// {
    ///     let mut value = monitor.lock();
    ///     *value += 1;
    /// }
    ///
    /// assert_eq!(monitor.read(|value| *value), 2);
    /// ```
    #[inline]
    pub fn lock(&self) -> MonitorGuard<'_, T> {
        self.inner.lock()
    }

    /// Acquires the monitor and reads the protected state.
    ///
    /// This delegates to [`Monitor::read`]. The closure runs while the monitor
    /// mutex is held, so keep it short and avoid long blocking work.
    ///
    /// # Arguments
    ///
    /// * `f` - Closure that receives an immutable reference to the state.
    ///
    /// # Returns
    ///
    /// The value returned by `f`.
    #[inline]
    pub fn read<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&T) -> R,
    {
        self.inner.read(f)
    }

    /// Acquires the monitor and mutates the protected state.
    ///
    /// This delegates to [`Monitor::write`]. Callers should explicitly invoke
    /// [`Self::notify_one`] or [`Self::notify_all`] after changing state that a
    /// waiting thread may observe.
    ///
    /// # Arguments
    ///
    /// * `f` - Closure that receives a mutable reference to the state.
    ///
    /// # Returns
    ///
    /// The value returned by `f`.
    #[inline]
    pub fn write<R, F>(&self, f: F) -> R
    where
        F: FnOnce(&mut T) -> R,
    {
        self.inner.write(f)
    }

    /// Waits for a notification or timeout without checking state.
    ///
    /// This delegates to [`Monitor::wait_notify`]. Most
    /// coordination code should prefer [`Self::wait_while`],
    /// [`Self::wait_until`], or an explicit [`MonitorGuard`] loop.
    ///
    /// Condition variables may wake spuriously, so
    /// [`WaitTimeoutStatus::Woken`] does not prove that a notifier changed the
    /// state.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum duration to wait for a notification.
    ///
    /// # Returns
    ///
    /// [`WaitTimeoutStatus::Woken`] if the wait returned before the timeout,
    /// or [`WaitTimeoutStatus::TimedOut`] if the timeout elapsed.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutStatus};
    ///
    /// let monitor = ArcMonitor::new(false);
    /// let status = monitor.wait_notify(Duration::from_millis(1));
    ///
    /// assert_eq!(status, WaitTimeoutStatus::TimedOut);
    /// ```
    #[inline]
    pub fn wait_notify(&self, timeout: Duration) -> WaitTimeoutStatus {
        self.inner.wait_notify(timeout)
    }

    /// Waits while a predicate remains true, then mutates the protected state.
    ///
    /// This delegates to [`Monitor::wait_while`]. The predicate is evaluated
    /// while holding the monitor mutex, and the closure runs while the mutex is
    /// still held after the predicate stops blocking.
    ///
    /// This method may block indefinitely if no thread changes the state so
    /// that `waiting` becomes false and sends a notification.
    ///
    /// # Arguments
    ///
    /// * `waiting` - Predicate that returns `true` while the caller should
    ///   keep waiting.
    /// * `f` - Closure that receives mutable access after waiting is no longer
    ///   required.
    ///
    /// # Returns
    ///
    /// The value returned by `f`.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::thread;
    ///
    /// use qubit_lock::lock::ArcMonitor;
    ///
    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
    /// let worker_monitor = monitor.clone();
    ///
    /// let worker = thread::spawn(move || {
    ///     worker_monitor.wait_while(
    ///         |items| items.is_empty(),
    ///         |items| items.pop().expect("item should be ready"),
    ///     )
    /// });
    ///
    /// monitor.write(|items| items.push(7));
    /// monitor.notify_one();
    ///
    /// assert_eq!(worker.join().expect("worker should finish"), 7);
    /// ```
    #[inline]
    pub fn wait_while<R, P, F>(&self, waiting: P, f: F) -> R
    where
        P: FnMut(&T) -> bool,
        F: FnOnce(&mut T) -> R,
    {
        self.inner.wait_while(waiting, f)
    }

    /// Waits until the protected state satisfies a predicate, then mutates it.
    ///
    /// This delegates to [`Monitor::wait_until`]. It may block indefinitely if
    /// no thread changes the state to satisfy the predicate and sends a
    /// notification.
    ///
    /// # Arguments
    ///
    /// * `ready` - Predicate that returns `true` when the state is ready.
    /// * `f` - Closure that receives mutable access to the ready state.
    ///
    /// # Returns
    ///
    /// The value returned by `f`.
    #[inline]
    pub fn wait_until<R, P, F>(&self, ready: P, f: F) -> R
    where
        P: FnMut(&T) -> bool,
        F: FnOnce(&mut T) -> R,
    {
        self.inner.wait_until(ready, f)
    }

    /// Waits while a predicate remains true, with an overall time limit.
    ///
    /// This delegates to [`Monitor::wait_timeout_while`]. If `waiting` becomes
    /// false before `timeout` expires, `f` runs while the monitor lock is still
    /// held. If the timeout expires first, the closure is not called.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum total duration to wait.
    /// * `waiting` - Predicate that returns `true` while the caller should
    ///   continue waiting.
    /// * `f` - Closure that receives mutable access when waiting is no longer
    ///   required.
    ///
    /// # Returns
    ///
    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
    /// predicate stops blocking before the timeout. Returns
    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
    ///
    /// let monitor = ArcMonitor::new(Vec::<i32>::new());
    /// let result = monitor.wait_timeout_while(
    ///     Duration::from_millis(1),
    ///     |items| items.is_empty(),
    ///     |items| items.pop(),
    /// );
    ///
    /// assert_eq!(result, WaitTimeoutResult::TimedOut);
    /// ```
    #[inline]
    pub fn wait_timeout_while<R, P, F>(
        &self,
        timeout: Duration,
        waiting: P,
        f: F,
    ) -> WaitTimeoutResult<R>
    where
        P: FnMut(&T) -> bool,
        F: FnOnce(&mut T) -> R,
    {
        self.inner.wait_timeout_while(timeout, waiting, f)
    }

    /// Waits until a predicate becomes true, with an overall time limit.
    ///
    /// This delegates to [`Monitor::wait_timeout_until`]. If `ready` becomes
    /// true before `timeout` expires, `f` runs while the monitor lock is still
    /// held. If the timeout expires first, the closure is not called.
    ///
    /// # Arguments
    ///
    /// * `timeout` - Maximum total duration to wait.
    /// * `ready` - Predicate that returns `true` when the caller may continue.
    /// * `f` - Closure that receives mutable access to the ready state.
    ///
    /// # Returns
    ///
    /// [`WaitTimeoutResult::Ready`] with the value returned by `f` when the
    /// predicate becomes true before the timeout. Returns
    /// [`WaitTimeoutResult::TimedOut`] when the timeout expires first.
    ///
    /// # Example
    ///
    /// ```rust
    /// use std::{
    ///     thread,
    ///     time::Duration,
    /// };
    ///
    /// use qubit_lock::lock::{ArcMonitor, WaitTimeoutResult};
    ///
    /// let monitor = ArcMonitor::new(false);
    /// let worker_monitor = monitor.clone();
    ///
    /// let worker = thread::spawn(move || {
    ///     worker_monitor.wait_timeout_until(
    ///         Duration::from_secs(1),
    ///         |ready| *ready,
    ///         |ready| {
    ///             *ready = false;
    ///             5
    ///         },
    ///     )
    /// });
    ///
    /// monitor.write(|ready| *ready = true);
    /// monitor.notify_one();
    ///
    /// assert_eq!(
    ///     worker.join().expect("worker should finish"),
    ///     WaitTimeoutResult::Ready(5),
    /// );
    /// ```
    #[inline]
    pub fn wait_timeout_until<R, P, F>(
        &self,
        timeout: Duration,
        ready: P,
        f: F,
    ) -> WaitTimeoutResult<R>
    where
        P: FnMut(&T) -> bool,
        F: FnOnce(&mut T) -> R,
    {
        self.inner.wait_timeout_until(timeout, ready, f)
    }

    /// Wakes one thread waiting on this monitor's condition variable.
    ///
    /// Notifications do not carry state by themselves. A waiting thread only
    /// proceeds safely after rechecking the protected state. Call this after
    /// changing state that may make one waiter able to continue.
    #[inline]
    pub fn notify_one(&self) {
        self.inner.notify_one();
    }

    /// Wakes all threads waiting on this monitor's condition variable.
    ///
    /// Notifications do not carry state by themselves. Every awakened thread
    /// must recheck the protected state before continuing. Call this after a
    /// state change that may allow multiple waiters to make progress.
    #[inline]
    pub fn notify_all(&self) {
        self.inner.notify_all();
    }
}

impl<T> Default for ArcMonitor<T>
where
    T: Default,
{
    /// Builds a monitor handle whose protected state is `T::default()`.
    ///
    /// # Returns
    ///
    /// A cloneable handle to a new monitor holding the default value of `T`.
    #[inline]
    fn default() -> Self {
        let initial_state = T::default();
        Self::new(initial_state)
    }
}

impl<T> Clone for ArcMonitor<T> {
    /// Clones this monitor handle.
    ///
    /// The cloned handle shares the same protected state and condition
    /// variable with the original.
    ///
    /// # Returns
    ///
    /// A new handle sharing the same monitor state.
    #[inline]
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
        }
    }
}