#[macro_export]
macro_rules! broadcast {
(
$bus:expr,
$recv:expr,
$bucket:expr,
$nbuckets:expr,
$nthreads:expr,
$source:expr,
$kill:expr,
$threads_alive:expr,
$wait:expr
) => {{
let mut bucket_;
let buffer = match $bucket {
Some(bucket) => bucket,
None => {
bucket_ = vec![0u8; BUCKET_SIZE];
&mut bucket_
}
};
let mut wait_time = $wait;
'outer: loop {
if $kill() {
return Ok(());
}
while $recv.load(Ordering::SeqCst) < $nbuckets * $nthreads {
if $kill() {
return Ok(());
}
match $source.read(buffer)? {
0 => break 'outer,
read => {
let share = Arc::new(buffer[..read].to_owned().into_boxed_slice());
$bus.broadcast(share.clone());
}
}
thread::sleep(wait_time);
}
$recv.store(0, Ordering::SeqCst);
}
drop($bus);
wait_time = Duration::from_millis(1);
while Arc::strong_count($threads_alive) > 1 {
thread::sleep(wait_time);
}
Ok(())
}}
}