#![allow(unused_assignments)]
use super::{Async, Future, Complete, Cancel, AsyncError};
use syncbox::atomic::{self, AtomicU64, Ordering};
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::{fmt, u32};
pub fn select<S: Select<E>, E: Send + 'static>(asyncs: S) -> Future<(u32, S), E> {
let (complete, res) = Future::pair();
complete.receive(move |res| {
if let Ok(complete) = res {
asyncs.select(complete);
}
});
res
}
pub trait Select<E: Send + 'static> : Sized + Send + 'static {
fn select(self, complete: Complete<(u32, Self), E>);
}
trait Values<S: Select<E>, E: Send + 'static> {
type Tokens: Send + 'static;
fn consume(&mut self) -> S;
fn cancel_callbacks(&mut self, selected: u32, up_to: u32, tokens: &mut Self::Tokens) -> u32;
}
struct Selection<V: Values<S, E>, S: Select<E>, E: Send + 'static> {
core: Arc<UnsafeCell<Core<V, S, E>>>,
}
unsafe impl<V: Values<S, E>, S: Select<E>, E: Send + 'static> Sync for Selection<V, S, E> {}
unsafe impl<V: Values<S, E>, S: Select<E>, E: Send + 'static> Send for Selection<V, S, E> {}
impl<V: Values<S, E>, S: Select<E>, E: Send + 'static> Selection<V, S, E> {
fn new(vals: V,
tokens: V::Tokens,
remaining: u32,
complete: Complete<(u32, S), E>) -> Selection<V, S, E> {
let core = Arc::new(UnsafeCell::new(Core {
vals: vals,
tokens: tokens,
complete: Some(complete),
state: AtomicState::new(remaining),
}));
Selection { core: core }
}
fn async_ready<A: Async<Error=E>>(&self, async: A, index: u32, slot: &mut Option<A>) {
let mut handled = 1;
debug!("selection async ready; index={}; is_err={}", index, async.is_err());
let (win, prev, curr) = self.core().state.try_win(index, async.is_err());
if win {
handled += self.core_mut().vals.cancel_callbacks(
index, prev.callbacks_registered(), &mut self.core_mut().tokens);
debug!("async val won select; handled={}", handled);
if async.is_err() {
debug!("first realized async val is error");
let complete = self.core_mut().complete.take()
.expect("result future previously completed");
match async.expect() {
Err(AsyncError::Failed(e)) => {
debug!("execution error");
complete.fail(e);
}
Err(AsyncError::Aborted) => {
debug!("future aborted");
drop(complete);
}
_ => {
debug!("WARN async should be Err, but it is Ok");
}
}
return;
}
}
if curr.is_err() {
return;
}
*slot = Some(async);
self.dec_remaining(handled, curr);
}
fn track_callback<A: Async>(&self,
cancel: A::Cancel,
aref: &mut Option<A>,
cref: &mut Option<A::Cancel>) -> bool {
*cref = Some(cancel);
let (success, curr) = self.core().state.inc_callbacks_registered();
if !success {
let cancel = cref.take().expect("cancel token not present");
if let Some(async) = cancel.cancel() {
*aref = Some(async);
self.dec_remaining(1, curr);
return false;
}
}
return true;
}
fn dec_remaining(&self, count: u32, mut curr: State) {
if count == 0 {
debug!("dec_remaining -- nothing to do");
return;
}
curr = self.core().state.dec_remaining(count, curr);
debug!("dec_remaining -- performed dec; state={:?}", curr);
if curr.remaining() == 0 {
self.complete(curr);
}
}
fn complete(&self, curr: State) {
atomic::fence(Ordering::Acquire);
let core = self.core_mut();
let complete = core.complete.take().expect("result future previously completed");
complete.complete((curr.selected(), core.vals.consume()));
}
fn core(&self) -> &Core<V, S, E> {
unsafe { &*self.core.get() }
}
fn core_mut(&self) -> &mut Core<V, S, E> {
unsafe { &mut *self.core.get() }
}
}
impl<V: Values<S, E>, S: Select<E>, E: Send + 'static> Selection<V, S, E> {
fn clone(&self) -> Selection<V, S, E> {
Selection { core: self.core.clone() }
}
}
struct Core<V: Values<S, E>, S: Select<E>, E: Send + 'static> {
vals: V,
tokens: V::Tokens,
complete: Option<Complete<(u32, S), E>>,
state: AtomicState,
}
macro_rules! expr {
($e: expr) => { $e };
}
macro_rules! components {
($selection:ident, $pending:ident, $handled:ident, ($async:ident, $id:tt)) => {{
if $pending {
let s = $selection.clone();
let c = $async.ready(move |a| {
s.async_ready(a, expr!($id), expr!(&mut s.core_mut().vals.$id))
});
let core = $selection.core_mut();
$pending = $selection.track_callback(
c, expr!(&mut core.vals.$id), expr!(&mut core.tokens.$id));
} else {
expr!($selection.core_mut().vals.$id) = Some($async);
$handled += 1;
}
}};
($selection:ident, $pending:ident, $handled:ident, ($async:ident, $id:tt), $($rest:tt),+) => {{
components!($selection, $pending, $handled, ($async, $id));
components!($selection, $pending, $handled, $($rest),*);
}};
}
macro_rules! tuple_select {
($count:expr, $complete:ident, $vals:expr, $tokens:expr, $(($async:ident, $id:tt)),+) => {{
let selection = Selection::new($vals, $tokens, $count, $complete);
let mut pending = true;
let mut handled = 0;
components!(selection, pending, handled, $(($async, $id)),*);
if handled > 0 {
selection.dec_remaining(handled, selection.core().state.load(Ordering::Relaxed));
}
}}
}
macro_rules! cancel_callbacks {
($s:ident, $selected:ident, $up_to:ident, $tokens:ident, $ret:ident, $i:tt) => {{
{
let i = expr!($i);
if $selected != i && $up_to >= i+1 {
let cancel = expr!($tokens.$i.take().expect("cancel token missing"));
if let Some(async) = cancel.cancel() {
expr!($s.$i) = Some(async);
$ret += 1;
}
}
}
}};
($s:ident, $selected:ident, $up_to:ident, $tokens:ident, $ret:ident, $i:tt, $($rest:tt),+) => {{
cancel_callbacks!($s, $selected, $up_to, $tokens, $ret, $i);
cancel_callbacks!($s, $selected, $up_to, $tokens, $ret, $($rest),*);
}};
}
struct AtomicState {
atomic: AtomicU64,
}
impl AtomicState {
fn new(remaining: u32) -> AtomicState {
let init = State::new(remaining);
AtomicState { atomic: AtomicU64::new(init.as_u64()) }
}
fn load(&self, order: Ordering) -> State {
State::load(self.atomic.load(order))
}
fn compare_and_swap(&self, old: State, new: State, order: Ordering) -> State {
let actual = self.atomic.compare_and_swap(old.as_u64(), new.as_u64(), order);
State::load(actual)
}
fn try_win(&self, index: u32, err: bool) -> (bool, State, State) {
let mut curr = self.load(Ordering::Relaxed);
loop {
if curr.is_won() {
return (false, curr, curr);
}
let next = if err {
curr.as_err()
} else {
curr.as_won(index)
};
let actual = self.compare_and_swap(curr, next, Ordering::Acquire);
if actual == curr {
return (true, curr, next);
}
curr = actual;
}
}
fn inc_callbacks_registered(&self) -> (bool, State) {
let mut curr = self.load(Ordering::Relaxed);
loop {
if curr.is_won() {
return (false, curr);
}
let next = curr.inc_callbacks_registered();
let actual = self.compare_and_swap(curr, next, Ordering::Release);
if actual == curr {
return (true, next);
}
curr = actual;
}
}
fn dec_remaining(&self, count: u32, mut curr: State) -> State {
loop {
assert!(curr.remaining() >= count, "curr={}; count={}", curr.remaining(), count);
let next = curr.dec_remaining(count);
let actual = self.compare_and_swap(curr, next, Ordering::Release);
if actual == curr {
debug!("dec_remaining -- transitioned state; from={:?}; to={:?}", curr, next);
return next;
}
curr = actual;
}
}
}
#[derive(Copy, Clone, PartialEq, Eq)]
struct State {
val: u64,
}
impl fmt::Debug for State {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
if self.is_won() {
write!(fmt, "State[won={}; err={}; selected={}; remaining={}]",
true, self.is_err(), self.selected(), self.remaining())
} else {
write!(fmt, "State[won={}; err={}; callbacks={}; remaining={}]",
false, false, self.callbacks_registered(), self.remaining())
}
}
}
const WON_MASK: u64 = 1;
const ERR_MASK: u64 = 1 << 1;
const REM_MASK: u64 = (1 << 31) - 1;
impl State {
fn new(remaining: u32) -> State {
assert!(remaining <= (u32::MAX >> 1));
State { val: (remaining as u64) << 2 }
}
fn load(val: u64) -> State {
State { val: val }
}
fn is_won(&self) -> bool {
self.val & WON_MASK == WON_MASK
}
fn as_won(&self, index: u32) -> State {
assert!(index <= (u32::MAX >> 1));
let val = WON_MASK | ((index as u64) << 33)
| (self.val & (REM_MASK << 2))
;
State { val: val }
}
fn is_err(&self) -> bool {
self.val & ERR_MASK == ERR_MASK
}
fn as_err(&self) -> State {
let val = WON_MASK | ERR_MASK
| (self.val & (REM_MASK << 2))
;
State { val: val }
}
fn callbacks_registered(&self) -> u32 {
assert!(!self.is_won());
self.most_significant_u31()
}
fn inc_callbacks_registered(&self) -> State {
assert!(!self.is_won());
State { val: self.val + (1 << 33) }
}
fn selected(&self) -> u32 {
assert!(self.is_won());
self.most_significant_u31()
}
fn remaining(&self) -> u32 {
((self.val >> 2) & REM_MASK) as u32
}
fn dec_remaining(&self, count: u32) -> State {
State { val: self.val - ((count as u64) << 2) }
}
fn most_significant_u31(&self) -> u32 {
(self.val >> 33) as u32
}
fn as_u64(&self) -> u64 {
self.val
}
}
impl<A1: Async<Error=E>, A2: Async<Error=E>, E: Send + 'static> Select<E> for (A1, A2) {
fn select(self, complete: Complete<(u32, (A1, A2)), E>) {
let (a1, a2) = self;
tuple_select!(
2, complete,
(None, None),
(None, None),
(a1, 0), (a2, 1));
}
}
impl<A1: Async<Error=E>, A2: Async<Error=E>, E> Values<(A1, A2), E> for (Option<A1>, Option<A2>)
where E: Send + 'static {
type Tokens = (Option<A1::Cancel>, Option<A2::Cancel>);
fn consume(&mut self) -> (A1, A2) {
(self.0.take().unwrap(), self.1.take().unwrap())
}
fn cancel_callbacks(&mut self,
selected: u32,
up_to: u32,
tokens: &mut (Option<A1::Cancel>, Option<A2::Cancel>)) -> u32 {
let mut ret = 0;
cancel_callbacks!(
self, selected, up_to, tokens, ret,
0, 1);
ret
}
}
impl<A1: Async<Error=E>, A2: Async<Error=E>, A3: Async<Error=E>, E: Send + 'static> Select<E> for (A1, A2, A3) {
fn select(self, complete: Complete<(u32, (A1, A2, A3)), E>) {
let (a1, a2, a3) = self;
tuple_select!(
3, complete,
(None, None, None),
(None, None, None),
(a1, 0), (a2, 1), (a3, 2));
}
}
impl<A1: Async<Error=E>, A2: Async<Error=E>, A3: Async<Error=E>, E> Values<(A1, A2, A3), E> for (Option<A1>, Option<A2>, Option<A3>)
where E: Send + 'static {
type Tokens = (Option<A1::Cancel>, Option<A2::Cancel>, Option<A3::Cancel>);
fn consume(&mut self) -> (A1, A2, A3) {
(self.0.take().unwrap(), self.1.take().unwrap(), self.2.take().unwrap())
}
fn cancel_callbacks(&mut self,
selected: u32,
up_to: u32,
tokens: &mut (Option<A1::Cancel>, Option<A2::Cancel>, Option<A3::Cancel>)) -> u32 {
let mut ret = 0;
cancel_callbacks!(
self, selected, up_to, tokens, ret,
0, 1, 2);
ret
}
}