extern crate atom;
extern crate time;
use std::sync::atomic::{AtomicUsize};
use std::thread;
use std::mem;
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use atom::*;
use time::precise_time_s;
pub use select::{Select, SelectMap};
pub use barrier::Barrier;
mod select;
mod barrier;
/// Heap-allocated state shared by a `Signal` and the `Pulse` paired with it.
struct Inner {
    /// Packed word: the `TX_FLAGS` bits record how the pulse side finished,
    /// the `REF_COUNT` bits count the handles that still point at this value.
    state: AtomicUsize,
    /// Linked list of waiters to wake once the pulse fires or is dropped.
    waiting: Atom<Box<Waiting>>,
}
/// Flag bit set in the shared state word when the pulse has been fired.
const PULSED: usize = 1 << 31;
/// Flag bit set in the shared state word when the `Pulse` was dropped.
const TX_DROP: usize = 1 << 30;
/// Mask covering both completion flags.
const TX_FLAGS: usize = PULSED | TX_DROP;
/// Mask covering every bit of the state word that is not a completion flag;
/// these bits hold the reference count.
const REF_COUNT: usize = !TX_FLAGS;
/// One node of the intrusive, singly linked list of parties waiting on a
/// signal.
struct Waiting {
    /// The rest of the list, if any.
    next: Option<Box<Waiting>>,
    /// How this particular waiter is notified.
    wake: Wake,
}
/// Exposes the `next` link so a boxed `Waiting` can be chained by
/// `Atom::replace_and_set_next`.
impl GetNextMut for Box<Waiting> {
    type NextPtr = Option<Box<Waiting>>;

    /// Returns a mutable reference to this node's `next` link.
    fn get_next(&mut self) -> &mut Self::NextPtr {
        &mut self.next
    }
}
/// The notification mechanism attached to a `Waiting` node.
enum Wake {
    /// Unpark the stored thread.
    Thread(thread::Thread),
    /// Record the signal id as ready on a select handle and fire its trigger.
    Select(select::Handle),
    /// Decrement a barrier's outstanding count and fire its trigger when the
    /// count runs out.
    Barrier(barrier::Handle),
}
impl Waiting {
    /// Consumes the waiter list starting at `s` and notifies every node in
    /// list order.
    ///
    /// `id` is the identifier of the signal that completed; it is pushed onto
    /// the ready list of every select handle encountered.
    fn wake(s: Box<Self>, id: usize) {
        let mut cursor = Some(s);
        while let Some(node) = cursor {
            // Move both fields out of the box; the link drives the next
            // iteration.
            let Waiting { next, wake } = *node;
            cursor = next;

            match wake {
                Wake::Thread(handle) => handle.unpark(),
                Wake::Select(handle) => {
                    // The lock is released at the end of this inner block,
                    // before the trigger is pulsed.
                    let trigger = {
                        let mut shared = handle.0.lock().unwrap();
                        shared.ready.push(id);
                        shared.trigger.take()
                    };
                    if let Some(pulse) = trigger {
                        pulse.pulse();
                    }
                }
                Wake::Barrier(handle) => {
                    // `fetch_sub` returns the previous value, so 1 means this
                    // was the last outstanding signal of the barrier.
                    let previous = handle.0.count.fetch_sub(1, Ordering::Relaxed);
                    if previous == 1 {
                        let mut slot = handle.0.trigger.lock().unwrap();
                        if let Some(pulse) = slot.take() {
                            pulse.pulse();
                        }
                    }
                }
            }
        }
    }

    /// Returns the address of this node as an integer, used as the waiter's
    /// identifier on the wait list.
    fn id(&self) -> usize {
        self as *const Waiting as usize
    }

    /// Builds an unlinked waiter that unparks the calling thread.
    fn thread() -> Box<Waiting> {
        let wake = Wake::Thread(thread::current());
        Box::new(Waiting { next: None, wake: wake })
    }

    /// Builds an unlinked waiter that notifies the given select handle.
    fn select(handle: select::Handle) -> Box<Waiting> {
        let wake = Wake::Select(handle);
        Box::new(Waiting { next: None, wake: wake })
    }

    /// Builds an unlinked waiter that notifies the given barrier handle.
    fn barrier(handle: barrier::Handle) -> Box<Waiting> {
        let wake = Wake::Barrier(handle);
        Box::new(Waiting { next: None, wake: wake })
    }
}
// Raw pointers are not `Send` by default, so the impl is asserted manually.
// NOTE(review): this relies on `Inner` being safe to touch from several
// threads (an `AtomicUsize` plus an `Atom`) — confirm against `atom`'s docs.
unsafe impl Send for Pulse {}

/// The triggering half of a `Signal`/`Pulse` pair.
///
/// Firing it with `pulse` or dropping it completes the paired `Signal`.
pub struct Pulse {
    /// Pointer to the state shared with the paired `Signal`.
    inner: *mut Inner,
}
impl fmt::Debug for Pulse {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let id: usize = unsafe { mem::transmute(self.inner) };
write!(f, "Pulse({:?})", id)
}
}
/// Frees the shared `Inner` if the caller just released the last reference.
///
/// `state` must be the value returned by the `fetch_sub(1)` that released the
/// caller's reference, i.e. the state word *before* the decrement. A
/// reference count of 1 in that value means no other handle remains.
fn delete_inner(state: usize, inner: *mut Inner) {
    if state & REF_COUNT != 1 {
        return;
    }
    // Rebuild the box the pointer originally came from and let it drop.
    unsafe {
        drop(Box::from_raw(inner));
    }
}
impl Drop for Pulse {
    /// Marks the pulse as dropped, wakes every waiter and releases this
    /// handle's reference to the shared state.
    fn drop(&mut self) {
        self.set(TX_DROP);
        self.wake();
        // The decrement may be followed by freeing the shared state (here or
        // in another handle). `AcqRel` orders this handle's earlier accesses
        // before that free, and orders a free performed here after the
        // accesses of handles that released their reference earlier; a
        // `Relaxed` decrement gives neither guarantee.
        let state = self.inner().state.fetch_sub(1, Ordering::AcqRel);
        delete_inner(state, self.inner)
    }
}
impl Pulse {
    /// Rebuilds a `Pulse` from an integer produced by `cast_to_usize`.
    ///
    /// # Safety
    ///
    /// `ptr` must be the address of a live shared state whose reference is
    /// owned by the caller, such as a value returned by `cast_to_usize` that
    /// has not been converted back yet. The function performs no checks.
    pub unsafe fn cast_from_usize(ptr: usize) -> Pulse {
        Pulse {
            inner: ptr as *mut Inner,
        }
    }

    /// Converts the pulse into the address of its shared state without
    /// running its destructor, so the signal is neither pulsed nor marked
    /// as dropped.
    ///
    /// # Safety
    ///
    /// The reference held by `self` is leaked unless the returned value is
    /// later handed back to `cast_from_usize`.
    pub unsafe fn cast_to_usize(self) -> usize {
        let address = self.inner as usize;
        mem::forget(self);
        address
    }

    /// Borrows the shared state.
    fn inner(&self) -> &Inner {
        // SAFETY: this handle owns one reference count on the allocation,
        // which `delete_inner` frees only after the count has been released.
        unsafe { &*self.inner }
    }

    /// ORs `state` into the shared state word and returns the previous value.
    fn set(&self, state: usize) -> usize {
        // Release semantics publish everything written before the flag was
        // set to any thread that reads the flag with an acquire load; with
        // `Relaxed` a waiter could observe the flag without those writes.
        self.inner().state.fetch_or(state, Ordering::AcqRel)
    }

    /// Takes the whole wait list and notifies every waiter on it.
    fn wake(&self) {
        if let Some(waiters) = self.inner().waiting.take() {
            Waiting::wake(waiters, self.inner as usize);
        }
    }

    /// Fires the pulse: sets `PULSED`, wakes every waiter and releases this
    /// handle's reference to the shared state.
    pub fn pulse(self) {
        self.set(PULSED);
        self.wake();
        // See `set`: the decrement can precede freeing the shared state, so
        // it must not be `Relaxed`.
        let state = self.inner().state.fetch_sub(1, Ordering::AcqRel);
        delete_inner(state, self.inner);
        // The reference has been released by hand; skip `Drop`, which would
        // flag the pulse as dropped and decrement a second time.
        mem::forget(self)
    }
}
// Raw pointers are not `Send` by default, so the impl is asserted manually.
// NOTE(review): this relies on `Inner` being safe to touch from several
// threads (an `AtomicUsize` plus an `Atom`) — confirm against `atom`'s docs.
unsafe impl Send for Signal {}

/// The waiting half of a `Signal`/`Pulse` pair.
///
/// A signal is pending until the paired `Pulse` is fired or dropped.
pub struct Signal {
    /// Pointer to the state shared with the paired `Pulse` and with clones.
    inner: *mut Inner,
}
impl fmt::Debug for Signal {
    /// Formats the signal as `Signal(id=<id>, pending=<bool>)`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let id = self.id();
        let pending = self.is_pending();
        write!(f, "Signal(id={:?}, pending={:?})", id, pending)
    }
}
impl Clone for Signal {
    /// Returns another handle to the same shared state, incrementing its
    /// reference count by one.
    fn clone(&self) -> Signal {
        let shared = self.inner;
        self.inner().state.fetch_add(1, Ordering::Relaxed);
        Signal { inner: shared }
    }
}
impl Drop for Signal {
    /// Releases this handle's reference and frees the shared state if it was
    /// the last one.
    fn drop(&mut self) {
        // The decrement may be followed by freeing the shared state (here or
        // in another handle). `AcqRel` orders this handle's earlier accesses
        // before that free, and orders a free performed here after the
        // accesses of handles that released their reference earlier; a
        // `Relaxed` decrement gives neither guarantee.
        let previous = self.inner().state.fetch_sub(1, Ordering::AcqRel);
        delete_inner(previous, self.inner);
    }
}
impl Signal {
pub fn new() -> (Signal, Pulse) {
let inner = Box::new(Inner {
state: AtomicUsize::new(2),
waiting: Atom::empty()
});
let inner = unsafe {mem::transmute(inner)};
(Signal {
inner: inner
},
Pulse {
inner: inner,
})
}
#[inline]
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
#[inline]
fn state(&self) -> usize {
self.inner().state.load(Ordering::Relaxed)
}
#[inline]
pub fn is_pending(&self) -> bool {
self.state() & TX_FLAGS == 0
}
fn in_use(&self) -> bool {
let state = self.state();
(state & REF_COUNT) != 1 || (state & TX_FLAGS) == 0
}
fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize {
let id = waiter.id();
self.inner().waiting.replace_and_set_next(waiter);
if !self.is_pending() {
if let Some(t) = self.inner().waiting.take() {
Waiting::wake(t, self.id());
}
}
id
}
fn remove_from_waitlist(&self, id: usize) {
let mut wl = self.inner().waiting.take();
while let Some(mut w) = wl {
let next = w.next.take();
if w.id() != id {
self.add_to_waitlist(w);
}
wl = next;
}
}
fn arm(self, waiter: Box<Waiting>) -> ArmedSignal {
let id = self.add_to_waitlist(waiter);
ArmedSignal {
id: id,
pulse: self
}
}
pub fn id(&self) -> usize {
unsafe { mem::transmute_copy(&self.inner) }
}
pub fn recycle(&self) -> Option<Pulse> {
if self.in_use() {
None
} else {
self.inner().state.store(2, Ordering::Relaxed);
Some(Pulse{
inner: self.inner,
})
}
}
fn wait_slow(&self) -> Result<(), WaitError> {
loop {
let id = self.add_to_waitlist(Waiting::thread());
if self.is_pending() {
thread::park();
}
self.remove_from_waitlist(id);
if !self.is_pending() {
let state = self.state();
return if (state & PULSED) == PULSED {
Ok(())
} else {
Err(WaitError::Dropped)
};
}
}
}
#[inline]
pub fn wait(&self) -> Result<(), WaitError> {
if !self.is_pending() {
let state = self.state();
return if (state & PULSED) == PULSED {
Ok(())
} else {
Err(WaitError::Dropped)
};
} else {
self.wait_slow()
}
}
pub fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> {
let mut now = (precise_time_s() * 1000.) as u64;
let end = now + ms as u64;
loop {
let id = self.add_to_waitlist(Waiting::thread());
if self.is_pending() {
now = (precise_time_s() * 1000.) as u64;
if now > end {
return Err(TimeoutError::Timeout)
}
thread::park_timeout_ms((end - now) as u32);
}
self.remove_from_waitlist(id);
if !self.is_pending() {
let state = self.state();
return if (state & PULSED) == PULSED {
Ok(())
} else {
Err(TimeoutError::Error(WaitError::Dropped))
};
}
}
}
}
impl IntoRawPtr for Pulse {
    /// Converts the pulse into an untyped pointer to its shared state
    /// without running the pulse's destructor.
    unsafe fn into_raw(self) -> *mut () {
        let raw = self.inner as *mut ();
        mem::forget(self);
        raw
    }
}
impl FromRawPtr for Pulse {
unsafe fn from_raw(ptr: *mut ()) -> Pulse {
Pulse { inner: mem::transmute(ptr) }
}
}
/// Reason a wait on a `Signal` ended without the pulse being fired.
#[derive(Eq, PartialEq, Debug)]
pub enum WaitError {
    /// The paired `Pulse` was dropped instead of being fired.
    Dropped,
}
/// Reason a timed wait on a `Signal` ended without the pulse being fired.
#[derive(Eq, PartialEq, Debug)]
pub enum TimeoutError {
    /// The wait ended with an ordinary `WaitError`.
    Error(WaitError),
    /// The deadline passed while the signal was still pending.
    Timeout,
}
/// A `Signal` that has a waiter registered on its wait list.
struct ArmedSignal {
    /// The signal the waiter was registered on.
    pulse: Signal,
    /// Identifier of the registered waiter, as returned when it was added.
    id: usize,
}
/// Lets an `ArmedSignal` be used wherever a `&Signal` is expected.
impl Deref for ArmedSignal {
    type Target = Signal;

    /// Borrows the wrapped signal.
    fn deref(&self) -> &Self::Target {
        &self.pulse
    }
}
impl ArmedSignal {
fn disarm(self) -> Signal {
self.remove_from_waitlist(self.id);
self.pulse
}
}
/// Implemented by types that can hand out a `Signal` to wait on.
pub trait Signals {
    /// Returns the signal representing this value's completion.
    fn signal(&self) -> Signal;

    /// Obtains a signal via `signal` and blocks on it until it completes.
    fn wait(&self) -> Result<(), WaitError> {
        self.signal().wait()
    }

    /// Obtains a signal via `signal` and blocks on it for at most `ms`
    /// milliseconds.
    fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> {
        self.signal().wait_timeout_ms(ms)
    }
}