use std::cmp::Ordering;
use std::sync::{Arc, Barrier};
use std::time::Duration;
use crate::executor::{Executor, Queue, ThreadPool};
use crate::monitor::{Monitor};
use crate::test_utils::LONG_WAIT;
use crate::wait;
use crate::wait::{Wait, WaitResult};
use crate::xlock::{ArrivalOrdered, XLock};
#[test]
fn readers_do_not_block_without_writer() {
let lock = XLock::<_, ArrivalOrdered>::new(0);
let _guard_1 = lock.read();
let _guard_2 = lock.read();
assert_eq!(3, lock.next_ticket());
assert_eq!(2, lock.serviced_tickets());
}
#[test]
fn interleaving_writer_blocks_reader() {
let lock = Arc::new(XLock::<_, ArrivalOrdered>::new(0));
let guard_1 = lock.read();
assert_eq!(2, lock.next_ticket());
assert_eq!(1, lock.serviced_tickets());
let t_2 = ThreadPool::new(1, Queue::Unbounded);
let t_2_write = {
let lock = lock.clone();
t_2.submit(move || {
lock.write();
})
};
lock.wait_for_next_ticket(Ordering::is_ge, 3, LONG_WAIT).unwrap();
assert_eq!(1, lock.serviced_tickets());
let guard_3 = lock.try_read(Duration::ZERO);
assert!(guard_3.is_none());
assert_eq!(4, lock.next_ticket());
assert_eq!(2, lock.serviced_tickets());
drop(guard_1);
assert!(t_2_write.get().is_success());
assert_eq!(4, lock.next_ticket());
assert_eq!(3, lock.serviced_tickets());
let guard_4 = lock.try_read(Duration::ZERO);
assert!(guard_4.is_some());
drop(guard_4);
}
#[test]
fn queuing_order() {
let lock = Arc::new(XLock::<_, ArrivalOrdered>::new(0));
let guard_1 = lock.read();
assert_eq!(2, lock.next_ticket());
assert_eq!(1, lock.serviced_tickets());
let t_2 = ThreadPool::new(1, Queue::Unbounded);
let t_3 = ThreadPool::new(1, Queue::Unbounded);
let t_4 = ThreadPool::new(1, Queue::Unbounded);
let t_5 = ThreadPool::new(1, Queue::Unbounded);
let t_2_write_release = Arc::new(Barrier::new(2));
let t_2_write = {
let lock = lock.clone();
let t_2_write_release = t_2_write_release.clone();
t_2.submit(move || {
let guard = lock.write();
t_2_write_release.wait();
drop(guard);
})
};
lock.wait_for_next_ticket(Ordering::is_ge, 3, LONG_WAIT).unwrap();
assert_eq!(1, lock.serviced_tickets());
let t_3_read = {
let lock = lock.clone();
t_3.submit(move || {
lock.read();
})
};
lock.wait_for_next_ticket(Ordering::is_ge, 4, LONG_WAIT).unwrap();
assert_eq!(1, lock.serviced_tickets());
let t_4_read_release = Arc::new(Barrier::new(2));
let t_4_read = {
let lock = lock.clone();
let t_4_read_release = t_4_read_release.clone();
t_4.submit(move || {
let guard = lock.read();
t_4_read_release.wait();
drop(guard);
})
};
lock.wait_for_next_ticket(Ordering::is_ge, 5, LONG_WAIT).unwrap();
assert_eq!(1, lock.serviced_tickets());
let t_5_write = {
let lock = lock.clone();
t_5.submit(move || {
lock.write();
})
};
lock.wait_for_next_ticket(Ordering::is_ge, 6, LONG_WAIT).unwrap();
assert_eq!(1, lock.serviced_tickets());
assert!(!t_2_write.is_complete());
assert!(!t_3_read.is_complete());
assert!(!t_4_read.is_complete());
assert!(!t_5_write.is_complete());
drop(guard_1);
assert!(!t_2_write.is_complete());
assert!(!t_3_read.is_complete());
assert!(!t_4_read.is_complete());
assert!(!t_5_write.is_complete());
t_2_write_release.wait();
assert!(t_2_write.get().is_success());
assert!(t_3_read.get().is_success());
assert!(!t_5_write.is_complete());
t_4_read_release.wait();
assert!(t_4_read.get().is_success());
assert!(t_5_write.get().is_success());
assert_eq!(5, lock.serviced_tickets());
}
impl<T> XLock<T, ArrivalOrdered> {
fn next_ticket(&self) -> u64 {
self.sync.monitor.compute(|state| state.next_ticket)
}
fn serviced_tickets(&self) -> u64 {
self.sync.monitor.compute(|state| state.serviced_tickets)
}
fn wait_for_next_ticket(&self, cmp: impl FnMut(Ordering) -> bool, target: u64, duration: Duration) -> WaitResult {
wait::Spin::wait_for_inequality(|| self.next_ticket(), cmp, &target, duration)
}
}