extern crate core;
use std::thread;
use std::time::Duration;
/// Compile-time assertion helper: instantiating it only type-checks when
/// `T` implements `Send`. It does nothing at run time.
fn is_send<T>()
where
    T: Send,
{
}
/// Checks at compile time that both halves of the `spsc` and `mpmc`
/// channels implement `Send` when carrying an `i32` payload.
#[test]
fn bounds() {
    use omango::{mpmc, spsc};

    // Single-producer / single-consumer endpoints.
    is_send::<spsc::Sender<i32>>();
    is_send::<spsc::Receiver<i32>>();

    // Multi-producer / multi-consumer endpoints.
    is_send::<mpmc::Sender<i32>>();
    is_send::<mpmc::Receiver<i32>>();
}
/// Sends one value and receives it back on the same thread, first through a
/// bounded `spsc` channel and then through a bounded `mpmc` channel.
#[test]
fn send_recv() {
    // spsc channel, kept in its own scope.
    {
        let (sender, receiver) = omango::spsc::bounded(4);
        sender.send(1).unwrap();
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);
    }

    // mpmc channel.
    let (sender, receiver) = omango::mpmc::bounded(4);
    sender.send(1).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
}
/// Sends through an original sender and through its clone, and checks that a
/// single receiver sees both values in the order they were sent. Covers the
/// `spsc` flavour first and then `mpmc`.
#[test]
fn send_shared_recv() {
    // spsc channel, kept in its own scope.
    {
        let (first_tx, receiver) = omango::spsc::bounded(4);
        let second_tx = first_tx.clone();

        first_tx.send(1).unwrap();
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);

        second_tx.send(2).unwrap();
        let received = receiver.recv().unwrap();
        assert_eq!(received, 2);
    }

    // mpmc channel.
    let (first_tx, receiver) = omango::mpmc::bounded(4);
    let second_tx = first_tx.clone();

    first_tx.send(1).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);

    second_tx.send(2).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 2);
}
/// Sends one value from a spawned thread and receives it on the test thread,
/// then joins the spawned thread. Runs for `spsc` first and then `mpmc`.
#[test]
fn send_recv_threads() {
    // spsc channel, kept in its own scope.
    {
        let (sender, receiver) = omango::spsc::bounded(4);
        let producer = thread::spawn(move || {
            sender.send(1).unwrap();
        });
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);
        producer.join().unwrap();
    }

    // mpmc channel.
    let (sender, receiver) = omango::mpmc::bounded(4);
    let producer = thread::spawn(move || {
        sender.send(1).unwrap();
    });
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
    producer.join().unwrap();
}
/// Uses channels created with capacity 0: a spawned thread sends `1` then
/// `2`, while the test thread sleeps 100 ms before each receive and checks
/// the values arrive in order. Runs for `spsc` first and then `mpmc`.
#[test]
fn send_recv_threads_no_capacity() {
    // Delay applied before every receive.
    let pause = Duration::from_millis(100);

    // spsc channel, kept in its own scope.
    {
        let (sender, receiver) = omango::spsc::bounded(0);
        let producer = thread::spawn(move || {
            sender.send(1).unwrap();
            sender.send(2).unwrap();
        });
        for expected in 1..=2 {
            thread::sleep(pause);
            assert_eq!(receiver.recv().unwrap(), expected);
        }
        producer.join().unwrap();
    }

    // mpmc channel.
    let (sender, receiver) = omango::mpmc::bounded(0);
    let producer = thread::spawn(move || {
        sender.send(1).unwrap();
        sender.send(2).unwrap();
    });
    for expected in 1..=2 {
        thread::sleep(pause);
        assert_eq!(receiver.recv().unwrap(), expected);
    }
    producer.join().unwrap();
}
/// Closes a zero-capacity channel from the sender side while a spawned
/// thread calls `recv`, and asserts that the receive reports an error.
/// Runs for `spsc` first and then `mpmc`.
#[test]
fn send_close_gets_none() {
    // spsc channel, kept in its own scope.
    {
        let (sender, receiver) = omango::spsc::bounded::<i32>(0);
        let consumer = thread::spawn(move || {
            let outcome = receiver.recv();
            assert!(outcome.is_err());
        });
        sender.close();
        consumer.join().unwrap();
    }

    // mpmc channel.
    let (sender, receiver) = omango::mpmc::bounded::<i32>(0);
    let consumer = thread::spawn(move || {
        let outcome = receiver.recv();
        assert!(outcome.is_err());
    });
    sender.close();
    consumer.join().unwrap();
}
/// Stress test for a zero-capacity `spsc` channel: one spawned producer
/// sends `amt` values through a cloned sender while the test thread
/// receives the same number of values.
///
/// The producer thread is joined at the end so that a panic inside it fails
/// the test instead of being dropped with a detached thread, matching how
/// the other threaded tests in this file treat their workers.
#[test]
fn spsc_no_capacity() {
    let amt = 10000;
    let (tx, rx) = omango::spsc::bounded(0);
    // The original `tx` stays alive on the test thread; only the clone moves
    // into the producer.
    let txc = tx.clone();
    let producer = thread::spawn(move || {
        for _ in 0..amt {
            assert_eq!(txc.send(1), Ok(()));
        }
    });
    for _ in 0..amt {
        assert_eq!(rx.recv(), Ok(1));
    }
    // Surface any producer-side assertion failure.
    producer
        .join()
        .expect("oops! the child thread panicked");
}
/// Stress test for a zero-capacity `mpmc` channel used with many producers
/// and one consumer: `2 * num_cpus - 1` producer threads each send `amt`
/// values, and the test thread receives `amt * nthreads` values in total.
///
/// Every producer is joined at the end so that a panic inside any of them
/// fails the test instead of being dropped with a detached thread.
#[test]
fn mpsc_no_capacity() {
    let amt = 10000;
    let nthreads = (2 * num_cpus::get()) - 1;
    let (tx, rx) = omango::mpmc::bounded(0);

    // Spawn the producers, keeping their handles for the final join.
    let producers: Vec<_> = (0..nthreads)
        .map(|_| {
            let txc = tx.clone();
            thread::spawn(move || {
                for _ in 0..amt {
                    assert_eq!(txc.send(1), Ok(()));
                }
            })
        })
        .collect();

    for _ in 0..amt * nthreads {
        assert_eq!(rx.recv(), Ok(1));
    }

    // Surface any producer-side assertion failure.
    for producer in producers {
        producer
            .join()
            .expect("oops! the child thread panicked");
    }
}
/// Stress test for a zero-capacity `mpmc` channel with several producers and
/// several consumers: each producer sends `amt` values and each consumer
/// receives `amt` values, so the thread counts on both sides must match.
///
/// The worker count is `num_cpus - 1`, clamped to at least one. Without the
/// clamp a single-CPU host would spawn no threads and the test would pass
/// without touching the channel.
#[test]
fn mpmc_no_capacity() {
    let amt = 10000;
    // Same count on both sides so sends and receives balance exactly.
    let nthreads = num_cpus::get().saturating_sub(1).max(1);
    let nthreads_send = nthreads;
    let nthreads_recv = nthreads;
    let (tx, rx) = omango::mpmc::bounded(0);

    let mut receiving_threads = Vec::with_capacity(nthreads_recv);
    let mut sending_threads = Vec::with_capacity(nthreads_send);

    for _ in 0..nthreads_send {
        let txc = tx.clone();
        let child = thread::spawn(move || {
            for _ in 0..amt {
                assert_eq!(txc.send(1), Ok(()));
            }
        });
        sending_threads.push(child);
    }

    for _ in 0..nthreads_recv {
        let rxc = rx.clone();
        let child = thread::spawn(move || {
            for _ in 0..amt {
                assert_eq!(rxc.recv(), Ok(1));
            }
        });
        receiving_threads.push(child);
    }

    // Join producers first, then consumers; a panic in any worker fails the test.
    for child in sending_threads {
        child.join().expect("oops! the child thread panicked");
    }
    for child in receiving_threads {
        child.join().expect("oops! the child thread panicked");
    }
}