use {Async, AsyncError, Stream, Sender};
use std::ops;
use std::cell::UnsafeCell;
use std::iter::IntoIterator;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize, Ordering};
/// Returns a stream that carries the values produced by `asyncs`.
///
/// A stream pair is created and the receiving half is returned right away.
/// The asyncs are only handed to `setup` once the sending half reports, via
/// its `receive` callback, that it is usable; if that callback is given an
/// error, the asyncs are never set up. Values are then forwarded in the order
/// in which the asyncs' `ready` callbacks fire (see `Inner::enqueue`), not in
/// the order of the input collection.
pub fn sequence<I, A>(asyncs: I) -> Stream<A::Value, A::Error>
        where I: IntoIterator<Item=A> + Send + 'static,
              A: Async {
    let (producer, consumer) = Stream::pair();

    producer.receive(move |outcome| {
        debug!("sequence() - sequence consumer ready; res={:?}", outcome);

        match outcome {
            Ok(sender) => setup(asyncs, sender),
            // The sending half is unusable; there is nobody to forward to.
            Err(_) => {}
        }
    });

    consumer
}
/// Moves every async into a shared `Inner` and registers a `ready` callback
/// on each of them.
///
/// The asyncs are first parked in the shared queue (one slot each), then
/// taken back out one at a time so that a callback can be attached. Each
/// callback owns its own clone of the shared handle and reports the completed
/// async through `Inner::ready`, which re-inserts it into the queue in
/// completion order.
fn setup<I, A>(asyncs: I, sender: Sender<A::Value, A::Error>)
        where I: IntoIterator<Item=A>,
              A: Async {
    let slots: Vec<Option<A>> = asyncs.into_iter().map(Some).collect();
    let shared = Inner::new(slots, sender);
    let count = shared.queue.len();

    for idx in 0..count {
        // Every callback needs its own handle to the shared core.
        let mut handle = shared.clone();

        let pending = handle.queue[idx].take()
            .expect("expected a value to be present");

        // The slot has just been emptied through the shared cell; the fence
        // is issued before the callback (which may write to the queue) is
        // registered.
        atomic::fence(Ordering::Release);

        debug!("setup() - async.ready callback; i={}", idx);

        pending.ready(move |done| {
            debug!("setup() - async is ready; i={}", idx);
            handle.ready(done);
        });
    }
}
// Values stored in `Core::state`.

/// No send is in flight; the stream sender is parked in `Core::sender`.
const IDLE: usize = 0;
/// A value has been handed to the stream and the sender has not come back yet.
const BUSY: usize = 1;
/// A caller won the `IDLE` -> `SEND` compare-and-swap and holds the right to send.
const SEND: usize = 2;
/// An async resolved to an error; the stream was failed or aborted.
const FAIL: usize = 3;
/// The `receive` callback after a send was handed an error instead of the sender.
const DROP: usize = 4;
struct Core<A: Async> {
queue: Vec<Option<A>>,
next: AtomicUsize, ready: AtomicUsize, state: AtomicUsize, enqueue: AtomicUsize, sender: Option<Sender<A::Value, A::Error>>,
}
/// Cloneable handle to the shared `Core`.
///
/// The core lives in an `Arc<UnsafeCell<..>>`, so every clone can reach the
/// same `Core` mutably through the `Deref`/`DerefMut` impls below.
/// NOTE(review): nothing in the type system serialises those accesses; that
/// is left to the `state` protocol implemented by the methods on `Inner`.
struct Inner<A: Async>(Arc<UnsafeCell<Core<A>>>);
impl<A: Async> Inner<A> {
fn new(queue: Vec<Option<A>>, sender: Sender<A::Value, A::Error>) -> Inner<A> {
let core = Core {
queue: queue,
next: AtomicUsize::new(0),
ready: AtomicUsize::new(0),
state: AtomicUsize::new(IDLE),
enqueue: AtomicUsize::new(0),
sender: Some(sender),
};
Inner(Arc::new(UnsafeCell::new(core)))
}
fn ready(&mut self, async: A) {
self.enqueue(async);
let curr = self.state.compare_and_swap(IDLE, SEND, Ordering::Relaxed);
debug!("Inner::ready() - current-state={}", curr);
if IDLE == curr {
self.send();
}
}
fn send(&mut self) {
debug!("Inner::send(); sending value");
let sender = self.sender.take().expect("expected the stream sender to be present");
let i = self.next.fetch_add(1, Ordering::Acquire);
let async = self.queue[i].take().expect("expected an async value to be present");
match async.expect() {
Ok(val) => {
let mut inner = self.clone();
self.state.swap(BUSY, Ordering::Release);
sender.send(val).receive(move |res| {
match res {
Ok(sender) => {
inner.send_ready(sender, i);
}
Err(_) => {
inner.state.swap(DROP, Ordering::Relaxed);
}
}
});
}
Err(e) => {
self.state.swap(FAIL, Ordering::Release);
match e {
AsyncError::Failed(e) => sender.fail(e),
_ => sender.abort(),
}
}
}
}
fn send_ready(&mut self, sender: Sender<A::Value, A::Error>, prev: usize) {
self.sender = Some(sender);
self.state.swap(IDLE, Ordering::Release);
let ready = self.ready.load(Ordering::Relaxed);
debug!("Inner::send_ready; prev={}; ready={}", prev, ready);
if prev + 1 < ready {
if IDLE == self.state.compare_and_swap(IDLE, SEND, Ordering::Relaxed) {
self.send();
}
}
}
fn enqueue(&mut self, async: A) {
let i = self.enqueue.fetch_add(1, Ordering::Acquire);
debug!("Inner::enqueue(); i={}", i);
self.queue[i] = Some(async);
loop {
let j = self.ready.load(Ordering::Relaxed);
if j != i {
continue;
}
if j == self.ready.compare_and_swap(j, j + 1, Ordering::Release) {
break;
}
}
}
}
/// Gives read access to the shared `Core` through the handle.
impl<A: Async> ops::Deref for Inner<A> {
    type Target = Core<A>;

    fn deref(&self) -> &Core<A> {
        let core: *mut Core<A> = self.0.get();

        // NOTE(review): the pointer comes from the `UnsafeCell` kept alive by
        // the `Arc` in `self`, so it is valid for the lifetime of `&self`.
        // Absence of a conflicting `&mut Core` is not enforced here; it
        // depends on callers following the `state` protocol.
        unsafe { &*core }
    }
}
/// Gives mutable access to the shared `Core` through the handle.
impl<A: Async> ops::DerefMut for Inner<A> {
    fn deref_mut(&mut self) -> &mut Core<A> {
        let core: *mut Core<A> = self.0.get();

        // NOTE(review): `&mut self` is exclusive for this handle only; other
        // clones point at the same `Core`. Exclusivity of the returned
        // reference therefore depends on callers following the `state`
        // protocol, not on the borrow checker.
        unsafe { &mut *core }
    }
}
impl<A: Async> Clone for Inner<A> {
fn clone(&self) -> Inner<A> {
Inner(self.0.clone())
}
}
// `UnsafeCell` is not `Sync`, so `Arc<UnsafeCell<..>>` is neither `Send` nor
// `Sync` on its own; these impls opt `Inner` back in so that handles can be
// moved into callbacks.
// NOTE(review): soundness is asserted, not checked - it relies on the atomic
// `state`/`ready`/`enqueue` protocol keeping conflicting accesses to `Core`
// apart, and presumably on `A` and the sender being safe to move across
// threads; confirm against the `Async` trait bounds.
unsafe impl<A: Async> Send for Inner<A> { }
unsafe impl<A: Async> Sync for Inner<A> { }