use std::panic;
use futures::{Async, AsyncSink, Future, Poll, Sink, Stream};
use prelude::CoroutineFuture;
use errors::Dropped;
/// Blocking-style iterator wrapper around a stream.
///
/// The stream lives in an `Option` so the `Iterator` implementation can move
/// it out while waiting for the next item and put it back afterwards. The
/// slot is `None` once the stream has been lost.
pub struct CleanupIterator<S>(Option<S>);

impl<S> CleanupIterator<S> {
    /// Wraps `stream` so it can be driven through the `Iterator` interface.
    pub fn new(stream: S) -> Self {
        let slot = Some(stream);
        CleanupIterator(slot)
    }

    /// Consumes the wrapper and hands the stream back.
    ///
    /// # Errors
    ///
    /// Returns `Err(Dropped)` when the wrapper no longer holds a stream.
    pub fn into_inner(self) -> Result<S, Dropped> {
        let CleanupIterator(slot) = self;
        match slot {
            Some(stream) => Ok(stream),
            None => Err(Dropped),
        }
    }
}
impl<I, E, S: Stream<Item = I, Error = E>> Iterator for CleanupIterator<S> {
    type Item = Result<Result<I, E>, Dropped>;

    /// Waits for the next step of the wrapped stream.
    ///
    /// * `Some(Ok(Ok(item)))` — the stream produced an item.
    /// * `Some(Ok(Err(error)))` — the stream produced an error; the stream is
    ///   kept, so iteration may continue afterwards.
    /// * `Some(Err(Dropped))` — waiting reported `Dropped`; the stream is
    ///   discarded and every later call yields `Some(Err(Dropped))` again.
    /// * `None` — the stream reported its end (the stream itself is kept).
    fn next(&mut self) -> Option<Result<Result<I, E>, Dropped>> {
        // The stream has to be moved out because `into_future` consumes it.
        let stream = match self.0.take() {
            Some(stream) => stream,
            None => return Some(Err(Dropped)),
        };
        match stream.into_future().coro_wait_cleanup() {
            Ok(Ok((item, rest))) => {
                self.0 = Some(rest);
                item.map(|value| Ok(Ok(value)))
            }
            Ok(Err((error, rest))) => {
                self.0 = Some(rest);
                Some(Ok(Err(error)))
            }
            // The slot was already emptied by `take`, so it stays `None`.
            Err(Dropped) => Some(Err(Dropped)),
        }
    }
}
/// Unwraps `r`, turning a `Dropped` error into an unwinding panic.
///
/// # Panics
///
/// Resumes unwinding with a boxed `Dropped` as the payload when `r` is `Err`.
fn drop_panic<T>(r: Result<T, Dropped>) -> T {
    match r {
        Ok(value) => value,
        Err(_) => panic::resume_unwind(Box::new(Dropped)),
    }
}
/// Thin wrapper around an inner iterator.
///
/// When the inner iterator is a `CleanupIterator`, this type's `Iterator`
/// implementation yields only the successful items.
pub struct OkIterator<I>(I);

impl<I> OkIterator<I> {
    /// Wraps `inner`.
    pub fn new(inner: I) -> Self {
        OkIterator(inner)
    }

    /// Consumes the wrapper and returns the inner iterator.
    pub fn into_inner(self) -> I {
        let OkIterator(inner) = self;
        inner
    }
}
impl<I, E, S: Stream<Item = I, Error = E>> Iterator for OkIterator<CleanupIterator<S>> {
type Item = I;
fn next(&mut self) -> Option<I> {
self.0
.next()
.map(drop_panic)
.and_then(Result::ok)
}
}
/// Thin wrapper around an inner iterator.
///
/// When the inner iterator is a `CleanupIterator`, this type's `Iterator`
/// implementation yields the stream's items and errors as `Result`s.
pub struct ResultIterator<I>(I);

impl<I> ResultIterator<I> {
    /// Wraps `inner`.
    pub fn new(inner: I) -> Self {
        ResultIterator(inner)
    }

    /// Consumes the wrapper and returns the inner iterator.
    pub fn into_inner(self) -> I {
        let ResultIterator(inner) = self;
        inner
    }
}
impl<I, E, S: Stream<Item = I, Error = E>> Iterator for ResultIterator<CleanupIterator<S>> {
type Item = Result<I, E>;
fn next(&mut self) -> Option<Result<I, E>> {
self.0
.next()
.map(drop_panic)
}
}
/// Holds a mutable borrow of a stream for the lifetime `'a`.
///
/// The `Future` implementation of this type polls the borrowed stream, so
/// the stream itself is not consumed.
pub struct StreamExtractor<'a, S: 'a>(&'a mut S);

impl<'a, S: 'a> StreamExtractor<'a, S> {
    /// Creates an extractor borrowing `stream`.
    pub fn new(stream: &'a mut S) -> Self {
        StreamExtractor(stream)
    }
}
impl<'a, I, E, S: Stream<Item = I, Error = E> + 'a> Future for StreamExtractor<'a, S> {
    type Item = Option<I>;
    type Error = E;

    /// Forwards the poll to the borrowed stream and returns its result
    /// unchanged, so the future's item is whatever `Option<I>` the stream's
    /// poll produced.
    fn poll(&mut self) -> Poll<Option<I>, E> {
        let stream: &mut S = &mut *self.0;
        stream.poll()
    }
}
/// State for pushing the items of an iterator into a borrowed sink.
pub struct SinkSender<'a, V, S: 'a, I: Iterator<Item = V>> {
    /// The sink the items are sent to.
    sink: &'a mut S,
    /// Source of the items; set to `None` once it has been exhausted.
    iter: Option<I>,
    /// An item that was handed back and must be offered again before the
    /// iterator is advanced.
    value: Option<V>,
}

impl<'a, V, S: 'a, I: Iterator<Item = V>> SinkSender<'a, V, S, I> {
    /// Creates a sender that will push every item of `src` into `sink`.
    pub fn new<Src: IntoIterator<IntoIter = I, Item = V>>(sink: &'a mut S, src: Src) -> Self {
        SinkSender {
            sink: sink,
            iter: Some(src.into_iter()),
            value: None,
        }
    }

    /// Produces the next item to send.
    ///
    /// A stashed `value` takes priority over the iterator. Once the iterator
    /// returns `None` it is dropped and never advanced again.
    fn next(&mut self) -> Option<V> {
        if let Some(pending) = self.value.take() {
            return Some(pending);
        }
        let produced = match self.iter {
            Some(ref mut source) => source.next(),
            None => None,
        };
        if produced.is_none() {
            self.iter = None;
        }
        produced
    }
}
impl<'a, V, E, S, I> Future for SinkSender<'a, V, S, I>
where
    S: Sink<SinkItem = V, SinkError = E> + 'a,
    I: Iterator<Item = V>,
{
    type Item = ();
    type Error = E;

    /// Sends as many items as the sink accepts, then delegates to the sink's
    /// `poll_complete`.
    ///
    /// If the sink hands an item back as not ready, the item is stashed so the
    /// next poll retries it first, and `NotReady` is returned. A sink error is
    /// returned immediately.
    fn poll(&mut self) -> Poll<(), E> {
        loop {
            let item = match self.next() {
                Some(item) => item,
                // Nothing left to send.
                None => break,
            };
            if let AsyncSink::NotReady(returned) = self.sink.start_send(item)? {
                // Keep the rejected item for the next poll.
                self.value = Some(returned);
                return Ok(Async::NotReady);
            }
        }
        self.sink.poll_complete()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use futures::sync::mpsc;
    use tokio::prelude::*;
    use tokio::runtime::current_thread;
    use prelude::*;

    /// Two extractors over the same one-item stream see the item and then
    /// the end of the stream.
    #[test]
    fn stream_extract() {
        let mut source = stream::once::<_, ()>(Ok(42));
        let first = StreamExtractor::new(&mut source).wait();
        assert_eq!(first, Ok(Some(42)));
        let second = StreamExtractor::new(&mut source).wait();
        assert_eq!(second, Ok(None));
    }

    /// Everything pushed through `SinkSender` arrives on an unbounded channel.
    #[test]
    fn sink_sender() {
        let (mut tx, rx) = mpsc::unbounded();
        let expected = vec![1, 2, 3];
        {
            // Scoped so the mutable borrow of `tx` ends before it is dropped.
            let sending = SinkSender::new(&mut tx, expected.clone());
            sending.wait().unwrap();
        }
        // Close the sending half; presumably required so the blocking
        // iteration over the receiver below terminates.
        drop(tx);
        let outcome = rx.wait().collect::<Result<Vec<_>, _>>();
        let received = outcome.unwrap();
        assert_eq!(expected, received);
    }

    /// Same as `sink_sender`, but over a channel created with capacity 1 and
    /// with producer and consumer running as two coroutines.
    #[test]
    fn async_sink_sender() {
        let scenario = future::lazy(|| {
            let (mut tx, rx) = mpsc::channel(1);
            let producer = Coroutine::with_defaults(move || {
                let payload = vec![1, 2, 3];
                Coroutine::wait(SinkSender::new(&mut tx, payload))
                    .unwrap()
                    .unwrap();
            });
            let consumer = Coroutine::with_defaults(move || {
                let mut seen = Vec::new();
                Coroutine::wait(rx.for_each(|item| {
                    seen.push(item);
                    Ok(())
                }))
                .unwrap()
                .unwrap();
                assert_eq!(vec![1, 2, 3], seen);
            });
            consumer.join(producer)
        });
        current_thread::block_on_all(scenario).unwrap();
    }
}