Skip to main content

qubit_lock/monitor/
tokio_monitor.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10//! Tokio-based asynchronous monitor.
11
12use std::time::{
13    Duration,
14    Instant,
15};
16
17use tokio::sync::{
18    Mutex,
19    Notify,
20};
21
22use super::{
23    AsyncConditionWaiter,
24    AsyncMonitorFuture,
25    AsyncNotificationWaiter,
26    AsyncTimeoutConditionWaiter,
27    AsyncTimeoutNotificationWaiter,
28    Notifier,
29    WaitTimeoutResult,
30    WaitTimeoutStatus,
31};
32
33/// Asynchronous monitor built on Tokio synchronization primitives.
34///
35/// `TokioMonitor` protects one state value with a Tokio mutex and coordinates
36/// waiters with a Tokio notification primitive. Notification semantics follow
37/// Tokio's [`Notify`] behavior.
38pub struct TokioMonitor<T> {
39    /// Protected monitor state.
40    state: Mutex<T>,
41    /// Notification primitive used to wake async waiters.
42    changed: Notify,
43}
44
45impl<T> TokioMonitor<T> {
46    /// Creates an asynchronous monitor protecting the supplied state.
47    ///
48    /// # Arguments
49    ///
50    /// * `state` - Initial protected state.
51    ///
52    /// # Returns
53    ///
54    /// A Tokio-based monitor.
55    pub fn new(state: T) -> Self {
56        Self {
57            state: Mutex::new(state),
58            changed: Notify::new(),
59        }
60    }
61
62    /// Acquires the monitor and reads the protected state.
63    ///
64    /// # Arguments
65    ///
66    /// * `f` - Closure that receives an immutable reference to the state.
67    ///
68    /// # Returns
69    ///
70    /// The value returned by the closure.
71    pub async fn async_read<R, F>(&self, f: F) -> R
72    where
73        F: FnOnce(&T) -> R,
74    {
75        let guard = self.state.lock().await;
76        f(&*guard)
77    }
78
79    /// Acquires the monitor and mutates the protected state.
80    ///
81    /// This does not notify waiters automatically.
82    ///
83    /// # Arguments
84    ///
85    /// * `f` - Closure that receives a mutable reference to the state.
86    ///
87    /// # Returns
88    ///
89    /// The value returned by the closure.
90    pub async fn async_write<R, F>(&self, f: F) -> R
91    where
92        F: FnOnce(&mut T) -> R,
93    {
94        let mut guard = self.state.lock().await;
95        f(&mut *guard)
96    }
97
98    /// Mutates the protected state and wakes one waiter.
99    ///
100    /// # Arguments
101    ///
102    /// * `f` - Closure that receives a mutable reference to the state.
103    ///
104    /// # Returns
105    ///
106    /// The value returned by the closure.
107    pub async fn async_write_notify_one<R, F>(&self, f: F) -> R
108    where
109        F: FnOnce(&mut T) -> R,
110    {
111        let result = self.async_write(f).await;
112        self.notify_one();
113        result
114    }
115
116    /// Mutates the protected state and wakes all waiters.
117    ///
118    /// # Arguments
119    ///
120    /// * `f` - Closure that receives a mutable reference to the state.
121    ///
122    /// # Returns
123    ///
124    /// The value returned by the closure.
125    pub async fn async_write_notify_all<R, F>(&self, f: F) -> R
126    where
127        F: FnOnce(&mut T) -> R,
128    {
129        let result = self.async_write(f).await;
130        self.notify_all();
131        result
132    }
133
134    /// Wakes one async waiter.
135    pub fn notify_one(&self) {
136        self.changed.notify_one();
137    }
138
139    /// Wakes all async waiters.
140    pub fn notify_all(&self) {
141        self.changed.notify_waiters();
142    }
143
144    /// Calculates remaining timeout budget from a call-time start instant.
145    ///
146    /// # Arguments
147    ///
148    /// * `start` - Instant captured when the public wait method was called.
149    /// * `timeout` - Total timeout budget.
150    ///
151    /// # Returns
152    ///
153    /// The remaining budget, or zero when the budget is exhausted.
154    fn remaining_timeout(start: Instant, timeout: Duration) -> Duration {
155        timeout.checked_sub(start.elapsed()).unwrap_or_default()
156    }
157}
158
159impl<T> Notifier for TokioMonitor<T> {
160    /// Wakes one async waiter.
161    fn notify_one(&self) {
162        Self::notify_one(self);
163    }
164
165    /// Wakes all async waiters.
166    fn notify_all(&self) {
167        Self::notify_all(self);
168    }
169}
170
171impl<T: Send> AsyncNotificationWaiter for TokioMonitor<T> {
172    /// Returns a future that resolves after a Tokio notification.
173    fn async_wait<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
174        Box::pin(self.changed.notified())
175    }
176}
177
178impl<T: Send> AsyncTimeoutNotificationWaiter for TokioMonitor<T> {
179    /// Returns a future that resolves after notification or timeout.
180    fn async_wait_for<'a>(
181        &'a self,
182        timeout: Duration,
183    ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
184        let start = Instant::now();
185        let notified = self.changed.notified();
186        Box::pin(async move {
187            let remaining = Self::remaining_timeout(start, timeout);
188            if remaining.is_zero() {
189                return WaitTimeoutStatus::TimedOut;
190            }
191            match tokio::time::timeout(remaining, notified).await {
192                Ok(()) => WaitTimeoutStatus::Woken,
193                Err(_) => WaitTimeoutStatus::TimedOut,
194            }
195        })
196    }
197}
198
199impl<T: Send> AsyncConditionWaiter for TokioMonitor<T> {
200    type State = T;
201
202    /// Returns a future that waits until the predicate becomes true.
203    fn async_wait_until<'a, R, P, F>(
204        &'a self,
205        mut predicate: P,
206        action: F,
207    ) -> AsyncMonitorFuture<'a, R>
208    where
209        R: Send + 'a,
210        P: FnMut(&Self::State) -> bool + Send + 'a,
211        F: FnOnce(&mut Self::State) -> R + Send + 'a,
212    {
213        self.async_wait_while(move |state| !predicate(state), action)
214    }
215
216    /// Returns a future that waits while the predicate remains true.
217    fn async_wait_while<'a, R, P, F>(
218        &'a self,
219        mut predicate: P,
220        action: F,
221    ) -> AsyncMonitorFuture<'a, R>
222    where
223        R: Send + 'a,
224        P: FnMut(&Self::State) -> bool + Send + 'a,
225        F: FnOnce(&mut Self::State) -> R + Send + 'a,
226    {
227        Box::pin(async move {
228            let mut guard = self.state.lock().await;
229            while predicate(&*guard) {
230                let notified = self.changed.notified();
231                drop(guard);
232                notified.await;
233                guard = self.state.lock().await;
234            }
235            action(&mut *guard)
236        })
237    }
238}
239
240impl<T: Send> AsyncTimeoutConditionWaiter for TokioMonitor<T> {
241    /// Returns a future that waits until the predicate becomes true or times out.
242    fn async_wait_until_for<'a, R, P, F>(
243        &'a self,
244        timeout: Duration,
245        mut predicate: P,
246        action: F,
247    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
248    where
249        R: Send + 'a,
250        P: FnMut(&Self::State) -> bool + Send + 'a,
251        F: FnOnce(&mut Self::State) -> R + Send + 'a,
252    {
253        self.async_wait_while_for(timeout, move |state| !predicate(state), action)
254    }
255
256    /// Returns a future that waits while the predicate remains true or times out.
257    fn async_wait_while_for<'a, R, P, F>(
258        &'a self,
259        timeout: Duration,
260        mut predicate: P,
261        action: F,
262    ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
263    where
264        R: Send + 'a,
265        P: FnMut(&Self::State) -> bool + Send + 'a,
266        F: FnOnce(&mut Self::State) -> R + Send + 'a,
267    {
268        let start = Instant::now();
269        Box::pin(async move {
270            let mut guard = self.state.lock().await;
271            loop {
272                if !predicate(&*guard) {
273                    return WaitTimeoutResult::Ready(action(&mut *guard));
274                }
275
276                let remaining = Self::remaining_timeout(start, timeout);
277                if remaining.is_zero() {
278                    return WaitTimeoutResult::TimedOut;
279                }
280
281                let notified = self.changed.notified();
282                drop(guard);
283                if tokio::time::timeout(remaining, notified).await.is_err() {
284                    guard = self.state.lock().await;
285                    if !predicate(&*guard) {
286                        return WaitTimeoutResult::Ready(action(&mut *guard));
287                    }
288                    return WaitTimeoutResult::TimedOut;
289                }
290                guard = self.state.lock().await;
291            }
292        })
293    }
294}
295
296impl<T> From<T> for TokioMonitor<T> {
297    /// Creates a Tokio monitor from an initial state value.
298    fn from(value: T) -> Self {
299        Self::new(value)
300    }
301}
302
303impl<T: Default> Default for TokioMonitor<T> {
304    /// Creates a Tokio monitor containing `T::default()`.
305    fn default() -> Self {
306        Self::new(T::default())
307    }
308}