#![allow(dead_code, unused_variables, unused_imports)]
use {Async, Stream, Sender, AsyncResult, AsyncError};
use syncbox::ArrayQueue;
use std::{mem, ops};
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
const MAX_IN_FLIGHT: usize = (1 << 16) - 1;
pub fn process<T, U, F>(source: Stream<T, U::Error>, in_flight: usize, action: F) -> Stream<U::Value, U::Error>
where T: Send + 'static,
U: Async,
F: FnMut(T) -> U + Send + 'static {
let (tx, rx) = Stream::pair();
if in_flight > 0 {
tx.receive(move |res| {
if let Ok(tx) = res {
setup(source, in_flight, action, tx);
}
});
}
rx
}
fn setup<T, U, F>(source: Stream<T, U::Error>, in_flight: usize, action: F, dest: Sender<U::Value, U::Error>)
where T: Send + 'static,
U: Async,
F: FnMut(T) -> U + Send + 'static {
let mut inner = Inner::new(source, in_flight, action, dest);
inner.maybe_process_next(false);
}
struct Core<T: Send + 'static, U: Async, F> {
max: usize,
queue: ArrayQueue<AsyncResult<U::Value, U::Error>>,
sender: Option<Sender<U::Value, U::Error>>,
source: Option<Source<T, U, F>>,
consume_state: AtomicState,
produce_state: AtomicState,
}
struct Source<T: Send + 'static, U: Async, F> {
stream: Stream<T, U::Error>,
action: F,
}
struct Inner<T: Send + 'static, U: Async, F>(Arc<UnsafeCell<Core<T, U, F>>>);
impl<T: Send + 'static, U: Async, F: FnMut(T) -> U + Send + 'static> Inner<T, U, F> {
fn new(source: Stream<T, U::Error>,
in_flight: usize,
action: F,
dest: Sender<U::Value, U::Error>) -> Inner<T, U, F> {
let core = Core {
max: in_flight,
queue: ArrayQueue::with_capacity(in_flight),
source: Some(Source { stream: source, action: action }),
sender: Some(dest),
consume_state: AtomicState::new(),
produce_state: AtomicState::new(),
};
Inner(Arc::new(UnsafeCell::new(core)))
}
fn maybe_process_next(&mut self, dec_in_flight: bool) {
if self.try_acquire_consume_lock(dec_in_flight, Ordering::Acquire) {
let Source { stream, mut action } = self.source.take().expect("source should be present");
let mut inner = self.clone();
stream.receive(move |res| {
match res {
Ok(Some((val, rest))) => {
let val = action(val);
inner.source = Some(Source { stream: rest, action: action });
inner.consume_state.release_lock(Ordering::Release);
let mut inner2 = inner.clone();
val.receive(move |res| {
match res {
Ok(val) => {
inner2.receive_value(Ok(val));
}
Err(err) => {
inner2.receive_value(Err(err));
}
}
});
inner.maybe_process_next(false);
}
Ok(None) => {}
Err(AsyncError::Failed(_)) => {
unimplemented!();
}
_ => unimplemented!(),
}
});
}
}
fn receive_value(&mut self, val: AsyncResult<U::Value, U::Error>) {
self.queue.push(val).ok()
.expect("value queue should never run out of capacity");
if self.acquire_produce_lock_or_inc_in_flight(Ordering::Acquire) {
self.send_value();
}
}
fn send_value(&mut self) {
let sender = self.sender.take().expect("expected sender to be sender");
let value = self.queue.pop().expect("expected value to be in queue");
match value {
Ok(value) => {
let mut inner = self.clone();
sender.send(value).receive(move |res| {
match res {
Ok(sender) => {
inner.sender = Some(sender);
if inner.release_lock_if_idle(Ordering::Release) {
inner.send_value();
}
}
Err(_) => {
}
}
});
}
Err(AsyncError::Failed(e)) => sender.fail(e),
Err(AsyncError::Aborted) => sender.abort(),
}
self.maybe_process_next(true);
}
fn try_acquire_consume_lock(&self, dec_in_flight: bool, order: Ordering) -> bool {
let mut old = self.consume_state.load(Ordering::Relaxed);
loop {
if (old.has_lock() || old.in_flight() == self.max) && !dec_in_flight {
return false;
}
let new = if old.has_lock() {
debug_assert!(dec_in_flight, "state transition bug");
old.dec_in_flight()
} else {
debug_assert!(old.in_flight() < self.max || dec_in_flight, "state transition bug");
if dec_in_flight {
old.with_lock()
} else {
old.inc_in_flight().with_lock()
}
};
let act = self.consume_state.compare_and_swap(old, new, order);
if act == old {
return !old.has_lock();
}
old = act;
}
}
fn try_acquire_produce_lock(&self, order: Ordering) -> bool {
let mut old = self.produce_state.load(order);
loop {
if old.has_lock() {
return false;
}
let act = self.produce_state.compare_and_swap(old, old.with_lock(), order);
if act == old {
return true;
}
old = act
}
}
fn acquire_produce_lock_or_inc_in_flight(&self, order: Ordering) -> bool {
let mut old = self.produce_state.load(Ordering::Relaxed);
loop {
let new = if old.has_lock() {
old.inc_in_flight()
} else {
old.with_lock()
};
let act = self.produce_state.compare_and_swap(old, new, order);
if act == old {
return !old.has_lock();
}
old = act;
}
}
fn release_lock_if_idle(&self, order: Ordering) -> bool {
let mut old = self.produce_state.load(Ordering::Relaxed);
loop {
let new = if old.in_flight() > 0 {
old.dec_in_flight()
} else {
old.without_lock()
};
let act = self.produce_state.compare_and_swap(old, new, order);
if act == old {
return new.has_lock();
}
old = act;
}
}
}
impl<T: Send + 'static, U: Async, F> ops::Deref for Inner<T, U, F> {
type Target = Core<T, U, F>;
fn deref(&self) -> &Core<T, U, F> {
unsafe { &*self.0.get() }
}
}
impl<T: Send + 'static, U: Async, F> ops::DerefMut for Inner<T, U, F> {
fn deref_mut(&mut self) -> &mut Core<T, U, F> {
unsafe { &mut *self.0.get() }
}
}
impl<T: Send + 'static, U: Async, F> Clone for Inner<T, U, F> {
fn clone(&self) -> Inner<T, U, F> {
Inner(self.0.clone())
}
}
unsafe impl<T: Send, U: Async, F> Send for Inner<T, U, F> { }
unsafe impl<T: Send, U: Async, F> Sync for Inner<T, U, F> { }
const LOCK: usize = 1 << 31;
#[derive(Copy, Clone, Eq, PartialEq)]
struct State(usize);
impl State {
fn in_flight(&self) -> usize {
self.0 & MAX_IN_FLIGHT
}
fn inc_in_flight(&self) -> State {
assert!(self.in_flight() < MAX_IN_FLIGHT);
State(self.0 + 1)
}
fn dec_in_flight(&self) -> State {
assert!(self.in_flight() > 0);
State(self.0 - 1)
}
fn has_lock(&self) -> bool {
self.0 & LOCK == LOCK
}
fn with_lock(&self) -> State {
State(self.0 | LOCK)
}
fn without_lock(&self) -> State {
State(!LOCK & self.0)
}
}
struct AtomicState {
atomic: AtomicUsize,
}
impl AtomicState {
fn new() -> AtomicState {
AtomicState { atomic: AtomicUsize::new(0) }
}
fn load(&self, order: Ordering) -> State {
let val = self.atomic.load(order);
State(val)
}
fn compare_and_swap(&self, old: State, new: State, order: Ordering) -> State {
let val = self.atomic.compare_and_swap(old.0, new.0, order);
State(val)
}
fn release_lock(&self, order: Ordering) {
self.atomic.fetch_sub(LOCK, order);
}
}