pub mod tbqueue;
pub mod tchan;
pub mod tqueue;
pub mod tvecdequeue;
use crate::Stm;
/// Common interface for queue-like containers whose operations return [`Stm`] values.
///
/// The tests in this file run these operations inside `atomically` closures and
/// chain them with `?`. Implementors must be `Clone + Send`; the shared threaded
/// test clones a queue, moves the clone into a spawned task, and expects a value
/// written through one handle to be read through the other.
pub trait TQueueLike<T>: Clone + Send {
/// Take the next value from the queue.
///
/// The shared tests expect values to come back in the order they were written.
/// NOTE(review): presumably waits (retries the transaction) while the queue is
/// empty, since the threaded test relies on a read completing after a later
/// write; confirm against the implementors.
fn read(&self) -> Stm<T>;
/// Add `value` to the queue.
fn write(&self, value: T) -> Stm<()>;
/// Report whether the queue holds no values.
///
/// The shared tests expect `true` for a freshly constructed queue and `false`
/// after a single write.
fn is_empty(&self) -> Stm<bool>;
}
/// Generic test bodies shared by every `TQueueLike<i32>` implementation.
///
/// Each function takes a constructor `mq` and is wired into real `#[tokio::test]`
/// cases by the `test_queue_mod!` macro defined later in this file.
#[cfg(test)]
mod test {
    use std::time::Duration;

    use tokio::sync::mpsc::{self, UnboundedReceiver};

    use super::TQueueLike;
    use crate::atomically;

    /// Wait up to `t` for the next message on `rx` and return it.
    ///
    /// Panics if `t` elapses first or if `recv` yields `None`.
    async fn recv_timeout<T>(rx: &mut UnboundedReceiver<T>, t: Duration) -> T {
        let outcome = tokio::time::timeout(t, rx.recv()).await;
        outcome.unwrap().unwrap()
    }

    /// Write two values and read two values in one transaction; the reads must
    /// return them in the order they were written.
    pub async fn test_write_and_read_back<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let queue = mq();
        let (first, second) = atomically(|| {
            queue.write(42)?;
            queue.write(31)?;
            // Tuple fields are evaluated left to right, so `first` is read before `second`.
            Ok((queue.read()?, queue.read()?))
        })
        .await;
        assert_eq!(42, first);
        assert_eq!(31, second);
    }

    /// Spawn a task that reads from a clone of the queue, write to the original
    /// after a short pause, and expect the task to report the written value.
    pub async fn test_threaded<Q: TQueueLike<i32> + Sync + 'static>(mq: fn() -> Q) {
        let write_handle = mq();
        let read_handle = write_handle.clone();
        let (tx, mut rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            let value = atomically(|| read_handle.read()).await;
            tx.send(value).unwrap();
        });

        // Give the spawned reader a head start before the value is written.
        tokio::time::sleep(Duration::from_millis(100)).await;
        atomically(|| write_handle.write(42)).await;

        let received = recv_timeout(&mut rx, Duration::from_millis(500)).await;
        assert_eq!(42, received);
    }

    /// A freshly constructed queue must report itself as empty.
    pub async fn test_is_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let fresh = mq();
        let is_empty = atomically(|| fresh.is_empty()).await;
        assert!(is_empty);
    }

    /// After a single write the queue must no longer report itself as empty.
    pub async fn test_non_empty<Q: 'static + TQueueLike<i32>>(mq: fn() -> Q) {
        let filled = mq();
        atomically(|| filled.write(42)).await;
        let is_empty = atomically(|| filled.is_empty()).await;
        assert!(!is_empty);
    }
}
/// Generic benchmark bodies shared by every `TQueueLike<i32>` implementation.
///
/// Only compiled for test builds with the `unstable` feature enabled. Each
/// function takes a `Bencher` and a queue constructor `mq`; the
/// `test_queue_mod!` macro defined later in this file wraps them in `#[bench]`
/// functions.
#[cfg(all(test, feature = "unstable"))]
mod bench {
    use super::TQueueLike;
    use crate::atomically;
    use std::time::Duration;

    #[cfg(all(test, feature = "unstable"))]
    use etest::Bencher;

    /// Build a multi-threaded tokio runtime with the time driver enabled
    /// (the benchmarks below use `tokio::time::timeout`).
    fn new_bench_runtime() -> tokio::runtime::Runtime {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_time();
        builder.build().unwrap()
    }

    /// One spawned task writes `1..n` while another reads from a clone of the
    /// queue and asserts it sees the same sequence; each task is given ten
    /// seconds to finish.
    pub fn bench_two_threads_read_write<Q: TQueueLike<i32> + Sync + 'static>(
        b: &mut Bencher,
        mq: fn() -> Q,
    ) {
        let rt = new_bench_runtime();
        b.iter(|| {
            rt.block_on(async {
                let n = 1000;
                let write_end = mq();
                let read_end = write_end.clone();

                let producer = tokio::spawn(async move {
                    for item in 1..n {
                        atomically(|| write_end.write(item)).await;
                    }
                });
                let consumer = tokio::spawn(async move {
                    for expected in 1..n {
                        let got = atomically(|| read_end.read()).await;
                        assert_eq!(expected, got);
                    }
                });

                // First unwrap: the timeout did not elapse; second: the task did not panic.
                let limit = Duration::from_secs(10);
                tokio::time::timeout(limit, producer)
                    .await
                    .unwrap()
                    .unwrap();
                tokio::time::timeout(limit, consumer)
                    .await
                    .unwrap()
                    .unwrap();
            });
        });
    }

    /// Write `1..n` into the queue, then read everything back on the same task,
    /// asserting the values come out in write order.
    pub fn bench_one_thread_write_many_then_read<Q: TQueueLike<i32>>(
        b: &mut Bencher,
        mq: fn() -> Q,
    ) {
        let rt = new_bench_runtime();
        b.iter(|| {
            rt.block_on(async {
                let n = 1000;
                let queue = mq();
                for item in 1..n {
                    atomically(|| queue.write(item)).await;
                }
                for expected in 1..n {
                    let got = atomically(|| queue.read()).await;
                    assert_eq!(expected, got);
                }
            });
        });
    }

    /// Alternate between writing a batch and reading that batch back, for
    /// `1..(n / m)` rounds of `1..m` values each.
    pub fn bench_one_thread_repeat_write_read<Q: TQueueLike<i32>>(b: &mut Bencher, mq: fn() -> Q) {
        let rt = new_bench_runtime();
        b.iter(|| {
            rt.block_on(async {
                let n = 1000;
                let m = 100;
                let queue = mq();
                let rounds = n / m;
                for round in 1..rounds {
                    for offset in 1..m {
                        let value = round * m + offset;
                        atomically(|| queue.write(value)).await;
                    }
                    for offset in 1..m {
                        let expected = round * m + offset;
                        let got = atomically(|| queue.read()).await;
                        assert_eq!(expected, got);
                    }
                }
            });
        });
    }
}
/// Generate the shared test (and, with the `unstable` feature, benchmark)
/// modules for one queue implementation.
///
/// `$make` is an expression that is passed as the `mq` argument of every shared
/// helper; those helpers declare it as `fn() -> Q`, so it must be usable as a
/// function pointer that constructs the queue under test.
///
/// The expansion defines `mod test_queue` (under `cfg(test)`) and
/// `mod bench_queue` (under `cfg(all(test, feature = "unstable"))`) at the
/// invocation site.
#[macro_export]
macro_rules! test_queue_mod {
    ($make:expr) => {
        #[cfg(test)]
        mod test_queue {
            use $crate::queues::test::*;

            #[tokio::test]
            async fn write_and_read_back() {
                test_write_and_read_back($make).await;
            }

            #[tokio::test]
            async fn threaded() {
                test_threaded($make).await;
            }

            #[tokio::test]
            async fn is_empty() {
                test_is_empty($make).await;
            }

            #[tokio::test]
            async fn non_empty() {
                test_non_empty($make).await;
            }
        }

        #[cfg(all(test, feature = "unstable"))]
        mod bench_queue {
            // Resolved from the crate root, exactly like the test helpers in
            // `test_queue`, so the expansion does not depend on how deeply the
            // invoking module is nested (a relative `super::super` path only
            // works from a direct child of `queues`).
            use $crate::queues::bench::*;
            use etest::Bencher;

            #[bench]
            fn two_threads_read_write(b: &mut Bencher) {
                bench_two_threads_read_write(b, $make);
            }

            #[bench]
            fn one_thread_write_many_then_read(b: &mut Bencher) {
                bench_one_thread_write_many_then_read(b, $make);
            }

            #[bench]
            fn one_thread_repeat_write_read(b: &mut Bencher) {
                bench_one_thread_repeat_write_read(b, $make);
            }
        }
    };
}