use {
receipt,
Async,
Pair,
Future,
Complete,
Cancel,
Receipt,
AsyncResult,
AsyncError
};
use super::core::{self, Core};
use std::fmt;
/// Consumer half of an asynchronous sequence of `T` values that can fail with `E`.
///
/// A stream resolves to a `Head<T, E>`: either the next item together with the
/// stream of remaining items, or `None` once the sequence has ended.
#[must_use = "streams are lazy and do nothing unless consumed"]
pub struct Stream<T: Send + 'static, E: Send + 'static> {
// Handle to the shared core. Consuming methods move it out with `core::take`;
// the `Drop` impl in this file only cancels when a handle is still present.
core: Option<Core<Head<T, E>, E>>,
}
/// Value a stream resolves to: `Some((item, rest))` carries the next item plus
/// the stream of the remaining items, `None` marks the end of the stream.
pub type Head<T, E> = Option<(T, Stream<T, E>)>;
/// Shorthand for the core type used by `Stream`, `Sender`, `BusySender` and
/// `StreamIter`: a `Core` whose value is a `Head<T, E>` and whose error is `E`.
pub type StreamCore<T, E> = Core<Head<T, E>, E>;
impl<T: Send + 'static, E: Send + 'static> Stream<T, E> {
/// Creates a connected `(Sender, Stream)` pair.
///
/// Both halves hold a handle to the same freshly created core: the stream
/// receives a clone, the sender keeps the original handle.
pub fn pair() -> (Sender<T, E>, Stream<T, E>) {
    let core = Core::new();
    let consumer = Stream { core: Some(core.clone()) };
    let producer = Sender { core: Some(core) };
    (producer, consumer)
}
/// Returns a stream whose core is created already holding `Ok(None)`,
/// i.e. a stream that is at its end from the start.
pub fn empty() -> Stream<T, E> {
    let finished = Core::with_value(Ok(None));
    Stream { core: Some(finished) }
}
/// Gathers every item of the stream into a `Vec`, in arrival order.
///
/// Implemented as a `reduce` that starts from an empty vector and pushes
/// each item onto it.
pub fn collect(self) -> Future<Vec<T>, E> {
    self.reduce(Vec::new(), |mut items, item| {
        items.push(item);
        items
    })
}
/// Converts the stream into a `StreamIter` by moving the stream's core
/// handle into the iterator.
pub fn iter(mut self) -> StreamIter<T, E> {
    let core = core::take(&mut self.core);
    StreamIter { core: Some(core) }
}
/// Calls `f` with every item of the stream and returns a future that
/// completes with `()` once the stream has ended.
///
/// Iteration only starts after the producing half of the returned future
/// resolves successfully; if it resolves with an error nothing is run.
pub fn each<F: FnMut(T) + Send + 'static>(self, f: F) -> Future<(), E> {
    let (producer, result) = Future::pair();
    producer.receive(move |interest| {
        match interest {
            Ok(complete) => self.do_each(f, complete),
            Err(_) => {}
        }
    });
    result
}
/// Recursive worker behind `each`.
///
/// Receives one head at a time: an item is passed to `f` and the rest of the
/// stream is processed recursively; the end of the stream completes
/// `complete`; a failure is forwarded to it; on abort `complete` is dropped
/// without being completed.
fn do_each<F: FnMut(T) + Send + 'static>(self, mut f: F, complete: Complete<(), E>) {
    self.receive(move |step| {
        match step {
            Err(AsyncError::Failed(err)) => complete.fail(err),
            Err(AsyncError::Aborted) => drop(complete),
            Ok(None) => complete.complete(()),
            Ok(Some((item, tail))) => {
                f(item);
                tail.do_each(f, complete);
            }
        }
    });
}
/// Returns a stream containing only the items for which `f` returns `true`.
pub fn filter<F: FnMut(&T) -> bool + Send + 'static>(self, f: F) -> Stream<T, E> {
    let (producer, filtered) = Stream::pair();
    self.do_filter(f, producer);
    filtered
}
/// Recursive worker behind `filter`.
///
/// Waits for `sender` to resolve to a usable `Sender`, then receives the next
/// head of this stream. Items accepted by `f` are sent on; rejected items are
/// skipped while the same sender is reused. Failures and aborts are forwarded
/// to the sender. At the end of the stream the sender is simply dropped.
fn do_filter<F, A>(self, mut f: F, sender: A)
        where F: FnMut(&T) -> bool + Send + 'static,
              A: Async<Value=Sender<T, E>> {
    sender.receive(move |interest| {
        // Nothing to do when the sender never became available.
        let sender = match interest {
            Ok(sender) => sender,
            Err(_) => return,
        };

        self.receive(move |head| {
            match head {
                Err(AsyncError::Aborted) => sender.abort(),
                Err(AsyncError::Failed(e)) => sender.fail(e),
                Ok(None) => {}
                Ok(Some((item, rest))) => {
                    if f(&item) {
                        rest.do_filter(f, sender.send(item));
                    } else {
                        rest.do_filter(f, sender);
                    }
                }
            }
        });
    });
}
/// Returns a stream in which every item has been transformed by `f`.
///
/// Delegates to `map_async`, wrapping each mapped value in `Ok`.
pub fn map<F: FnMut(T) -> U + Send + 'static, U: Send + 'static>(self, mut f: F) -> Stream<U, E> {
    self.map_async(move |item| {
        let mapped = f(item);
        Ok(mapped)
    })
}
/// Returns a stream whose items are the resolved values of the asynchronous
/// computations produced by calling `action` on each item of this stream.
///
/// Mapping only starts once the new stream's sender resolves successfully.
pub fn map_async<F, U>(self, action: F) -> Stream<U::Value, E>
        where F: FnMut(T) -> U + Send + 'static,
              U: Async<Error=E> {
    let (producer, mapped) = Stream::pair();
    producer.receive(move |interest| {
        match interest {
            Ok(producer) => self.do_map(producer, action),
            Err(_) => {}
        }
    });
    mapped
}
/// Recursive worker behind `map_async`.
///
/// For each item: runs `f`, waits for its result, sends the result through
/// `sender`, and continues with the rest of the stream once the sender is
/// available again. A failure or abort from either this stream or `f`'s
/// result is forwarded to `sender`. At the end of the stream the sender is
/// dropped.
fn do_map<F, U>(self, sender: Sender<U::Value, E>, mut f: F)
        where F: FnMut(T) -> U + Send + 'static,
              U: Async<Error=E> {
    self.receive(move |head| {
        let (item, rest) = match head {
            Ok(Some(pair)) => pair,
            Ok(None) => return,
            Err(AsyncError::Failed(e)) => return sender.fail(e),
            Err(AsyncError::Aborted) => return sender.abort(),
        };

        f(item).receive(move |outcome| {
            let value = match outcome {
                Ok(value) => value,
                Err(AsyncError::Failed(e)) => return sender.fail(e),
                Err(AsyncError::Aborted) => return sender.abort(),
            };

            sender.send(value).receive(move |ready| {
                if let Ok(sender) = ready {
                    rest.do_map(sender, f);
                }
            });
        });
    });
}
/// Returns a stream with the same items but whose failure value, if any, is
/// converted with `f`.
///
/// Forwarding only starts once the new stream's sender resolves successfully.
pub fn map_err<F, U>(self, f: F) -> Stream<T, U>
        where F: FnOnce(E) -> U + Send + 'static,
              U: Send + 'static {
    let (producer, converted) = Stream::pair();
    producer.receive(move |interest| {
        match interest {
            Ok(producer) => self.do_map_err(producer, f),
            Err(_) => {}
        }
    });
    converted
}
/// Recursive worker behind `map_err`.
///
/// Items are forwarded unchanged through `sender`; a failure is passed
/// through `f` before being forwarded; an abort is forwarded as an abort.
/// At the end of the stream the sender is dropped.
fn do_map_err<F, U>(self, sender: Sender<T, U>, f: F)
        where F: FnOnce(E) -> U + Send + 'static,
              U: Send + 'static {
    self.receive(move |head| {
        let (item, rest) = match head {
            Ok(Some(pair)) => pair,
            Ok(None) => return,
            Err(AsyncError::Failed(e)) => return sender.fail(f(e)),
            Err(AsyncError::Aborted) => return sender.abort(),
        };

        sender.send(item).receive(move |ready| {
            match ready {
                Ok(sender) => rest.do_map_err(sender, f),
                Err(_) => {}
            }
        });
    });
}
/// Applies `f` to the items of this stream by delegating to the crate's
/// `process::process` function, passing `in_flight` through unchanged.
///
/// NOTE(review): `in_flight` presumably bounds how many computations may be
/// outstanding at once — confirm against `process::process`.
pub fn process<F, U>(self, in_flight: usize, f: F) -> Stream<U::Value, E>
where F: FnMut(T) -> U + Send + 'static,
U: Async<Error=E> {
use process::process;
process(self, in_flight, f)
}
/// Folds the stream into a single value, starting from `init` and combining
/// the accumulator with each item via `f`.
///
/// Delegates to `reduce_async`, wrapping each new accumulator in `Ok`.
pub fn reduce<F: Fn(U, T) -> U + Send + 'static, U: Send + 'static>(self, init: U, f: F) -> Future<U, E> {
    self.reduce_async(init, move |acc, item| {
        let next = f(acc, item);
        Ok(next)
    })
}
/// Folds the stream into a single value where each combining step is itself
/// asynchronous: `action` receives the accumulator and the next item and
/// returns an `Async` resolving to the new accumulator.
///
/// Folding only starts once the producing half of the returned future
/// resolves successfully.
pub fn reduce_async<F, U, X>(self, init: X, action: F) -> Future<X, E>
        where F: Fn(X, T) -> U + Send + 'static,
              U: Async<Value=X, Error=E>,
              X: Send + 'static {
    let (producer, result) = Future::pair();
    producer.receive(move |interest| {
        match interest {
            Ok(complete) => self.do_reduce(complete, init, action),
            Err(_) => {}
        }
    });
    result
}
/// Recursive worker behind `reduce_async`.
///
/// At the end of the stream `complete` is completed with the current
/// accumulator. For each item, `f(curr, item)` is awaited and its value
/// becomes the accumulator for the rest of the stream. Failures from the
/// stream or from `f` are forwarded to `complete`; on abort `complete` is
/// dropped without being completed.
fn do_reduce<F, U>(self, complete: Complete<U::Value, E>, curr: U::Value, f: F)
        where F: Fn(U::Value, T) -> U + Send + 'static,
              U: Async<Error=E> {
    self.receive(move |head| {
        let (item, rest) = match head {
            Ok(Some(pair)) => pair,
            Ok(None) => return complete.complete(curr),
            Err(AsyncError::Failed(e)) => return complete.fail(e),
            Err(AsyncError::Aborted) => return drop(complete),
        };

        f(curr, item).receive(move |step| {
            match step {
                Err(AsyncError::Aborted) => drop(complete),
                Err(AsyncError::Failed(e)) => complete.fail(e),
                Ok(next) => rest.do_reduce(complete, next, f),
            }
        });
    });
}
/// Returns a stream that yields at most the first `n` items of this stream.
pub fn take(self, n: u64) -> Stream<T, E> {
    let (producer, limited) = Stream::pair();
    self.do_take(n, producer);
    limited
}
/// Recursive worker behind `take`.
///
/// With `n == 0` it returns immediately, dropping both this stream and
/// `sender`. Otherwise it waits for `sender` to become available, forwards
/// one item and recurses with `n - 1`. Failures and aborts are forwarded to
/// the sender; at the end of the stream the sender is dropped.
fn do_take<A>(self, n: u64, sender: A) where A: Async<Value=Sender<T, E>> {
    if n > 0 {
        let remaining = n - 1;

        sender.receive(move |interest| {
            let sender = match interest {
                Ok(sender) => sender,
                Err(_) => return,
            };

            self.receive(move |head| {
                match head {
                    Err(AsyncError::Aborted) => sender.abort(),
                    Err(AsyncError::Failed(e)) => sender.fail(e),
                    Ok(None) => {}
                    Ok(Some((item, rest))) => {
                        rest.do_take(remaining, sender.send(item));
                    }
                }
            });
        });
    }
}
pub fn take_while<F>(self, _f: F) -> Stream<T, E>
where F: Fn(&T) -> bool + Send + 'static {
unimplemented!();
}
/// Returns a stream that yields items from this stream until `cond` wins the
/// race against the next item.
///
/// Each step passes `(cond, self)` to `super::select` and inspects the index
/// it reports: index 0 (`cond`) ends the output stream; otherwise the next
/// item is emitted and the remainder is wrapped in another `take_until` with
/// the same `cond`.
///
/// NOTE(review): any stream result other than `Ok(Some(..))` — including an
/// error — is mapped to `Ok(None)` below, so a source failure reaching this
/// point would look like a normal end of stream. Confirm whether `select`
/// already reports such failures before this closure runs.
pub fn take_until<A>(self, cond: A) -> Stream<T, E>
where A: Async<Error=E> {
super::select((cond, self))
.and_then(move |(i, (cond, stream))| {
if i == 0 {
// The condition finished first: end the output stream.
Ok(None)
} else {
// presumably `expect()` extracts the already-available result of the
// selected stream — TODO confirm against the `Async` trait.
match stream.expect() {
Ok(Some((v, rest))) => {
// Emit the item and keep racing `cond` against the remainder.
Ok(Some((v, rest.take_until(cond))))
}
_ => Ok(None),
}
}
}).to_stream()
}
/// Converts the stream into a future of its head by handing the stream's
/// core to `future::from_core`.
pub fn to_future(mut self) -> Future<Head<T, E>, E> {
    use super::future;

    let core = core::take(&mut self.core);
    future::from_core(core)
}
/// Wraps an existing core handle in a `Stream`.
fn from_core(core: StreamCore<T, E>) -> Stream<T, E> {
Stream { core: Some(core) }
}
}
/// `Async` implementation for the consumer side of a stream; every method
/// delegates to the `consumer_*` operations of the underlying core.
impl<T: Send + 'static, E: Send + 'static> Async for Stream<T, E> {
    type Value = Head<T, E>;
    type Error = E;
    type Cancel = Receipt<Stream<T, E>>;

    /// Reports the core's `consumer_is_ready` state.
    fn is_ready(&self) -> bool {
        let core = core::get(&self.core);
        core.consumer_is_ready()
    }

    /// Reports the core's `consumer_is_err` state.
    fn is_err(&self) -> bool {
        let core = core::get(&self.core);
        core.consumer_is_err()
    }

    /// Returns the head if the core's `consumer_poll` yields one, otherwise
    /// hands back a stream wrapping the same core.
    fn poll(mut self) -> Result<AsyncResult<Head<T, E>, E>, Stream<T, E>> {
        let mut core = core::take(&mut self.core);

        if let Some(res) = core.consumer_poll() {
            return Ok(res);
        }

        Err(Stream { core: Some(core) })
    }

    /// Registers `f` through the core's `consumer_ready`. When that returns
    /// a count, a receipt built from the core and the count is returned;
    /// otherwise an empty receipt.
    fn ready<F: FnOnce(Stream<T, E>) + Send + 'static>(mut self, f: F) -> Receipt<Stream<T, E>> {
        let core = core::take(&mut self.core);
        let registered = core.consumer_ready(move |core| f(Stream::from_core(core)));

        match registered {
            None => receipt::none(),
            Some(count) => receipt::new(core, count),
        }
    }

    /// Returns the result of the core's `consumer_await`.
    fn await(mut self) -> AsyncResult<Head<T, E>, E> {
        let core = core::take(&mut self.core);
        core.consumer_await()
    }
}
/// `Pair` implementation: the producing half of a `Stream` is a `Sender`.
impl<T: Send + 'static, E: Send + 'static> Pair for Stream<T, E> {
type Tx = Sender<T, E>;
/// Forwards to the inherent `Stream::pair`.
fn pair() -> (Sender<T, E>, Stream<T, E>) {
Stream::pair()
}
}
/// Opaque `Debug` output: always prints the fixed text `Stream<?>` and never
/// inspects the stream's contents.
impl<T: Send + 'static, E: Send + 'static> fmt::Debug for Stream<T, E> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("Stream<?>")
    }
}
/// Dropping a stream that still holds its core cancels that core; a stream
/// whose core was already moved out does nothing.
impl<T: Send + 'static, E: Send + 'static> Drop for Stream<T, E> {
    fn drop(&mut self) {
        if self.core.is_none() {
            return;
        }

        let core = core::take(&mut self.core);
        core.cancel();
    }
}
/// Cancels a readiness callback previously registered through
/// `Stream::ready`.
impl<T: Send + 'static, E: Send + 'static> Cancel<Stream<T, E>> for Receipt<Stream<T, E>> {
    /// Returns the stream back when the receipt carries a core and the core's
    /// `consumer_ready_cancel` reports success for the receipt's count;
    /// returns `None` otherwise.
    fn cancel(self) -> Option<Stream<T, E>> {
        let (core, count) = receipt::parts(self);

        // An empty receipt has nothing to cancel.
        if core.is_none() {
            return None;
        }

        if core::get(&core).consumer_ready_cancel(count) {
            Some(Stream { core: core })
        } else {
            None
        }
    }
}
/// Producer half of a stream: used to send a value, fail, or abort.
///
/// Dropping a `Sender` that still holds its core completes the core with
/// `Ok(None)` (see the `Drop` impl in this file).
pub struct Sender<T: Send + 'static, E: Send + 'static> {
// Handle to the shared core; moved out with `core::take` by consuming methods.
core: Option<StreamCore<T, E>>,
}
impl<T: Send + 'static, E: Send + 'static> Sender<T, E> {
    /// Sends `val` as the next item of the stream.
    ///
    /// The core is completed with `Some((val, rest))`, where `rest` is a new
    /// `Stream` wrapping a clone of the same core. The returned `BusySender`
    /// holds the core and resolves back to a `Sender`.
    pub fn send(mut self, val: T) -> BusySender<T, E> {
        let mut core = core::take(&mut self.core);
        let rest = Stream { core: Some(core.clone()) };
        let head = Some((val, rest));

        core.complete(Ok(head), false);
        BusySender { core: Some(core) }
    }

    /// Completes the core with a failure carrying `err`.
    pub fn fail(mut self, err: E) {
        let mut core = core::take(&mut self.core);
        core.complete(Err(AsyncError::failed(err)), true);
    }

    /// Completes the core with an aborted error.
    pub fn abort(mut self) {
        let mut core = core::take(&mut self.core);
        core.complete(Err(AsyncError::aborted()), true);
    }

    /// Sends every value produced by `src` through this sender by delegating
    /// to `Source::send_all`.
    pub fn send_all<S: Source<Value=T>>(self, src: S) -> Future<Self, (S::Error, Self)> {
        src.send_all(self)
    }

    /// Wraps an existing core handle in a `Sender`.
    fn from_core(core: StreamCore<T, E>) -> Sender<T, E> {
        Sender { core: Some(core) }
    }
}
/// `Async` implementation for the producer side; every method delegates to
/// the `producer_*` operations of the underlying core and resolves to the
/// `Sender` itself.
impl<T: Send + 'static, E: Send + 'static> Async for Sender<T, E> {
    type Value = Sender<T, E>;
    type Error = ();
    type Cancel = Receipt<Sender<T, E>>;

    /// Reports the core's `producer_is_ready` state.
    fn is_ready(&self) -> bool {
        let core = core::get(&self.core);
        core.producer_is_ready()
    }

    /// Reports the core's `producer_is_err` state.
    fn is_err(&self) -> bool {
        let core = core::get(&self.core);
        core.producer_is_err()
    }

    /// Returns the result of the core's `producer_poll`, with a successful
    /// core mapped into a `Sender`; hands the sender back if no result is
    /// available yet.
    fn poll(mut self) -> Result<AsyncResult<Sender<T, E>, ()>, Sender<T, E>> {
        debug!("Sender::poll; is_ready={}", self.is_ready());

        let core = core::take(&mut self.core);

        if let Some(res) = core.producer_poll() {
            return Ok(res.map(Sender::from_core));
        }

        Err(Sender { core: Some(core) })
    }

    /// Registers `f` through the core's `producer_ready`. Always returns an
    /// empty receipt.
    fn ready<F: FnOnce(Sender<T, E>) + Send + 'static>(mut self, f: F) -> Receipt<Sender<T, E>> {
        let core = core::take(&mut self.core);
        core.producer_ready(move |core| f(Sender::from_core(core)));

        receipt::none()
    }
}
/// Dropping a sender that still holds its core completes the core with
/// `Ok(None)`, which consumers see as the end of the stream.
impl<T: Send + 'static, E: Send + 'static> Drop for Sender<T, E> {
    fn drop(&mut self) {
        // The core was already moved out by `send`/`fail`/`abort`/`poll`/`ready`.
        if self.core.is_none() {
            return;
        }

        debug!("Sender::drop(); cancelling future");

        let mut core = core::take(&mut self.core);
        core.complete(Ok(None), true);
    }
}
/// Producer handle returned by `Sender::send`. It implements `Async` with
/// `Value = Sender<T, E>`, so it resolves back to a usable `Sender`.
pub struct BusySender<T: Send + 'static, E: Send + 'static> {
// Handle to the shared core; moved out with `core::take` by consuming methods.
core: Option<StreamCore<T, E>>,
}
impl<T: Send + 'static, E: Send + 'static> BusySender<T, E> {
/// Wraps an existing core handle in a `BusySender`.
fn from_core(core: StreamCore<T, E>) -> BusySender<T, E> {
BusySender { core: Some(core) }
}
}
/// `Async` implementation for a sender that has just sent a value.
///
/// A `BusySender` is a producer-side handle, so all state queries go through
/// the `producer_*` operations of the core, matching `poll`, `ready`, the
/// `Drop` impl and `Sender`'s own `Async` impl.
impl<T: Send + 'static, E: Send + 'static> Async for BusySender<T, E> {
    type Value = Sender<T, E>;
    type Error = ();
    type Cancel = Receipt<BusySender<T, E>>;

    /// Reports the core's `producer_is_ready` state.
    fn is_ready(&self) -> bool {
        // Must be the producer view: `poll` below resolves via
        // `producer_poll`, so readiness has to be reported from the same side.
        core::get(&self.core).producer_is_ready()
    }

    /// Reports the core's `producer_is_err` state.
    fn is_err(&self) -> bool {
        core::get(&self.core).producer_is_err()
    }

    /// Returns the result of the core's `producer_poll`, with a successful
    /// core mapped into a `Sender`; hands the busy sender back if no result
    /// is available yet.
    fn poll(mut self) -> Result<AsyncResult<Sender<T, E>, ()>, BusySender<T, E>> {
        debug!("Sender::poll; is_ready={}", self.is_ready());

        let core = core::take(&mut self.core);

        match core.producer_poll() {
            Some(res) => Ok(res.map(Sender::from_core)),
            None => Err(BusySender { core: Some(core) })
        }
    }

    /// Registers `f` through the core's `producer_ready`. Always returns an
    /// empty receipt.
    fn ready<F: FnOnce(BusySender<T, E>) + Send + 'static>(mut self, f: F) -> Receipt<BusySender<T, E>> {
        core::take(&mut self.core).producer_ready(move |core| {
            f(BusySender::from_core(core))
        });

        receipt::none()
    }
}
/// Dropping a busy sender that still holds its core registers a
/// `producer_ready` callback which, if the producer side reports ready at
/// that point, completes the core with `Ok(None)`.
impl<T: Send + 'static, E: Send + 'static> Drop for BusySender<T, E> {
    fn drop(&mut self) {
        if self.core.is_none() {
            return;
        }

        let core = core::take(&mut self.core);

        core.producer_ready(|mut core| {
            if !core.producer_is_ready() {
                return;
            }

            core.complete(Ok(None), true);
        });
    }
}
/// Opaque `Debug` output: always prints the fixed text `Sender<?>`.
impl<T: Send + 'static, E: Send + 'static> fmt::Debug for Sender<T, E> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("Sender<?>")
    }
}
/// Something whose values can be pushed into a `Sender`.
///
/// Implemented in this file for `Future` (a single value) and `Stream`
/// (every item).
pub trait Source {
/// Type of the values handed to the sender.
type Value: Send + 'static;
/// Failure type of the source itself.
type Error: Send + 'static;
/// Sends all values of `self` through `sender`.
///
/// The returned future resolves to the sender once everything has been
/// sent; if the source fails, it fails with the source's error paired with
/// the sender.
fn send_all<E2: Send + 'static>(self, sender: Sender<Self::Value, E2>) ->
Future<Sender<Self::Value, E2>, (Self::Error, Sender<Self::Value, E2>)>;
}
/// A future is a source of exactly one value.
impl<T: Send + 'static, E: Send + 'static> Source for Future<T, E> {
    type Value = T;
    type Error = E;

    /// Sends the future's value through `sender` and resolves to the sender
    /// once it is available again. If the future fails, the result fails with
    /// `(error, sender)`; if it is aborted, the result's producer is dropped
    /// without being completed.
    fn send_all<E2: Send + 'static>(self, sender: Sender<T, E2>) -> Future<Sender<T, E2>, (E, Sender<T, E2>)> {
        let (tx, rx) = Future::pair();

        self.receive(move |outcome| {
            let val = match outcome {
                Ok(val) => val,
                Err(AsyncError::Failed(e)) => return tx.fail((e, sender)),
                Err(AsyncError::Aborted) => return drop(tx),
            };

            sender.send(val).receive(move |ready| {
                match ready {
                    Ok(sender) => tx.complete(sender),
                    Err(_) => {}
                }
            });
        });

        rx
    }
}
/// A stream is a source of all of its items.
impl<T: Send + 'static, E: Send + 'static> Source for Stream<T, E> {
    type Value = T;
    type Error = E;

    /// Forwards every item of the stream through `sender`; the work is done
    /// by `send_stream`.
    fn send_all<E2: Send + 'static>(self, sender: Sender<T, E2>) -> Future<Sender<T, E2>, (E, Sender<T, E2>)> {
        let (done, result) = Future::pair();
        send_stream(self, sender, done);
        result
    }
}
/// Recursive worker behind `Stream`'s `Source::send_all`.
///
/// Forwards one item from `src` to `dst` at a time, continuing with the rest
/// of the stream once `dst` is available again. At the end of the stream
/// `complete` is completed with the sender; if `src` fails, `complete` fails
/// with `(error, sender)`; on abort `complete` is dropped without being
/// completed.
fn send_stream<T: Send + 'static, E: Send + 'static, E2: Send + 'static>(
        src: Stream<T, E>,
        dst: Sender<T, E2>,
        complete: Complete<Sender<T, E2>, (E, Sender<T, E2>)>) {
    src.receive(move |head| {
        let (item, rest) = match head {
            Ok(Some(pair)) => pair,
            Ok(None) => return complete.complete(dst),
            Err(AsyncError::Failed(e)) => return complete.fail((e, dst)),
            Err(AsyncError::Aborted) => return drop(complete),
        };

        dst.send(item).receive(move |ready| {
            match ready {
                Ok(dst) => send_stream(rest, dst, complete),
                Err(_) => {}
            }
        });
    })
}
/// Cancellation is not supported for `Sender` readiness callbacks:
/// `Sender::ready` always returns an empty receipt, and `cancel` always
/// returns `None`.
impl<T: Send + 'static, E: Send + 'static> Cancel<Sender<T, E>> for Receipt<Sender<T, E>> {
fn cancel(self) -> Option<Sender<T, E>> {
None
}
}
/// Cancellation is not supported for `BusySender` readiness callbacks:
/// `BusySender::ready` always returns an empty receipt, and `cancel` always
/// returns `None`.
impl<T: Send + 'static, E: Send + 'static> Cancel<BusySender<T, E>> for Receipt<BusySender<T, E>> {
fn cancel(self) -> Option<BusySender<T, E>> {
None
}
}
/// Iterator over the items of a stream, created by `Stream::iter`.
///
/// Each call to `next` uses the core's `consumer_await` to obtain the next
/// head of the stream.
pub struct StreamIter<T: Send + 'static, E: Send + 'static> {
// Core of the not-yet-consumed part of the stream; taken out once the end
// of the stream has been observed.
core: Option<StreamCore<T, E>>,
}
/// Iteration over a stream's items via the core's `consumer_await`.
impl<T: Send + 'static, E: Send + 'static> Iterator for StreamIter<T, E> {
    type Item = T;

    /// Returns the next item of the stream, or `None` once the stream has
    /// ended. After the end has been reached, every further call keeps
    /// returning `None`.
    ///
    /// # Panics
    ///
    /// Panics via `unimplemented!()` if the stream resolves with an error;
    /// the `Iterator` interface offers no way to report it.
    fn next(&mut self) -> Option<T> {
        // The core is taken out of `self` when the end of the stream is seen
        // (see the `Ok(None)` arm). Without this guard a later call would look
        // up an emptied core slot instead of simply reporting exhaustion.
        if self.core.is_none() {
            return None;
        }

        match core::get_mut(&mut self.core).consumer_await() {
            Ok(Some((h, mut rest))) => {
                // Continue with the core of the remaining stream. `rest` is
                // left without a core, so its `Drop` does not cancel anything.
                self.core = Some(core::take(&mut rest.core));
                Some(h)
            }
            Ok(None) => {
                // End of stream: release the core so `Drop` has nothing to cancel.
                let _ = core::take(&mut self.core);
                None
            }
            Err(_) => unimplemented!(),
        }
    }
}
/// Dropping an iterator that still holds a core cancels that core; an
/// iterator whose core was already taken does nothing.
impl<T: Send + 'static, E: Send + 'static> Drop for StreamIter<T, E> {
    fn drop(&mut self) {
        if self.core.is_none() {
            return;
        }

        let core = core::take(&mut self.core);
        core.cancel();
    }
}
/// Module-level constructor: wraps an existing core handle in a `Stream`.
pub fn from_core<T: Send + 'static, E: Send + 'static>(core: StreamCore<T, E>) -> Stream<T, E> {
Stream { core: Some(core) }
}