#![feature(core)]
extern crate atom;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::mem;
use std::ops::Deref;
use atom::{Atom, GetNextMut};
use std::boxed::FnBox;
pub use select::Select;
pub use barrier::Barrier;
mod select;
mod barrier;
struct Inner {
state: AtomicUsize,
waiting: Atom<Waiting, Box<Waiting>>
}
const PULSED: usize = 0x8000_0000;
const TX_DROP: usize = 0x4000_0000;
const TX_FLAGS: usize = PULSED | TX_DROP;
const REF_COUNT: usize = !TX_FLAGS;
struct Waiting {
next: Option<Box<Waiting>>,
wake: Wake
}
impl GetNextMut for Box<Waiting> {
type NextPtr = Option<Box<Waiting>>;
fn get_next(&mut self) -> &mut Option<Box<Waiting>> {
&mut self.next
}
}
enum Wake {
Thread(thread::Thread),
Select(select::Handle),
Barrier(barrier::Handle),
Callback(Box<FnBox() + Send>)
}
impl Waiting {
fn wake(s: Box<Self>, id: usize) {
let mut next = Some(s);
while let Some(s) = next {
let s = *s;
let Waiting { next: n, wake } = s;
next = n;
match wake {
Wake::Thread(thread) => thread.unpark(),
Wake::Select(select) => {
let trigger = {
let mut guard = select.0.lock().unwrap();
guard.ready.push(id);
guard.trigger.take()
};
trigger.map(|x| x.trigger());
}
Wake::Barrier(barrier) => {
let count = barrier.0.count.fetch_sub(1, Ordering::Relaxed);
if count == 1 {
let mut guard = barrier.0.trigger.lock().unwrap();
if let Some(t) = guard.take() {
t.trigger();
}
}
}
Wake::Callback(cb) => cb()
}
}
}
fn id(&self) -> usize {
unsafe { mem::transmute(self) }
}
fn thread() -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Thread(thread::current())
})
}
fn select(handle: select::Handle) ->Box<Waiting> {
Box::new(Waiting{
next: None,
wake: Wake::Select(handle)
})
}
fn barrier(handle: barrier::Handle) ->Box<Waiting> {
Box::new(Waiting{
next: None,
wake: Wake::Barrier(handle)
})
}
fn callback(f: Box<FnBox() + Send>) ->Box<Waiting> {
Box::new(Waiting{
next: None,
wake: Wake::Callback(f)
})
}
}
unsafe impl Send for Trigger {}
pub struct Trigger {
inner: *mut Inner,
pulsed: bool
}
fn delete_inner(state: usize, inner: *mut Inner) {
if state & REF_COUNT == 1 {
let inner: Box<Inner> = unsafe {
mem::transmute(inner)
};
drop(inner);
}
}
impl Drop for Trigger {
fn drop(&mut self) {
if !self.pulsed {
self.set(TX_DROP);
self.wake();
}
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner)
}
}
impl Trigger {
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
fn set(&self, state: usize) -> usize {
self.inner().state.fetch_or(state, Ordering::Relaxed)
}
fn wake(&self) {
let id = unsafe { mem::transmute(self.inner) };
match self.inner().waiting.take(Ordering::Acquire) {
None => (),
Some(v) => Waiting::wake(v, id)
}
}
pub fn trigger(mut self) {
self.pulsed = true;
self.set(PULSED);
self.wake();
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner);
unsafe { mem::forget(self) }
}
}
unsafe impl Send for Pulse {}
pub struct Pulse {
inner: *mut Inner
}
impl Clone for Pulse {
fn clone(&self) -> Pulse {
self.inner().state.fetch_add(1, Ordering::Relaxed);
Pulse { inner: self.inner }
}
}
impl Drop for Pulse {
fn drop(&mut self) {
let flag = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(flag, self.inner);
}
}
impl Pulse {
pub fn new() -> (Pulse, Trigger) {
let inner = Box::new(Inner {
state: AtomicUsize::new(2),
waiting: Atom::empty()
});
let inner = unsafe {mem::transmute(inner)};
(Pulse {
inner: inner
},
Trigger {
inner: inner,
pulsed: false
})
}
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
fn state(&self) -> usize {
self.inner().state.load(Ordering::Relaxed)
}
pub fn is_pending(&self) -> bool {
self.state() & TX_FLAGS == 0
}
fn in_use(&self) -> bool {
let state = self.state();
(state & REF_COUNT) != 1 || (state & TX_FLAGS) == 0
}
fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize {
let id = waiter.id();
self.inner().waiting.replace_and_set_next(
waiter,
Ordering::AcqRel
);
if !self.is_pending() {
if let Some(t) = self.inner().waiting.take(Ordering::Acquire) {
Waiting::wake(t, self.id());
}
}
id
}
fn remove_from_waitlist(&self, id: usize) {
let mut wl = self.inner().waiting.take(Ordering::Acquire);
while let Some(mut w) = wl {
let next = w.next.take();
if w.id() != id {
self.add_to_waitlist(w);
}
wl = next;
}
}
fn arm(self, waiter: Box<Waiting>) -> ArmedPulse {
let id = self.add_to_waitlist(waiter);
ArmedPulse {
id: id,
pulse: self
}
}
fn arm_ref<'a>(&'a self, waiter: Box<Waiting>) -> ArmedPulseRef<'a> {
let id = self.add_to_waitlist(waiter);
ArmedPulseRef {
pulse: self,
id: id
}
}
pub fn wait(&self) -> Result<(), WaitError> {
loop {
if !self.is_pending() {
let state = self.state();
return if (state & PULSED) == PULSED {
Ok(())
} else {
Err(WaitError(state))
};
}
let p = self.arm_ref(Waiting::thread());
if self.is_pending() {
thread::park();
}
drop(p);
}
}
pub fn id(&self) -> usize {
unsafe { mem::transmute_copy(&self.inner) }
}
pub fn recycle(&self) -> Option<Trigger> {
if self.in_use() {
None
} else {
self.inner().state.store(2, Ordering::Relaxed);
Some(Trigger{
inner: self.inner,
pulsed: false,
})
}
}
pub fn on_complete<F>(self, f: F) where F: FnOnce() + Send + 'static {
self.arm(Waiting::callback(Box::new(f)));
}
}
#[derive(Debug)]
pub struct WaitError(usize);
struct ArmedPulse {
pulse: Pulse,
id: usize
}
impl Deref for ArmedPulse {
type Target = Pulse;
fn deref(&self) -> &Pulse { &self.pulse }
}
impl ArmedPulse {
fn disarm(self) -> Pulse {
self.remove_from_waitlist(self.id);
self.pulse
}
}
struct ArmedPulseRef<'a> {
pulse: &'a Pulse,
id: usize
}
impl<'a> Deref for ArmedPulseRef<'a> {
type Target = Pulse;
fn deref(&self) -> &Pulse { &self.pulse }
}
impl<'a> Drop for ArmedPulseRef<'a> {
fn drop(&mut self) {
self.remove_from_waitlist(self.id);
}
}