#![feature(test)]
extern crate crossbeam;
extern crate test;
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering::Relaxed;
use hyperbridge::channel;
use std::sync::Arc;
use std::thread;
const VALUES: usize = 10000;
const THREADS: usize = 16;
#[test]
fn hyperbridge_close() {
let (sender, receiver) = channel::new();
let mut counter = 0;
let mut handles = vec![];
for i in 0..THREADS {
let ch = sender.clone();
let jh = thread::spawn(move || {
for _ in 0..VALUES {
ch.send(i).unwrap();
}
});
handles.push(jh);
}
for jh in handles.drain(..) {
let _ = jh.join();
}
sender.close();
let mut iters = THREADS * VALUES;
while iters > 0 {
match receiver.try_recv() {
Ok(Some(v)) => {
counter += v as usize;
iters -= 1;
}
_ => {}
}
}
let total = (0..THREADS).map(|i| i * VALUES).sum();
assert_eq!(counter, total);
}
#[test]
fn hyperbridge_mpsc() {
let (sender, receiver) = channel::new();
let mut counter = 0;
let mut handles = vec![];
for i in 0..THREADS {
let ch = sender.clone();
let jh = thread::spawn(move || {
for _ in 0..VALUES {
ch.send(i).unwrap();
}
});
handles.push(jh);
}
let mut iters = THREADS * VALUES;
while iters > 0 {
match receiver.try_recv() {
Ok(Some(v)) => {
counter += v as usize;
iters -= 1;
}
_ => {}
}
}
let total = (0..THREADS).map(|i| i * VALUES).sum();
for jh in handles.drain(..) {
let _ = jh.join();
}
assert_eq!(counter, total);
}
#[test]
fn hyperbridge_mpmc() {
let (sender, receiver) = channel::new();
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for i in 0..THREADS {
let ch = sender.clone();
let jh = thread::spawn(move || {
for _ in 0..VALUES {
ch.send(i).unwrap();
}
});
handles.push(jh);
}
for _ in 0..THREADS {
let ch = receiver.clone();
let local_counter = counter.clone();
let jh = thread::spawn(move || {
let mut iters = VALUES;
while iters > 0 {
if let Ok(Some(v)) = ch.try_recv() {
local_counter.fetch_add(v as usize, Relaxed);
iters -= 1;
}
}
});
handles.push(jh);
}
for jh in handles.drain(..) {
let _ = jh.join();
}
let total = (0..THREADS).map(|i| i * VALUES).sum();
assert_eq!(counter.load(Relaxed), total);
}
#[test]
fn hyperbridge_drop_senders() {
let (sender, receiver) = channel::new();
let mut counter = 0;
let mut handles = vec![];
for i in 0..THREADS {
let ch = sender.clone();
let jh = thread::spawn(move || {
for _ in 0..VALUES {
ch.send(i).unwrap();
}
});
handles.push(jh);
}
for jh in handles.drain(..) {
let _ = jh.join();
}
let mut iters = THREADS * VALUES;
while iters > 0 {
match receiver.try_recv() {
Ok(Some(v)) => {
counter += v as usize;
iters -= 1;
}
_ => {}
}
}
let total = (0..THREADS).map(|i| i * VALUES).sum();
assert_eq!(counter, total);
drop(sender);
match receiver.try_recv() {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::BrokenPipe),
_ => panic!("expected closed channel"),
}
}
#[test]
fn hyperbridge_drop_receiver() {
let (sender, receiver) = channel::new();
let mut counter = 0;
let mut handles = vec![];
for i in 0..THREADS {
let ch = sender.clone();
let jh = thread::spawn(move || {
for _ in 0..VALUES {
ch.send(i).unwrap();
}
});
handles.push(jh);
}
for jh in handles.drain(..) {
let _ = jh.join();
}
let mut iters = THREADS * VALUES;
while iters > 0 {
match receiver.try_recv() {
Ok(Some(v)) => {
counter += v as usize;
iters -= 1;
}
_ => {}
}
}
let total = (0..THREADS).map(|i| i * VALUES).sum();
assert_eq!(counter, total);
drop(receiver);
match sender.send(0) {
Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::BrokenPipe),
_ => panic!("expected closed channel"),
}
}