1use std::sync::{
13 Condvar,
14 Mutex,
15 MutexGuard,
16};
17use std::time::Duration;
18
19#[cfg(feature = "async")]
20use tokio::sync::{
21 Notify,
22 watch,
23};
24
25#[cfg(feature = "async")]
26use super::{
27 AsyncConditionWaiter,
28 AsyncMonitorFuture,
29 AsyncNotificationWaiter,
30 AsyncTimeoutConditionWaiter,
31 AsyncTimeoutNotificationWaiter,
32};
33use super::{
34 ConditionWaiter,
35 NotificationWaiter,
36 Notifier,
37 TimeoutConditionWaiter,
38 TimeoutNotificationWaiter,
39 WaitTimeoutResult,
40 WaitTimeoutStatus,
41};
42
43pub struct MockMonitor<T> {
49 state: Mutex<MockMonitorState<T>>,
51 changed: Condvar,
53 #[cfg(feature = "async")]
55 async_notification: Notify,
56 #[cfg(feature = "async")]
58 async_change_sender: watch::Sender<u64>,
59}
60
61struct MockMonitorState<T> {
63 value: T,
65 elapsed: Duration,
67 notification_epoch: u64,
69 change_epoch: u64,
71}
72
73impl<T> MockMonitor<T> {
74 pub fn new(state: T) -> Self {
84 #[cfg(feature = "async")]
85 let (async_change_sender, _) = watch::channel(0);
86 Self {
87 state: Mutex::new(MockMonitorState {
88 value: state,
89 elapsed: Duration::ZERO,
90 notification_epoch: 0,
91 change_epoch: 0,
92 }),
93 changed: Condvar::new(),
94 #[cfg(feature = "async")]
95 async_notification: Notify::new(),
96 #[cfg(feature = "async")]
97 async_change_sender,
98 }
99 }
100
101 pub fn elapsed(&self) -> Duration {
107 self.lock_state().elapsed
108 }
109
110 pub fn set_elapsed(&self, elapsed: Duration) {
118 let change_epoch = {
119 let mut state = self.lock_state();
120 state.elapsed = elapsed;
121 Self::advance_change_epoch(&mut state)
122 };
123 self.changed.notify_all();
124 self.notify_async_change(change_epoch);
125 }
126
127 pub fn advance(&self, duration: Duration) {
133 let change_epoch = {
134 let mut state = self.lock_state();
135 state.elapsed = state.elapsed.saturating_add(duration);
136 Self::advance_change_epoch(&mut state)
137 };
138 self.changed.notify_all();
139 self.notify_async_change(change_epoch);
140 }
141
142 pub fn reset_elapsed(&self) {
144 self.set_elapsed(Duration::ZERO);
145 }
146
147 pub fn read<R, F>(&self, f: F) -> R
157 where
158 F: FnOnce(&T) -> R,
159 {
160 let state = self.lock_state();
161 f(&state.value)
162 }
163
164 pub fn write<R, F>(&self, f: F) -> R
176 where
177 F: FnOnce(&mut T) -> R,
178 {
179 let mut state = self.lock_state();
180 f(&mut state.value)
181 }
182
183 pub fn write_notify_one<R, F>(&self, f: F) -> R
193 where
194 F: FnOnce(&mut T) -> R,
195 {
196 let result = self.write(f);
197 self.notify_one();
198 result
199 }
200
201 pub fn write_notify_all<R, F>(&self, f: F) -> R
211 where
212 F: FnOnce(&mut T) -> R,
213 {
214 let result = self.write(f);
215 self.notify_all();
216 result
217 }
218
219 pub fn notify_one(&self) {
221 let change_epoch = self.advance_notification_epoch();
222 self.changed.notify_one();
223 #[cfg(feature = "async")]
224 self.async_notification.notify_one();
225 self.notify_async_change(change_epoch);
226 }
227
228 pub fn notify_all(&self) {
230 let change_epoch = self.advance_notification_epoch();
231 self.changed.notify_all();
232 #[cfg(feature = "async")]
233 self.async_notification.notify_waiters();
234 self.notify_async_change(change_epoch);
235 }
236
237 fn lock_state(&self) -> MutexGuard<'_, MockMonitorState<T>> {
243 self.state
244 .lock()
245 .unwrap_or_else(std::sync::PoisonError::into_inner)
246 }
247
248 fn advance_change_epoch(state: &mut MockMonitorState<T>) -> u64 {
258 state.change_epoch = state.change_epoch.wrapping_add(1);
259 state.change_epoch
260 }
261
262 fn advance_notification_epoch(&self) -> u64 {
268 let mut state = self.lock_state();
269 state.notification_epoch = state.notification_epoch.wrapping_add(1);
270 Self::advance_change_epoch(&mut state)
271 }
272
273 #[cfg(feature = "async")]
279 fn notify_async_change(&self, change_epoch: u64) {
280 let _ = self.async_change_sender.send(change_epoch);
281 }
282
283 #[cfg(not(feature = "async"))]
285 fn notify_async_change(&self, _change_epoch: u64) {}
286}
287
288impl<T> Notifier for MockMonitor<T> {
289 fn notify_one(&self) {
291 Self::notify_one(self);
292 }
293
294 fn notify_all(&self) {
296 Self::notify_all(self);
297 }
298}
299
300impl<T> NotificationWaiter for MockMonitor<T> {
301 fn wait(&self) {
303 let mut state = self.lock_state();
304 let observed_epoch = state.notification_epoch;
305 while state.notification_epoch == observed_epoch {
306 state = self
307 .changed
308 .wait(state)
309 .unwrap_or_else(std::sync::PoisonError::into_inner);
310 }
311 }
312}
313
314impl<T> TimeoutNotificationWaiter for MockMonitor<T> {
315 fn wait_for(&self, timeout: Duration) -> WaitTimeoutStatus {
317 let mut state = self.lock_state();
318 let observed_epoch = state.notification_epoch;
319 let target_elapsed = state.elapsed.saturating_add(timeout);
320 loop {
321 if state.notification_epoch != observed_epoch {
322 return WaitTimeoutStatus::Woken;
323 }
324 if state.elapsed >= target_elapsed {
325 return WaitTimeoutStatus::TimedOut;
326 }
327 state = self
328 .changed
329 .wait(state)
330 .unwrap_or_else(std::sync::PoisonError::into_inner);
331 }
332 }
333}
334
335impl<T> ConditionWaiter for MockMonitor<T> {
336 type State = T;
337
338 fn wait_while<R, P, F>(&self, mut predicate: P, action: F) -> R
340 where
341 P: FnMut(&Self::State) -> bool,
342 F: FnOnce(&mut Self::State) -> R,
343 {
344 let mut state = self.lock_state();
345 while predicate(&state.value) {
346 state = self
347 .changed
348 .wait(state)
349 .unwrap_or_else(std::sync::PoisonError::into_inner);
350 }
351 action(&mut state.value)
352 }
353}
354
355impl<T> TimeoutConditionWaiter for MockMonitor<T> {
356 fn wait_while_for<R, P, F>(
358 &self,
359 timeout: Duration,
360 mut predicate: P,
361 action: F,
362 ) -> WaitTimeoutResult<R>
363 where
364 P: FnMut(&Self::State) -> bool,
365 F: FnOnce(&mut Self::State) -> R,
366 {
367 let mut state = self.lock_state();
368 let target_elapsed = state.elapsed.saturating_add(timeout);
369 loop {
370 if !predicate(&state.value) {
371 return WaitTimeoutResult::Ready(action(&mut state.value));
372 }
373 if state.elapsed >= target_elapsed {
374 return WaitTimeoutResult::TimedOut;
375 }
376 state = self
377 .changed
378 .wait(state)
379 .unwrap_or_else(std::sync::PoisonError::into_inner);
380 }
381 }
382}
383
384#[cfg(feature = "async")]
385impl<T: Send> AsyncNotificationWaiter for MockMonitor<T> {
386 fn wait_async<'a>(&'a self) -> AsyncMonitorFuture<'a, ()> {
388 let notified = self.async_notification.notified();
389 Box::pin(notified)
390 }
391}
392
393#[cfg(feature = "async")]
394impl<T: Send> AsyncTimeoutNotificationWaiter for MockMonitor<T> {
395 fn wait_for_async<'a>(
397 &'a self,
398 timeout: Duration,
399 ) -> AsyncMonitorFuture<'a, WaitTimeoutStatus> {
400 let mut change_receiver = self.async_change_sender.subscribe();
401 let (observed_epoch, target_elapsed) = {
402 let state = self.lock_state();
403 (
404 state.notification_epoch,
405 state.elapsed.saturating_add(timeout),
406 )
407 };
408 Box::pin(async move {
409 loop {
410 {
411 let state = self.lock_state();
412 if state.notification_epoch != observed_epoch {
413 return WaitTimeoutStatus::Woken;
414 }
415 if state.elapsed >= target_elapsed {
416 return WaitTimeoutStatus::TimedOut;
417 }
418 }
419 change_receiver
420 .changed()
421 .await
422 .expect("mock monitor sender should live while the monitor is borrowed");
423 }
424 })
425 }
426}
427
428#[cfg(feature = "async")]
429impl<T: Send> AsyncConditionWaiter for MockMonitor<T> {
430 type State = T;
431
432 fn wait_while_async<'a, R, P, F>(
434 &'a self,
435 mut predicate: P,
436 action: F,
437 ) -> AsyncMonitorFuture<'a, R>
438 where
439 R: Send + 'a,
440 P: FnMut(&Self::State) -> bool + Send + 'a,
441 F: FnOnce(&mut Self::State) -> R + Send + 'a,
442 {
443 Box::pin(async move {
444 loop {
445 let notified = {
446 let mut state = self.lock_state();
447 if !predicate(&state.value) {
448 return action(&mut state.value);
449 }
450 self.async_notification.notified()
451 };
452 notified.await;
453 }
454 })
455 }
456}
457
458#[cfg(feature = "async")]
459impl<T: Send> AsyncTimeoutConditionWaiter for MockMonitor<T> {
460 fn wait_while_for_async<'a, R, P, F>(
462 &'a self,
463 timeout: Duration,
464 mut predicate: P,
465 action: F,
466 ) -> AsyncMonitorFuture<'a, WaitTimeoutResult<R>>
467 where
468 R: Send + 'a,
469 P: FnMut(&Self::State) -> bool + Send + 'a,
470 F: FnOnce(&mut Self::State) -> R + Send + 'a,
471 {
472 let target_elapsed = self.elapsed().saturating_add(timeout);
473 let mut change_receiver = self.async_change_sender.subscribe();
474 Box::pin(async move {
475 loop {
476 {
477 let mut state = self.lock_state();
478 if !predicate(&state.value) {
479 return WaitTimeoutResult::Ready(action(&mut state.value));
480 }
481 if state.elapsed >= target_elapsed {
482 return WaitTimeoutResult::TimedOut;
483 }
484 }
485 change_receiver
486 .changed()
487 .await
488 .expect("mock monitor sender should live while the monitor is borrowed");
489 }
490 })
491 }
492}
493
494impl<T> From<T> for MockMonitor<T> {
495 fn from(value: T) -> Self {
497 Self::new(value)
498 }
499}
500
501impl<T: Default> Default for MockMonitor<T> {
502 fn default() -> Self {
504 Self::new(T::default())
505 }
506}