gemachain_runtime/
waitable_condvar.rs1use std::{
2 sync::{Condvar, Mutex},
3 time::Duration,
4};
5
6#[derive(Default, Debug)]
9pub struct WaitableCondvar {
10 pub mutex: Mutex<u8>,
11 pub event: Condvar,
12}
13
14impl WaitableCondvar {
15 pub fn notify_all(&self) {
16 self.event.notify_all();
17 }
18 pub fn notify_one(&self) {
19 self.event.notify_one();
20 }
21 pub fn wait_timeout(&self, timeout: Duration) -> bool {
22 let lock = self.mutex.lock().unwrap();
23 let res = self.event.wait_timeout(lock, timeout).unwrap();
24 if res.1.timed_out() {
25 return true;
26 }
27 false
28 }
29}
30
31#[cfg(test)]
32pub mod tests {
33 use super::*;
34 use std::{
35 sync::{
36 atomic::{AtomicBool, Ordering},
37 Arc,
38 },
39 thread::Builder,
40 };
41 #[ignore]
42 #[test]
43 fn test_waitable_condvar() {
44 let data = Arc::new(AtomicBool::new(false));
45 let data_ = data.clone();
46 let cv = Arc::new(WaitableCondvar::default());
47 let cv2 = Arc::new(WaitableCondvar::default());
48 let cv_ = cv.clone();
49 let cv2_ = cv2.clone();
50 let cv2__ = cv2.clone();
51 let passes = 3;
53 let handle = Builder::new().spawn(move || {
54 for _pass in 0..passes {
55 let mut notified = false;
56 while cv2_.wait_timeout(Duration::from_millis(1)) {
57 if !notified && data_.load(Ordering::Relaxed) {
58 notified = true;
59 cv_.notify_all();
60 }
61 }
62 assert!(data_.swap(false, Ordering::Relaxed));
63 }
64 });
65 let handle2 = Builder::new().spawn(move || {
67 for _pass in 0..(passes - 1) {
68 assert!(!cv2__.wait_timeout(Duration::from_millis(10000))); }
70 });
71 for _pass in 0..passes {
72 assert!(cv.wait_timeout(Duration::from_millis(1)));
73 assert!(!data.swap(true, Ordering::Relaxed));
74 assert!(!cv.wait_timeout(Duration::from_millis(10000))); cv2.notify_all();
76 }
77 assert!(handle.unwrap().join().is_ok());
78 assert!(handle2.unwrap().join().is_ok());
79 }
80}