use std::cmp::Ordering;
use std::sync::{Arc, Barrier};
use std::sync::atomic::AtomicBool;
use std::time::Duration;
use crate::monitor::{Directive, Monitor, SpeculativeMonitor};
use crate::{test_utils, wait};
use crate::test_utils::{LONG_WAIT, SHORT_WAIT};
use crate::wait::{Wait, WaitResult};
/// A closure that answers `Directive::Return` is run exactly once per `enter` call,
/// and a mutation made inside one `enter` is visible to the next one.
#[test]
fn return_immediately() {
    let monitor = SpeculativeMonitor::new(0);

    // First entry: observe the initial value and overwrite it.
    let mut first_calls = 0;
    monitor.enter(|value| {
        assert_eq!(0, *value);
        *value = 42;
        first_calls += 1;
        Directive::Return
    });
    assert_eq!(1, first_calls);

    // Second entry: the value written above must still be there.
    let mut second_calls = 0;
    monitor.enter(|value| {
        assert_eq!(42, *value);
        second_calls += 1;
        Directive::Return
    });
    assert_eq!(1, second_calls);

    // Nothing ever asked to wait, so no waiter may be registered.
    assert_eq!(0, monitor.num_waiting());
}
/// Waiting with nobody around to notify must still let `enter` return once the
/// requested wait has run out, leaving no waiter registered afterwards.
#[test]
fn wait_for_nothing() {
    let monitor = SpeculativeMonitor::new(());

    // Zero-length wait: the closure is expected to run exactly twice
    // (presumably once on entry and once more after the wait expires — confirm
    // against the monitor implementation).
    let mut zero_wait_calls = 0;
    monitor.enter(|_| {
        zero_wait_calls += 1;
        Directive::Wait(Duration::ZERO)
    });
    assert_eq!(2, zero_wait_calls);
    monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();

    // Short, non-zero wait: only a lower bound of two invocations is asserted
    // (presumably because extra wake-ups may trigger additional evaluations).
    let mut short_wait_calls = 0;
    monitor.enter(|_| {
        short_wait_calls += 1;
        Directive::Wait(SHORT_WAIT)
    });
    assert!(short_wait_calls >= 2);
    monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();
}
/// Issuing a notification while no thread is waiting runs the closure exactly once,
/// for both `NotifyOne` and `NotifyAll`.
#[test]
fn notify_nothing() {
    let monitor = SpeculativeMonitor::new(());

    // Single-target notification with no waiters.
    let mut notify_one_calls = 0;
    monitor.enter(|_| {
        notify_one_calls += 1;
        Directive::NotifyOne
    });
    assert_eq!(1, notify_one_calls);

    // Broadcast notification with no waiters.
    let mut notify_all_calls = 0;
    monitor.enter(|_| {
        notify_all_calls += 1;
        Directive::NotifyAll
    });
    assert_eq!(1, notify_all_calls);
}
/// A single waiting thread is released by one `NotifyOne` issued from the main thread.
///
/// The scenario is repeated 10 times. In each round a helper thread waits until the
/// shared flag is `true`, clears it, meets the main thread at a barrier and returns.
#[test]
fn wait_for_notify() {
    for _ in 0..10 {
        let monitor = Arc::new(SpeculativeMonitor::new(false));
        let t_2_waited = Arc::new(Barrier::new(2));

        let t_2 = {
            let (monitor, rendezvous) = (monitor.clone(), t_2_waited.clone());
            test_utils::spawn_blocked(move || {
                monitor.enter(|flag| {
                    if *flag {
                        // Consume the signal, then meet the main thread from
                        // inside the closure before returning.
                        *flag = false;
                        rendezvous.wait();
                        Directive::Return
                    } else {
                        Directive::Wait(Duration::MAX)
                    }
                })
            })
        };

        // The helper must be parked as a waiter, not finished.
        assert!(!t_2.is_finished());
        monitor.wait_for_num_waiting(Ordering::is_eq, 1, LONG_WAIT).unwrap();

        // Raise the flag and wake the single waiter.
        monitor.enter(|flag| {
            *flag = true;
            Directive::NotifyOne
        });

        // Meet the helper at the barrier, then wait for the waiter count to drop.
        t_2_waited.wait();
        monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();

        // The helper must have cleared the flag again.
        monitor.enter(|flag| {
            assert!(!*flag);
            Directive::Return
        });
        t_2.join().unwrap();
    }
}
/// Two waiting threads are released one at a time by two separate `NotifyOne` directives.
///
/// The scenario is repeated 10 times. Each helper thread waits for the shared flag to
/// become `true`, clears it, records its wake-up in its own atomic and returns. After
/// the first notification exactly one helper must have woken; after the second, both.
#[test]
fn wait_for_notify_twice() {
    // Function-local import; the file-level `Ordering` is `std::cmp::Ordering`.
    use std::sync::atomic::Ordering::Relaxed;

    for _ in 0..10 {
        let monitor = Arc::new(SpeculativeMonitor::new(false));

        let t_2_awoken = Arc::new(AtomicBool::new(false));
        let t_2 = {
            let (monitor, awoken) = (monitor.clone(), t_2_awoken.clone());
            test_utils::spawn_blocked(move || {
                monitor.enter(|flag| {
                    if *flag {
                        *flag = false;
                        awoken.store(true, Relaxed);
                        Directive::Return
                    } else {
                        Directive::Wait(Duration::MAX)
                    }
                })
            })
        };

        let t_3_awoken = Arc::new(AtomicBool::new(false));
        let t_3 = {
            let (monitor, awoken) = (monitor.clone(), t_3_awoken.clone());
            test_utils::spawn_blocked(move || {
                monitor.enter(|flag| {
                    if *flag {
                        *flag = false;
                        awoken.store(true, Relaxed);
                        Directive::Return
                    } else {
                        Directive::Wait(Duration::MAX)
                    }
                })
            })
        };

        // Both helpers must be parked as waiters.
        assert!(!t_2.is_finished());
        assert!(!t_3.is_finished());
        monitor.wait_for_num_waiting(Ordering::is_eq, 2, LONG_WAIT).unwrap();

        // First signal: exactly one helper may consume it.
        monitor.enter(|flag| {
            *flag = true;
            Directive::NotifyOne
        });
        monitor.wait_for_num_waiting(Ordering::is_eq, 1, LONG_WAIT).unwrap();
        wait::Spin::wait_for(
            || t_2_awoken.load(Relaxed) || t_3_awoken.load(Relaxed),
            LONG_WAIT,
        ).unwrap();
        // One, and only one, of the two helpers has woken so far.
        assert_ne!(t_2_awoken.load(Relaxed), t_3_awoken.load(Relaxed));
        monitor.enter(|flag| {
            assert!(!*flag);
            Directive::Return
        });

        // Second signal: releases the remaining helper.
        monitor.enter(|flag| {
            *flag = true;
            Directive::NotifyOne
        });
        monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();
        wait::Spin::wait_for(
            || t_2_awoken.load(Relaxed) && t_3_awoken.load(Relaxed),
            LONG_WAIT,
        ).unwrap();
        monitor.enter(|flag| {
            assert!(!*flag);
            Directive::Return
        });

        t_2.join().unwrap();
        t_3.join().unwrap();
    }
}
/// A single `NotifyAll` releases every waiting thread.
///
/// The scenario is repeated 10 times. Two helper threads wait for the shared flag to
/// become `true`; neither clears it, so one broadcast is enough for both to observe it.
/// Each helper records its wake-up in its own atomic, and both atomics are checked
/// after the threads have been joined.
#[test]
fn wait_for_notify_all() {
    for _ in 0..10 {
        let monitor = Arc::new(SpeculativeMonitor::new(false));

        let t_2_awoken = Arc::new(AtomicBool::new(false));
        let t_2 = {
            let monitor = monitor.clone();
            let t_2_awoken = t_2_awoken.clone();
            test_utils::spawn_blocked(move || {
                monitor.enter(|flag| {
                    match flag {
                        true => {
                            t_2_awoken.store(true, std::sync::atomic::Ordering::Relaxed);
                            Directive::Return
                        },
                        false => Directive::Wait(Duration::MAX)
                    }
                })
            })
        };

        let t_3_awoken = Arc::new(AtomicBool::new(false));
        let t_3 = {
            let monitor = monitor.clone();
            let t_3_awoken = t_3_awoken.clone();
            test_utils::spawn_blocked(move || {
                monitor.enter(|flag| {
                    match flag {
                        true => {
                            t_3_awoken.store(true, std::sync::atomic::Ordering::Relaxed);
                            Directive::Return
                        },
                        false => Directive::Wait(Duration::MAX)
                    }
                })
            })
        };

        // Both helpers must be parked as waiters.
        assert!(!t_2.is_finished());
        assert!(!t_3.is_finished());
        monitor.wait_for_num_waiting(Ordering::is_eq, 2, LONG_WAIT).unwrap();

        // Raise the flag and broadcast to every waiter.
        monitor.enter(|flag| {
            *flag = true;
            Directive::NotifyAll
        });
        monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();

        t_2.join().unwrap();
        t_3.join().unwrap();

        // The helpers are expected to leave `enter` only through the branch that
        // records the wake-up, so after joining both flags must be set.
        assert!(t_2_awoken.load(std::sync::atomic::Ordering::Relaxed));
        assert!(t_3_awoken.load(std::sync::atomic::Ordering::Relaxed));
    }
}
/// Notifications issued by woken waiters propagate along a chain of threads.
///
/// The shared counter starts at 1. Three helper threads each wait for one specific
/// value (2, 3 and 4 respectively), record their wake-up, increment the counter and
/// then notify (`NotifyAll`, `NotifyOne`) or simply return. The main thread starts the
/// chain by setting the counter to 2 and broadcasting. Once all helpers are joined,
/// every wake-up flag must be set and the counter must have advanced 2 -> 3 -> 4 -> 5.
#[test]
fn wait_notify_chain() {
    let monitor = Arc::new(SpeculativeMonitor::new(1u8));

    // First link: reacts to 2, bumps to 3 and wakes everyone still waiting.
    let t_2_awoken = Arc::new(AtomicBool::new(false));
    let t_2 = {
        let monitor = monitor.clone();
        let t_2_awoken = t_2_awoken.clone();
        test_utils::spawn_blocked(move || {
            monitor.enter(|val| {
                match val {
                    2 => {
                        t_2_awoken.store(true, std::sync::atomic::Ordering::Relaxed);
                        *val += 1;
                        Directive::NotifyAll
                    },
                    _ => Directive::Wait(Duration::MAX)
                }
            })
        })
    };

    // Second link: reacts to 3, bumps to 4 and wakes a single waiter.
    let t_3_awoken = Arc::new(AtomicBool::new(false));
    let t_3 = {
        let monitor = monitor.clone();
        let t_3_awoken = t_3_awoken.clone();
        test_utils::spawn_blocked(move || {
            monitor.enter(|val| {
                match val {
                    3 => {
                        t_3_awoken.store(true, std::sync::atomic::Ordering::Relaxed);
                        *val += 1;
                        Directive::NotifyOne
                    },
                    _ => Directive::Wait(Duration::MAX)
                }
            })
        })
    };

    // Last link: reacts to 4, bumps to 5 and returns without notifying.
    let t_4_awoken = Arc::new(AtomicBool::new(false));
    let t_4 = {
        let monitor = monitor.clone();
        let t_4_awoken = t_4_awoken.clone();
        test_utils::spawn_blocked(move || {
            monitor.enter(|val| {
                match val {
                    4 => {
                        t_4_awoken.store(true, std::sync::atomic::Ordering::Relaxed);
                        *val += 1;
                        Directive::Return
                    },
                    _ => Directive::Wait(Duration::MAX)
                }
            })
        })
    };

    // All three helpers must be parked as waiters before the chain is started.
    assert!(!t_2.is_finished());
    assert!(!t_3.is_finished());
    assert!(!t_4.is_finished());
    monitor.wait_for_num_waiting(Ordering::is_eq, 3, LONG_WAIT).unwrap();

    // Kick off the chain.
    monitor.enter(|val| {
        *val = 2;
        Directive::NotifyAll
    });
    monitor.wait_for_num_waiting(Ordering::is_eq, 0, LONG_WAIT).unwrap();

    t_2.join().unwrap();
    t_3.join().unwrap();
    t_4.join().unwrap();

    // Each helper is expected to leave `enter` only through the branch that records
    // its wake-up, so all three flags must be set once the threads are joined.
    assert!(t_2_awoken.load(std::sync::atomic::Ordering::Relaxed));
    assert!(t_3_awoken.load(std::sync::atomic::Ordering::Relaxed));
    assert!(t_4_awoken.load(std::sync::atomic::Ordering::Relaxed));

    // Every link incremented the counter exactly once: 2 -> 3 -> 4 -> 5.
    monitor.enter(|val| {
        assert_eq!(5, *val);
        Directive::Return
    });
}
impl<T> SpeculativeMonitor<T> {
    /// Test helper that polls `num_waiting()` through `wait::Spin::wait_for_inequality`.
    ///
    /// * `cmp` – predicate over a `std::cmp::Ordering`; presumably applied to the
    ///   ordering between the sampled waiter count and `target` (confirm the operand
    ///   order against `wait::Spin::wait_for_inequality`).
    /// * `target` – waiter count to compare against.
    /// * `duration` – time budget handed to the spin-wait.
    ///
    /// Returns the `WaitResult` produced by the spin-wait unchanged.
    fn wait_for_num_waiting(&self, cmp: impl FnMut(Ordering) -> bool, target: u32, duration: Duration) -> WaitResult {
        // Sampled on every poll so each comparison sees a fresh waiter count.
        let current_waiters = || self.num_waiting();
        wait::Spin::wait_for_inequality(current_waiters, cmp, &target, duration)
    }
}