use std::{thread, time};
use recstrm::*;
use testtools::thread::expect_runtime;
enum MyErr {
Something
}
#[derive(PartialEq, Eq, Hash, Clone)]
enum Checkpoint {
RecSent
}
#[test]
fn send_after_recv() {
let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();
let jh = thread::spawn(move || {
thread::sleep(time::Duration::from_millis(500));
tx.send("hello").unwrap();
});
if let Some(node) = rx.recv().unwrap() {
assert_eq!(node, "hello");
}
jh.join().unwrap();
}
#[test]
fn send_before_recv() {
let sync = testtools::sync::Checkpoint::new();
let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();
let tsync = sync.clone();
let jh = thread::spawn(move || {
tx.send("hello").unwrap();
tsync.reached(Checkpoint::RecSent);
});
sync.waitfor([Checkpoint::RecSent]);
if let Some(node) = rx.recv().unwrap() {
assert_eq!(node, "hello");
}
jh.join().unwrap();
}
#[test]
fn eof_after_sender_drop() {
let (tx, rx) = Builder::new().queue_size(8).build::<(), ()>();
drop(tx);
let Ok(None) = rx.recv() else {
panic!("Unexpectedly not Ok(None)");
};
}
#[test]
fn eof_with_queued_records() {
let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();
tx.send(42).unwrap();
drop(tx);
let Ok(Some(v)) = rx.recv() else {
panic!("Unexpectedly not Ok(Some(42))");
};
assert_eq!(v, 42);
let Ok(None) = rx.recv() else {
panic!("Unexpectedly not Ok(None)");
};
}
#[test]
fn try_send_full_queue() {
let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>();
tx.send(42).unwrap();
let Err(Error::QueueFull) = tx.try_send(1147) else {
panic!("Unexpectedly not Err(Error::QueueFull)");
};
}
#[test]
fn order() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
tx.send(0).unwrap();
tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();
assert_eq!(rx.recv().unwrap(), Some(0));
assert_eq!(rx.recv().unwrap(), Some(1));
assert_eq!(rx.recv().unwrap(), Some(2));
assert_eq!(rx.recv().unwrap(), Some(3));
}
#[test]
fn batch_order() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
let ints = [0, 1, 2, 3];
tx.send_batch(ints.into_iter()).unwrap();
assert_eq!(rx.recv().unwrap(), Some(0));
assert_eq!(rx.recv().unwrap(), Some(1));
assert_eq!(rx.recv().unwrap(), Some(2));
assert_eq!(rx.recv().unwrap(), Some(3));
}
#[test]
fn recv_all() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
let ints = [0, 1, 2, 3];
tx.send_batch(ints.into_iter()).unwrap();
let res = rx.recv_all().unwrap().unwrap();
assert_eq!(res, [0, 1, 2, 3]);
}
#[test]
fn recv_atmost() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
let ints = [0, 1, 2, 3];
tx.send_batch(ints.into_iter()).unwrap();
let res = rx.recv_atmost(2).unwrap().unwrap();
assert_eq!(res, [0, 1]);
let res = rx.recv_atmost(2).unwrap().unwrap();
assert_eq!(res, [2, 3]);
}
#[test]
fn sender_blocking() {
const GRACE_MS: u64 = 250;
let (tx, rx) = Builder::new().queue_size(2).build::<_, ()>();
let jh = thread::spawn(move || {
let ints = [0, 1];
tx.send_batch(ints.into_iter()).unwrap();
expect_runtime(time::Duration::from_millis(GRACE_MS), || {
tx.send(2).unwrap();
});
});
thread::sleep(time::Duration::from_millis(GRACE_MS));
let res = rx.recv_atmost(2).unwrap().unwrap();
assert_eq!(res, [0, 1]);
let res = rx.recv().unwrap();
assert_eq!(res, Some(2));
jh.join().unwrap();
}
#[test]
fn receiver_disappeared() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
drop(rx);
let Err(Error::ReceiverDisappeared) = tx.send("hello") else {
panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
};
}
#[test]
fn fail_sender() {
let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();
tx.fail(MyErr::Something);
let Err(Error::App(MyErr::Something)) = rx.recv() else {
panic!("Unexpected return value");
};
}
#[test]
fn fail_receiver() {
let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();
rx.fail(MyErr::Something);
let Err(Error::App(MyErr::Something)) = tx.send("hello") else {
panic!("Unexpected return value");
};
}
#[test]
fn drop_sender_before_num_recs() {
let (tx, rx) = Builder::new()
.queue_size(4)
.num_records(8)
.build::<&str, ()>();
drop(tx);
let Err(Error::RecordsUnderflow) = rx.recv() else {
panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
};
}
#[test]
fn drop_sender_before_num_recs_one_off() {
let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>();
let ints = [0, 1, 2, 3];
tx.send_batch(ints.into_iter()).unwrap();
let res = rx.recv_all().unwrap().unwrap();
assert_eq!(res, [0, 1, 2, 3]);
let ints = [4, 5, 6];
tx.send_batch(ints.into_iter()).unwrap();
drop(tx);
let Err(Error::RecordsUnderflow) = rx.recv() else {
panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
};
}
#[test]
fn drop_sender_after_num_recs() {
let (tx, rx) = Builder::new().queue_size(4).num_records(8).build::<_, ()>();
let ints = [0, 1, 2, 3];
tx.send_batch(ints.into_iter()).unwrap();
let res = rx.recv_all().unwrap().unwrap();
assert_eq!(res, [0, 1, 2, 3]);
let ints = [4, 5, 6, 7];
tx.send_batch(ints.into_iter()).unwrap();
drop(tx);
let res = rx.recv_all().unwrap().unwrap();
assert_eq!(res, [4, 5, 6, 7]);
}