qubit_lock/monitor/
tokio_monitor.rs1use std::time::{
13 Duration,
14 Instant,
15};
16
17use tokio::sync::{
18 Mutex,
19 Notify,
20};
21
22use super::{
23 AsyncConditionWaiter,
24 AsyncMonitorFuture,
25 AsyncNotificationWaiter,
26 AsyncTimeoutConditionWaiter,
27 AsyncTimeoutNotificationWaiter,
28 Notifier,
29 WaitTimeoutResult,
30 WaitTimeoutStatus,
31};
32
33pub struct TokioMonitor<T> {
39 state: Mutex<T>,
41 changed: Notify,
43}
44
45impl<T> TokioMonitor<T> {
46 pub fn new(state: T) -> Self {
56 Self {
57 state: Mutex::new(state),
58 changed: Notify::new(),
59 }
60 }
61
62 pub async fn async_read<R, F>(&self, f: F) -> R
72 where
73 F: FnOnce(&T) -> R,
74 {
75 let guard = self.state.lock().await;
76 f(&*guard)
77 }
78
79 pub async fn async_write<R, F>(&self, f: F) -> R
91 where
92 F: FnOnce(&mut T) -> R,
93 {
94 let mut guard = self.state.lock().await;
95 f(&mut *guard)
96 }
97
98 pub async fn async_write_notify_one<R, F>(&self, f: F) -> R
108 where
109 F: FnOnce(&mut T) -> R,
110 {
111 let result = self.async_write(f).await;
112 self.notify_one();
113 result
114 }
115
116 pub async fn async_write_notify_all<R, F>(&self, f: F) -> R
126 where
127 F: FnOnce(&mut T) -> R,
128 {
129 let result = self.async_write(f).await;
130 self.notify_all();
131 result
132 }
133
134 pub fn notify_one(&self) {
136 self.changed.notify_one();
137 }
138
139 pub fn notify_all(&self) {
141 self.changed.notify_waiters();
142 }
143
144 fn remaining_timeout(start: Instant, timeout: Duration) -> Duration {
155 timeout.checked_sub(start.elapsed()).unwrap_or_default()
156 }
157}
158
159impl<T> Notifier for TokioMonitor<T> {
160 fn notify_one(&self) {
162 Self::notify_one(self);
163 }
164
165 fn notify_all(&self) {
167 Self::notify_all(self);
168 }
169}
170
171impl<T: Send> AsyncNotificationWaiter for TokioMonitor<T> {
172 fn async_wait<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
174 Box::pin(self.changed.notified())
175 }
176}
177
178impl<T: Send> AsyncTimeoutNotificationWaiter for TokioMonitor<T> {
179 fn async_wait_for<'a>(
181 &'a self,
182 timeout: Duration,
183 ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
184 let start = Instant::now();
185 let notified = self.changed.notified();
186 Box::pin(async move {
187 let remaining = Self::remaining_timeout(start, timeout);
188 if remaining.is_zero() {
189 return WaitTimeoutStatus::TimedOut;
190 }
191 match tokio::time::timeout(remaining, notified).await {
192 Ok(()) => WaitTimeoutStatus::Woken,
193 Err(_) => WaitTimeoutStatus::TimedOut,
194 }
195 })
196 }
197}
198
199impl<T: Send> AsyncConditionWaiter for TokioMonitor<T> {
200 type State = T;
201
202 fn async_wait_until<'a, R, P, F>(
204 &'a self,
205 mut predicate: P,
206 action: F,
207 ) -> AsyncMonitorFuture<'a, R>
208 where
209 R: Send + 'a,
210 P: FnMut(&Self::State) -> bool + Send + 'a,
211 F: FnOnce(&mut Self::State) -> R + Send + 'a,
212 {
213 self.async_wait_while(move |state| !predicate(state), action)
214 }
215
216 fn async_wait_while<'a, R, P, F>(
218 &'a self,
219 mut predicate: P,
220 action: F,
221 ) -> AsyncMonitorFuture<'a, R>
222 where
223 R: Send + 'a,
224 P: FnMut(&Self::State) -> bool + Send + 'a,
225 F: FnOnce(&mut Self::State) -> R + Send + 'a,
226 {
227 Box::pin(async move {
228 let mut guard = self.state.lock().await;
229 while predicate(&*guard) {
230 let notified = self.changed.notified();
231 drop(guard);
232 notified.await;
233 guard = self.state.lock().await;
234 }
235 action(&mut *guard)
236 })
237 }
238}
239
240impl<T: Send> AsyncTimeoutConditionWaiter for TokioMonitor<T> {
241 fn async_wait_until_for<'a, R, P, F>(
243 &'a self,
244 timeout: Duration,
245 mut predicate: P,
246 action: F,
247 ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
248 where
249 R: Send + 'a,
250 P: FnMut(&Self::State) -> bool + Send + 'a,
251 F: FnOnce(&mut Self::State) -> R + Send + 'a,
252 {
253 self.async_wait_while_for(timeout, move |state| !predicate(state), action)
254 }
255
256 fn async_wait_while_for<'a, R, P, F>(
258 &'a self,
259 timeout: Duration,
260 mut predicate: P,
261 action: F,
262 ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
263 where
264 R: Send + 'a,
265 P: FnMut(&Self::State) -> bool + Send + 'a,
266 F: FnOnce(&mut Self::State) -> R + Send + 'a,
267 {
268 let start = Instant::now();
269 Box::pin(async move {
270 let mut guard = self.state.lock().await;
271 loop {
272 if !predicate(&*guard) {
273 return WaitTimeoutResult::Ready(action(&mut *guard));
274 }
275
276 let remaining = Self::remaining_timeout(start, timeout);
277 if remaining.is_zero() {
278 return WaitTimeoutResult::TimedOut;
279 }
280
281 let notified = self.changed.notified();
282 drop(guard);
283 if tokio::time::timeout(remaining, notified).await.is_err() {
284 guard = self.state.lock().await;
285 if !predicate(&*guard) {
286 return WaitTimeoutResult::Ready(action(&mut *guard));
287 }
288 return WaitTimeoutResult::TimedOut;
289 }
290 guard = self.state.lock().await;
291 }
292 })
293 }
294}
295
296impl<T> From<T> for TokioMonitor<T> {
297 fn from(value: T) -> Self {
299 Self::new(value)
300 }
301}
302
303impl<T: Default> Default for TokioMonitor<T> {
304 fn default() -> Self {
306 Self::new(T::default())
307 }
308}