extern crate atom;
extern crate time;
use std::sync::atomic::AtomicUsize;
use std::thread;
use std::mem;
use std::fmt;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::cell::RefCell;
use atom::*;
use time::precise_time_s;
use fnbox::FnBox;
pub use select::{Select, SelectMap};
pub use barrier::Barrier;
mod select;
mod barrier;
mod fnbox;
struct Inner {
state: AtomicUsize,
waiting: Atom<Box<Waiting>>,
}
const PULSED: usize = 0x8000_0000;
const TX_DROP: usize = 0x4000_0000;
const TX_FLAGS: usize = PULSED | TX_DROP;
const REF_COUNT: usize = !TX_FLAGS;
struct Waiting {
next: Option<Box<Waiting>>,
wake: Wake,
}
impl GetNextMut for Box<Waiting> {
type NextPtr = Option<Box<Waiting>>;
fn get_next(&mut self) -> &mut Option<Box<Waiting>> {
&mut self.next
}
}
enum Wake {
Thread(thread::Thread),
Select(select::Handle),
Barrier(barrier::Handle),
Callback(Box<FnBox>),
}
impl Waiting {
fn wake(s: Box<Self>, id: usize) {
let mut next = Some(s);
while let Some(s) = next {
let s = *s;
let Waiting { next: n, wake } = s;
next = n;
match wake {
Wake::Thread(thread) => thread.unpark(),
Wake::Select(select) => {
let trigger = {
let mut guard = select.0.lock().unwrap();
guard.ready.push(id);
guard.trigger.take()
};
trigger.map(|x| x.pulse());
}
Wake::Barrier(barrier) => {
let count = barrier.0.count.fetch_sub(1, Ordering::Relaxed);
if count == 1 {
let mut guard = barrier.0.trigger.lock().unwrap();
if let Some(t) = guard.take() {
t.pulse();
}
}
}
Wake::Callback(cb) => cb.call_box(),
}
}
}
fn id(&self) -> usize {
unsafe { mem::transmute(self) }
}
fn thread() -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Thread(thread::current()),
})
}
fn select(handle: select::Handle) -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Select(handle),
})
}
fn barrier(handle: barrier::Handle) -> Box<Waiting> {
Box::new(Waiting {
next: None,
wake: Wake::Barrier(handle),
})
}
fn callback<F>(cb: F) -> Box<Waiting>
where F: FnOnce() + 'static
{
Box::new(Waiting {
next: None,
wake: Wake::Callback(Box::new(cb)),
})
}
}
unsafe impl Send for Pulse {}
unsafe impl Sync for Pulse {}
pub struct Pulse {
inner: *mut Inner,
}
impl fmt::Debug for Pulse {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let id: usize = unsafe { mem::transmute(self.inner) };
write!(f, "Pulse({:?})", id)
}
}
fn delete_inner(state: usize, inner: *mut Inner) {
if state & REF_COUNT == 1 {
let inner: Box<Inner> = unsafe { mem::transmute(inner) };
drop(inner);
}
}
impl Drop for Pulse {
fn drop(&mut self) {
self.set(TX_DROP);
self.wake();
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner)
}
}
impl Pulse {
#[inline]
pub unsafe fn cast_from_usize(ptr: usize) -> Pulse {
Pulse { inner: mem::transmute(ptr) }
}
#[inline]
pub unsafe fn cast_to_usize(self) -> usize {
let us = mem::transmute(self.inner);
mem::forget(self);
us
}
#[inline]
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
#[inline]
fn set(&self, state: usize) -> usize {
self.inner().state.fetch_or(state, Ordering::Relaxed)
}
#[inline]
fn wake(&self) {
let id = unsafe { mem::transmute(self.inner) };
match self.inner().waiting.take() {
None => (),
Some(v) => Waiting::wake(v, id),
}
}
#[inline]
pub fn pulse(self) {
self.set(PULSED);
self.wake();
let state = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(state, self.inner);
mem::forget(self)
}
}
unsafe impl Send for Signal {}
unsafe impl Sync for Signal {}
pub struct Signal {
inner: *mut Inner,
}
impl fmt::Debug for Signal {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
write!(f,
"Signal(id={:?}, pending={:?})",
self.id(),
self.is_pending())
}
}
impl Clone for Signal {
#[inline(always)]
fn clone(&self) -> Signal {
self.inner().state.fetch_add(1, Ordering::Relaxed);
Signal { inner: self.inner }
}
}
impl Drop for Signal {
#[inline]
fn drop(&mut self) {
let flag = self.inner().state.fetch_sub(1, Ordering::Relaxed);
delete_inner(flag, self.inner);
}
}
impl Signal {
pub fn new() -> (Signal, Pulse) {
let inner = Box::new(Inner {
state: AtomicUsize::new(2),
waiting: Atom::empty(),
});
let inner = unsafe { mem::transmute(inner) };
(Signal { inner: inner }, Pulse { inner: inner })
}
pub fn pulsed() -> Signal {
let inner = Box::new(Inner {
state: AtomicUsize::new(1 | PULSED),
waiting: Atom::empty(),
});
let inner = unsafe { mem::transmute(inner) };
Signal { inner: inner }
}
#[inline]
fn inner(&self) -> &Inner {
unsafe { mem::transmute(self.inner) }
}
#[inline]
pub fn state(&self) -> SignalState {
let flags = self.inner().state.load(Ordering::Relaxed);
match (flags & TX_DROP == TX_DROP, flags & PULSED == PULSED) {
(_, true) => SignalState::Pulsed,
(true, _) => SignalState::Dropped,
(_, _) => SignalState::Pending,
}
}
#[inline]
pub fn is_pending(&self) -> bool {
self.state() == SignalState::Pending
}
fn add_to_waitlist(&self, waiter: Box<Waiting>) -> usize {
let id = waiter.id();
if !self.is_pending() {
Waiting::wake(waiter, self.id());
return id;
}
self.inner().waiting.replace_and_set_next(waiter);
if !self.is_pending() {
if let Some(t) = self.inner().waiting.take() {
Waiting::wake(t, self.id());
}
}
id
}
fn remove_from_waitlist(&self, id: usize) {
let mut wl = self.inner().waiting.take();
while let Some(mut w) = wl {
let next = w.next.take();
if w.id() != id {
self.add_to_waitlist(w);
}
wl = next;
}
}
fn arm(self, waiter: Box<Waiting>) -> ArmedSignal {
let id = self.add_to_waitlist(waiter);
ArmedSignal {
id: id,
pulse: self,
}
}
pub fn id(&self) -> usize {
unsafe { mem::transmute_copy(&self.inner) }
}
#[inline]
pub fn wait(self) -> Result<(), WaitError> {
match self.state() {
SignalState::Pulsed => Ok(()),
SignalState::Dropped => Err(WaitError::Dropped),
SignalState::Pending => {
let s = take_scheduler().expect("no scheduler found");
let res = s.wait(self);
swap_scheduler(s);
res
}
}
}
pub fn wait_timeout_ms(self, ms: u32) -> Result<(), TimeoutError> {
SCHED.with(|sched| {
let s = sched.borrow_mut().take().expect("Waited while: no scheduler installed");
let res = s.wait_timeout_ms(self, ms);
*sched.borrow_mut() = Some(s);
res
})
}
pub fn callback<F>(self, cb: F)
where F: FnOnce() + 'static
{
self.add_to_waitlist(Waiting::callback(cb));
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum SignalState {
Pending,
Pulsed,
Dropped,
}
impl IntoRawPtr for Pulse {
#[inline(always)]
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
mem::forget(self);
mem::transmute(inner)
}
}
impl FromRawPtr for Pulse {
#[inline(always)]
unsafe fn from_raw(ptr: *mut ()) -> Pulse {
Pulse { inner: mem::transmute(ptr) }
}
}
impl IntoRawPtr for Signal {
#[inline(always)]
unsafe fn into_raw(self) -> *mut () {
let inner = self.inner;
mem::forget(self);
mem::transmute(inner)
}
}
impl FromRawPtr for Signal {
#[inline(always)]
unsafe fn from_raw(ptr: *mut ()) -> Signal {
Signal { inner: mem::transmute(ptr) }
}
}
#[derive(Debug, PartialEq, Eq)]
pub enum WaitError {
Dropped,
}
#[derive(Debug, PartialEq, Eq)]
pub enum TimeoutError {
Error(WaitError),
Timeout,
}
struct ArmedSignal {
pulse: Signal,
id: usize,
}
impl Deref for ArmedSignal {
type Target = Signal;
fn deref(&self) -> &Signal {
&self.pulse
}
}
impl ArmedSignal {
fn disarm(self) -> Signal {
self.remove_from_waitlist(self.id);
self.pulse
}
}
pub trait Signals {
fn signal(&self) -> Signal;
fn wait(&self) -> Result<(), WaitError> {
let signal = self.signal();
signal.wait()
}
fn wait_timeout_ms(&self, ms: u32) -> Result<(), TimeoutError> {
let signal = self.signal();
signal.wait_timeout_ms(ms)
}
}
pub trait Scheduler: std::fmt::Debug {
fn wait(&self, signal: Signal) -> Result<(), WaitError>;
fn wait_timeout_ms(&self, signal: Signal, timeout: u32) -> Result<(), TimeoutError>;
}
#[derive(Debug)]
pub struct ThreadScheduler;
impl Scheduler for ThreadScheduler {
fn wait(&self, signal: Signal) -> Result<(), WaitError> {
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
thread::park();
}
signal.remove_from_waitlist(id);
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(WaitError::Dropped),
}
}
}
fn wait_timeout_ms(&self, signal: Signal, ms: u32) -> Result<(), TimeoutError> {
let mut now = (precise_time_s() * 1000.) as u64;
let end = now + ms as u64;
loop {
let id = signal.add_to_waitlist(Waiting::thread());
if signal.is_pending() {
now = (precise_time_s() * 1000.) as u64;
if now > end {
return Err(TimeoutError::Timeout);
}
thread::park_timeout_ms((end - now) as u32);
}
signal.remove_from_waitlist(id);
match signal.state() {
SignalState::Pending => (),
SignalState::Pulsed => return Ok(()),
SignalState::Dropped => return Err(TimeoutError::Error(WaitError::Dropped)),
}
}
}
}
thread_local!(static SCHED: RefCell<Option<Box<Scheduler>>> = RefCell::new(Some(Box::new(ThreadScheduler))));
#[inline(never)]
fn take_scheduler() -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = None;
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
sched
}
#[inline(never)]
pub fn swap_scheduler(sched: Box<Scheduler>) -> Option<Box<Scheduler>> {
use std::mem;
let mut sched = Some(sched);
SCHED.with(|s| mem::swap(&mut *s.borrow_mut(), &mut sched));
sched
}
pub fn with_scheduler<F>(f: F, sched: Box<Scheduler>) -> Option<Box<Scheduler>>
where F: FnOnce()
{
let old = swap_scheduler(sched);
f();
old.and_then(|o| swap_scheduler(o))
}