#![deny(missing_docs)]
#![cfg_attr(feature = "bench", feature(test))]
extern crate atomic_option;
use atomic_option::AtomicOption;
extern crate parking_lot_core;
use parking_lot_core::SpinWait;
#[cfg(feature = "bench")]
extern crate test;
use std::sync::atomic;
use std::sync::mpsc;
use std::thread;
use std::cell::UnsafeCell;
use std::ops::Deref;
use std::sync::Arc;
struct SeatState<T: Clone> {
max: usize,
val: Option<T>,
}
struct MutSeatState<T: Clone>(UnsafeCell<SeatState<T>>);
unsafe impl<T: Clone> Sync for MutSeatState<T> {}
impl<T: Clone> Deref for MutSeatState<T> {
type Target = UnsafeCell<SeatState<T>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
struct Seat<T: Clone> {
read: atomic::AtomicUsize,
state: MutSeatState<T>,
waiting: AtomicOption<thread::Thread>,
}
impl<T: Clone> Seat<T> {
fn take(&self) -> T {
let read = self.read.load(atomic::Ordering::Acquire);
let state = unsafe { &*self.state.get() };
assert!(read < state.max,
"reader hit seat with exhausted reader count");
let mut waiting = None;
let v = if read + 1 == state.max {
waiting = self.waiting.take(atomic::Ordering::Relaxed);
unsafe { &mut *self.state.get() }.val.take().unwrap()
} else {
state.val.clone().expect("seat that should be occupied was empty")
};
drop(state);
self.read.fetch_add(1, atomic::Ordering::AcqRel);
if let Some(t) = waiting {
t.unpark();
}
return v;
}
}
impl<T: Clone> Default for Seat<T> {
fn default() -> Self {
Seat {
read: atomic::AtomicUsize::new(0),
waiting: AtomicOption::empty(),
state: MutSeatState(UnsafeCell::new(SeatState {
max: 0,
val: None,
})),
}
}
}
struct BusInner<T: Clone> {
ring: Vec<Seat<T>>,
len: usize,
tail: atomic::AtomicUsize,
closed: atomic::AtomicBool,
}
pub struct Bus<T: Clone> {
state: Arc<BusInner<T>>,
readers: usize,
rleft: Vec<usize>,
leaving: (mpsc::Sender<usize>, mpsc::Receiver<usize>),
waiting: (mpsc::Sender<(thread::Thread, usize)>, mpsc::Receiver<(thread::Thread, usize)>),
unpark: mpsc::Sender<thread::Thread>,
cache: Vec<(thread::Thread, usize)>,
}
impl<T: Clone> Bus<T> {
pub fn new(mut len: usize) -> Bus<T> {
use std::iter;
len += 1;
let inner = Arc::new(BusInner {
ring: (0..len).map(|_| Seat::default()).collect(),
tail: atomic::AtomicUsize::new(0),
closed: atomic::AtomicBool::new(false),
len: len,
});
let (unpark_tx, unpark_rx) = mpsc::channel::<thread::Thread>();
thread::spawn(move || {
for t in unpark_rx.iter() {
t.unpark();
}
});
Bus {
state: inner,
readers: 0,
rleft: iter::repeat(0).take(len).collect(),
leaving: mpsc::channel(),
waiting: mpsc::channel(),
unpark: unpark_tx,
cache: Vec::new(),
}
}
#[inline]
fn expected(&mut self, at: usize) -> usize {
unsafe { &*self.state.ring[at].state.get() }.max - self.rleft[at]
}
fn broadcast_inner(&mut self, val: T, block: bool) -> Result<(), T> {
let tail = self.state.tail.load(atomic::Ordering::Relaxed);
let fence = (tail + 1) % self.state.len;
let mut sw = SpinWait::new();
loop {
let fence_read = self.state.ring[fence].read.load(atomic::Ordering::Acquire);
if fence_read == self.expected(fence) {
break;
}
while let Ok(mut left) = self.leaving.1.try_recv() {
self.readers -= 1;
while left != tail {
self.rleft[left] += 1;
left = (left + 1) % self.state.len
}
}
if fence_read == self.expected(fence) {
break;
} else if block {
use std::time::Duration;
self.state.ring[fence]
.waiting
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed);
self.state.ring[fence].read.fetch_add(0, atomic::Ordering::Release);
if !sw.spin() {
thread::park_timeout(Duration::new(0, 100000));
}
continue;
} else {
return Err(val);
}
}
let readers = self.readers;
{
let next = &self.state.ring[tail];
let state = unsafe { &mut *next.state.get() };
state.max = readers;
state.val = Some(val);
next.waiting.replace(None, atomic::Ordering::Relaxed);
next.read.store(0, atomic::Ordering::Release);
}
self.rleft[tail] = 0;
let tail = (tail + 1) % self.state.len;
self.state.tail.store(tail, atomic::Ordering::Release);
while let Ok((t, at)) = self.waiting.1.try_recv() {
if at == tail {
self.cache.push((t, at))
} else {
self.unpark.send(t).unwrap();
}
}
for w in self.cache.drain(..) {
self.waiting.0.send(w).unwrap();
}
Ok(())
}
pub fn try_broadcast(&mut self, val: T) -> Result<(), T> {
self.broadcast_inner(val, false)
}
pub fn broadcast(&mut self, val: T) {
if let Err(..) = self.broadcast_inner(val, true) {
unreachable!("blocking broadcast_inner can't fail");
}
}
pub fn add_rx(&mut self) -> BusReader<T> {
self.readers += 1;
BusReader {
bus: self.state.clone(),
head: self.state.tail.load(atomic::Ordering::Relaxed),
leaving: self.leaving.0.clone(),
waiting: self.waiting.0.clone(),
closed: false,
}
}
}
impl<T: Clone> Drop for Bus<T> {
fn drop(&mut self) {
self.state.closed.store(true, atomic::Ordering::Relaxed);
self.state.tail.fetch_add(0, atomic::Ordering::AcqRel);
}
}
pub struct BusReader<T: Clone> {
bus: Arc<BusInner<T>>,
head: usize,
leaving: mpsc::Sender<usize>,
waiting: mpsc::Sender<(thread::Thread, usize)>,
closed: bool,
}
impl<T: Clone> BusReader<T> {
fn recv_inner(&mut self, block: bool) -> Result<T, mpsc::TryRecvError> {
if self.closed {
return Err(mpsc::TryRecvError::Disconnected);
}
let mut was_closed = false;
let mut sw = SpinWait::new();
loop {
use std::time::Duration;
let tail = self.bus.tail.load(atomic::Ordering::Acquire);
if tail != self.head {
break;
}
if self.bus.closed.load(atomic::Ordering::Relaxed) {
if !was_closed {
was_closed = true;
continue;
}
self.closed = true;
return Err(mpsc::TryRecvError::Disconnected);
}
if !block {
return Err(mpsc::TryRecvError::Empty);
}
if let Err(..) = self.waiting.send((thread::current(), self.head)) {
unimplemented!();
}
if !sw.spin() {
thread::park_timeout(Duration::new(0, 100000));
}
}
let head = self.head;
let ret = self.bus.ring[head].take();
self.head = (head + 1) % self.bus.len;
Ok(ret)
}
pub fn try_recv(&mut self) -> Result<T, mpsc::TryRecvError> {
self.recv_inner(false)
}
pub fn recv(&mut self) -> Result<T, mpsc::RecvError> {
match self.recv_inner(true) {
Ok(val) => Ok(val),
Err(mpsc::TryRecvError::Disconnected) => Err(mpsc::RecvError),
_ => unreachable!("blocking recv_inner can't fail"),
}
}
pub fn iter<'a>(&'a mut self) -> BusIter<'a, T> {
BusIter(self)
}
}
impl<T: Clone> Drop for BusReader<T> {
#[allow(unused_must_use)]
fn drop(&mut self) {
self.leaving.send(self.head);
}
}
pub struct BusIter<'a, T: 'a + Clone>(&'a mut BusReader<T>);
pub struct BusIntoIter<T: Clone>(BusReader<T>);
impl<'a, T: Clone> IntoIterator for &'a mut BusReader<T> {
type Item = T;
type IntoIter = BusIter<'a, T>;
fn into_iter(self) -> BusIter<'a, T> {
BusIter(self)
}
}
impl<T: Clone> IntoIterator for BusReader<T> {
type Item = T;
type IntoIter = BusIntoIter<T>;
fn into_iter(self) -> BusIntoIter<T> {
BusIntoIter(self)
}
}
impl<'a, T: Clone> Iterator for BusIter<'a, T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.0.recv().ok()
}
}
impl<T: Clone> Iterator for BusIntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<T> {
self.0.recv().ok()
}
}
#[cfg(feature = "bench")]
#[bench]
fn bench_bus_one_to_one(b: &mut test::Bencher) {
let mut c = Bus::new(100);
let mut rx = c.add_rx();
let j = thread::spawn(move || {
loop {
match rx.recv() {
Ok(exit) if exit => break,
Err(..) => break,
_ => (),
}
}
});
b.iter(|| c.broadcast(false));
c.broadcast(true);
j.join().unwrap();
}
#[cfg(feature = "bench")]
#[bench]
fn bench_syncch_one_to_one(b: &mut test::Bencher) {
let (tx, rx) = mpsc::sync_channel(100);
let j = thread::spawn(move || {
loop {
match rx.recv() {
Ok(exit) if exit => break,
Err(..) => break,
_ => (),
}
}
});
b.iter(|| tx.send(false).unwrap());
tx.send(true).unwrap();
j.join().unwrap();
}