use loole as flume;
use flume::*;
use std::time::{Duration, Instant};
/// Pushes 1000 integers through an unbounded channel and checks that
/// `try_recv` returns them in send order, then fails once the queue is empty.
#[test]
fn send_recv() {
    let (sender, receiver) = unbounded();

    (0..1000).for_each(|value| sender.send(value).unwrap());

    for expected in 0..1000 {
        let received = receiver.try_recv().unwrap();
        assert_eq!(received, expected);
    }

    // Nothing is left, so a non-blocking receive must report an error.
    let leftover = receiver.try_recv();
    assert!(leftover.is_err());
}
/// Queues 1000 integers, drops the sender, and checks (via the sum) that the
/// receiver's `iter()` yields every queued value.
#[test]
fn iter() {
    let (sender, receiver) = unbounded();
    (0..1000).for_each(|value| sender.send(value).unwrap());

    // With the sender gone, `iter()` is expected to end instead of blocking.
    drop(sender);

    let total: u32 = receiver.iter().sum();
    let expected: u32 = (0..1000).sum();
    assert_eq!(total, expected);
}
/// Queues 1000 integers and checks (via the sum) that `try_iter()` yields all
/// of them while the sender is still alive.
#[test]
fn try_iter() {
    let (sender, receiver) = unbounded();
    (0..1000).for_each(|value| sender.send(value).unwrap());

    let total: u32 = receiver.try_iter().sum();
    let expected: u32 = (0..1000).sum();
    assert_eq!(total, expected);
}
/// Sends each of 1000 integers from its own thread and checks that the
/// receiver's `iter()` collects all of them (compared via the sum).
#[test]
fn iter_threaded() {
    let (sender, receiver) = unbounded();

    for value in 0..1000 {
        let worker_tx = sender.clone();
        std::thread::spawn(move || worker_tx.send(value).unwrap());
    }
    // Only the per-thread clones remain after this.
    drop(sender);

    let expected: u32 = (0..1000).sum();
    let total: u32 = receiver.iter().sum();
    assert_eq!(total, expected);
}
/// Checks that `send_timeout` on a full capacity-1 channel fails after about
/// 350 ms (within 5 ms either way), and also fails once the receiver is dropped.
///
/// Ignored on macOS and Windows.
#[cfg_attr(any(target_os = "macos", windows), ignore)]
#[test]
fn send_timeout() {
    let timeout = Duration::from_millis(350);
    let tolerance = Duration::from_millis(5);
    let lower = timeout.checked_sub(tolerance).unwrap();
    let upper = timeout.checked_add(tolerance).unwrap();

    let (sender, receiver) = bounded(1);
    // The first send fills the single slot.
    assert!(sender.send_timeout(42, timeout).is_ok());

    // The second send has nowhere to go and must give up after `timeout`.
    let started = Instant::now();
    assert!(sender.send_timeout(43, timeout).is_err());
    let elapsed = started.elapsed();
    if elapsed <= lower || elapsed >= upper {
        panic!("timeout exceeded: {:?}", elapsed);
    }

    // Only the first message was ever queued.
    let queued = receiver.drain().count();
    assert_eq!(queued, 1);

    drop(receiver);
    assert!(sender.send_timeout(42, timeout).is_err());
}
/// Checks that `recv_timeout` on an empty channel fails after about 350 ms
/// (within 5 ms either way), and returns in under 5 ms when a message is queued.
///
/// Ignored on macOS and Windows.
#[cfg_attr(any(target_os = "macos", windows), ignore)]
#[test]
fn recv_timeout() {
    let timeout = Duration::from_millis(350);
    let tolerance = Duration::from_millis(5);
    let lower = timeout.checked_sub(tolerance).unwrap();
    let upper = timeout.checked_add(tolerance).unwrap();

    let (sender, receiver) = unbounded();

    let started = Instant::now();
    assert!(receiver.recv_timeout(timeout).is_err());
    let finished = Instant::now();

    let elapsed = finished.duration_since(started);
    if elapsed <= lower || elapsed >= upper {
        panic!("timeout exceeded: {:?}", elapsed);
    }

    sender.send(42).unwrap();
    let received = receiver.recv_timeout(timeout);
    assert_eq!(received, Ok(42));
    // Measured from the end of the timed-out receive above.
    assert!(finished.elapsed() < tolerance);
}
/// Checks that `recv_deadline` on an empty channel fails about 350 ms after
/// the start (within 5 ms either way), and returns in under 5 ms when a
/// message is queued.
///
/// Ignored on macOS and Windows.
#[cfg_attr(any(target_os = "macos", windows), ignore)]
#[test]
fn recv_deadline() {
    let timeout = Duration::from_millis(350);
    let tolerance = Duration::from_millis(5);
    let lower = timeout.checked_sub(tolerance).unwrap();
    let upper = timeout.checked_add(tolerance).unwrap();

    let (sender, receiver) = unbounded();

    let started = Instant::now();
    let first_deadline = started.checked_add(timeout).unwrap();
    assert!(receiver.recv_deadline(first_deadline).is_err());
    let finished = Instant::now();

    let elapsed = finished.duration_since(started);
    if elapsed <= lower || elapsed >= upper {
        panic!("timeout exceeded: {:?}", elapsed);
    }

    sender.send(42).unwrap();
    let second_deadline = finished.checked_add(timeout).unwrap();
    assert_eq!(receiver.recv_deadline(second_deadline), Ok(42));
    // Measured from the end of the timed-out receive above.
    assert!(finished.elapsed() < tolerance);
}
/// Checks that a plain `recv` still gets a message that was sent after an
/// earlier `recv_timeout` on the same receiver expired.
#[test]
fn recv_timeout_missed_send() {
    let (sender, receiver) = bounded(10);

    let timed_out = receiver.recv_timeout(Duration::from_millis(100));
    assert!(timed_out.is_err());

    sender.send(42).unwrap();
    let received = receiver.recv();
    assert_eq!(received, Ok(42));
}
/// Checks that `recv` returns an error once the only sender has been dropped.
#[test]
fn disconnect_tx() {
    let (sender, receiver) = unbounded::<()>();
    drop(sender);

    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
/// Checks that `send` returns an error once the only receiver has been dropped.
#[test]
fn disconnect_rx() {
    let (sender, receiver) = unbounded();
    drop(receiver);

    let outcome = sender.send(0);
    assert!(outcome.is_err());
}
/// Checks that `drain()` yields every queued message, and that messages sent
/// afterwards arrive in send order when the receiver is consumed as an iterator.
#[test]
fn drain() {
    let (sender, receiver) = unbounded();

    (0..100).for_each(|value| sender.send(value).unwrap());
    let drained: u32 = receiver.drain().sum();
    let expected: u32 = (0..100).sum();
    assert_eq!(drained, expected);

    // Refill with two back-to-back runs of 0..100.
    for _batch in 0..2 {
        for value in 0..100 {
            sender.send(value).unwrap();
        }
    }

    // Discard the first message, so the remainder is 1..100 followed by 0..100.
    receiver.recv().unwrap();

    // The expected sequence is the left side of `zip`, so iteration stops when
    // it runs out rather than asking the receiver for one more message.
    let remaining = (1u32..100).chain(0..100);
    for (want, got) in remaining.zip(receiver) {
        assert_eq!(want, got);
    }
}
/// Exercises `try_send` on a capacity-5 channel: it succeeds while there is
/// room, fails when full, succeeds again after a receive, and fails once the
/// receiver is dropped.
#[test]
fn try_send() {
    let (sender, receiver) = bounded(5);

    // Fill every slot.
    (0..5).for_each(|value| sender.try_send(value).unwrap());

    let when_full = sender.try_send(42);
    assert!(when_full.is_err());

    // Each receive frees one slot; the next `try_send` takes it.
    assert_eq!(receiver.recv(), Ok(0));
    let after_recv = sender.try_send(42);
    assert_eq!(after_recv, Ok(()));
    assert_eq!(receiver.recv(), Ok(1));

    drop(receiver);
    let after_disconnect = sender.try_send(42);
    assert!(after_disconnect.is_err());
}
/// Checks blocking `send` on a capacity-5 channel, then has 100 threads send
/// 10000 values each and verifies the sum of everything received.
#[test]
fn send_bounded() {
    let (sender, receiver) = bounded(5);

    // Fill the channel, free one slot, refill it, and confirm it is full again.
    (0..5).for_each(|_| sender.send(42).unwrap());
    let _ = receiver.recv().unwrap();
    sender.send(42).unwrap();
    assert!(sender.try_send(42).is_err());

    // Clear the leftover 42s; the sum check below counts only worker values.
    receiver.drain();

    let workers: Vec<_> = (0..100)
        .map(|_| {
            let worker_tx = sender.clone();
            std::thread::spawn(move || {
                for value in 0..10000 {
                    worker_tx.send(value).unwrap();
                }
            })
        })
        .collect();
    // Only the workers' clones remain after this.
    drop(sender);

    let total: u64 = receiver.iter().sum();
    let per_worker: u64 = (0..10000).sum();
    assert_eq!(total, per_worker * 100);

    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());

    assert!(receiver.recv().is_err());
}
/// Exercises a zero-capacity channel: in each of five rounds a worker's
/// `try_send` must fail, and its blocking `send` must take more than 100 ms
/// because the main thread waits 500 ms before receiving.
#[test]
fn rendezvous() {
    let (sender, receiver) = bounded(0);

    for round in 0..5 {
        let worker_tx = sender.clone();
        let worker = std::thread::spawn(move || {
            assert!(worker_tx.try_send(()).is_err());

            let started = Instant::now();
            worker_tx.send(()).unwrap();
            let blocked_for = started.elapsed();

            assert!(
                blocked_for > Duration::from_millis(100),
                "iter = {}",
                round
            );
        });

        // Keep the worker waiting before taking its message.
        std::thread::sleep(Duration::from_millis(500));
        receiver.recv().unwrap();
        worker.join().unwrap();
    }
}
/// Fan-in stress test: 32 worker threads each forward messages from their own
/// channel into one shared channel. Ten rounds of 1000 messages per worker are
/// sent and fully received on the shared channel.
#[test]
fn hydra() {
    let thread_num = 32;
    let msg_num = 1000;
    let (main_tx, main_rx) = unbounded::<()>();

    // One input channel per worker; each worker relays into `main_tx`.
    let worker_txs: Vec<_> = (0..thread_num)
        .map(|_| {
            let forward_tx = main_tx.clone();
            let (worker_tx, worker_rx) = unbounded();
            std::thread::spawn(move || {
                worker_rx
                    .iter()
                    .for_each(|msg| forward_tx.send(msg).unwrap());
            });
            worker_tx
        })
        .collect();
    // Only the workers' clones of the shared sender remain after this.
    drop(main_tx);

    for _round in 0..10 {
        for worker_tx in &worker_txs {
            (0..msg_num).for_each(|_| worker_tx.send(()).unwrap());
        }
        // Every message sent this round must come out of the shared channel.
        for _ in 0..thread_num * msg_num {
            main_rx.recv().unwrap();
        }
    }

    // With the worker inputs closed, the shared channel is expected to
    // report disconnection.
    drop(worker_txs);
    assert!(main_rx.recv().is_err());
}
/// Chain stress test: 32 relay threads are linked so each forwards into the
/// previous head of the chain, ending at `main_rx`. Ten producer threads in
/// turn push 10 messages each into the head, and all are received at the tail.
#[test]
fn robin() {
    let thread_num = 32;
    let msg_num = 10;
    let (mut main_tx, main_rx) = bounded::<()>(1);

    for _ in 0..thread_num {
        let (stage_tx, stage_rx) = bounded(100);
        // The new stage becomes the head; its relay feeds the old head.
        let downstream = std::mem::replace(&mut main_tx, stage_tx);
        std::thread::spawn(move || {
            stage_rx
                .iter()
                .for_each(|msg| downstream.send(msg).unwrap());
        });
    }

    for _round in 0..10 {
        let producer_tx = main_tx.clone();
        std::thread::spawn(move || {
            (0..msg_num).for_each(|_| producer_tx.send(()).unwrap());
        });

        (0..msg_num).for_each(|_| {
            main_rx.recv().unwrap();
        });
    }
}
/// Channel payload that deliberately has no `Debug` implementation (nothing is
/// derived), used to check that the channel error types still work as
/// `std::error::Error` for such payloads.
// In the visible code the wrapped `u32` is only constructed, never read,
// which is what `dead_code` would otherwise warn about.
#[allow(dead_code)]
struct MessageWithoutDebug(u32);
/// Compile-time check that every send/receive error type can be used as
/// `&dyn std::error::Error` even though the payload type has no `Debug` impl.
///
/// The coercions inside the `Err` branches are what is being tested; the
/// branches do not need to run.
#[test]
fn std_error_without_debug() {
    let (sender, receiver) = unbounded::<MessageWithoutDebug>();

    if let Err(err) = sender.send(MessageWithoutDebug(1)) {
        let _std_err: &dyn std::error::Error = &err;
    }

    if let Err(err) = receiver.recv() {
        let _std_err: &dyn std::error::Error = &err;
    }

    if let Err(err) = sender.try_send(MessageWithoutDebug(2)) {
        let _std_err: &dyn std::error::Error = &err;
    }

    if let Err(err) = receiver.try_recv() {
        let _std_err: &dyn std::error::Error = &err;
    }

    if let Err(err) = sender.send_timeout(MessageWithoutDebug(3), Duration::from_secs(1000000)) {
        let _std_err: &dyn std::error::Error = &err;
    }

    if let Err(err) = receiver.recv_timeout(Duration::from_secs(10000000)) {
        let _std_err: &dyn std::error::Error = &err;
    }
}