use std::prelude::v1::*;
use std::fmt;
use std::mem;
use std::sync::Arc;
use task::{self, UnparkEvent};
use {Async, IntoFuture, Poll, Future};
use stream::{Stream, Fuse};
use stack::{Stack, Drain};
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnordered<S>
where S: Stream,
S::Item: IntoFuture,
{
stream: Fuse<S>,
futures: Vec<Slot<<S::Item as IntoFuture>::Future>>,
next_future: usize,
stack: Arc<Stack<usize>>,
pending: Drain<usize>,
active: usize,
}
impl<S> fmt::Debug for BufferUnordered<S>
where S: Stream + fmt::Debug,
S::Item: IntoFuture,
<<S as Stream>::Item as IntoFuture>::Future: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("BufferUnordered")
.field("stream", &self.stream)
.field("futures", &self.futures)
.field("next_future", &self.next_future)
.field("stack", &self.stack)
.field("pending", &self.pending)
.field("active", &self.active)
.finish()
}
}
#[derive(Debug)]
enum Slot<T> {
Next(usize),
Data(T),
}
pub fn new<S>(s: S, amt: usize) -> BufferUnordered<S>
where S: Stream,
S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
BufferUnordered {
stream: super::fuse::new(s),
futures: (0..amt).map(|i| Slot::Next(i + 1)).collect(),
next_future: 0,
pending: Stack::new().drain(),
stack: Arc::new(Stack::new()),
active: 0,
}
}
impl<S> BufferUnordered<S>
where S: Stream,
S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
fn poll_pending(&mut self)
-> Option<Poll<Option<<S::Item as IntoFuture>::Item>,
S::Error>> {
while let Some(idx) = self.pending.next() {
let result = match self.futures[idx] {
Slot::Data(ref mut f) => {
let event = UnparkEvent::new(self.stack.clone(), idx);
match task::with_unpark_event(event, || f.poll()) {
Ok(Async::NotReady) => continue,
Ok(Async::Ready(e)) => Ok(Async::Ready(Some(e))),
Err(e) => Err(e),
}
},
Slot::Next(_) => continue,
};
self.active -= 1;
self.futures[idx] = Slot::Next(self.next_future);
self.next_future = idx;
return Some(result)
}
None
}
}
impl<S> Stream for BufferUnordered<S>
where S: Stream,
S::Item: IntoFuture<Error=<S as Stream>::Error>,
{
type Item = <S::Item as IntoFuture>::Item;
type Error = <S as Stream>::Error;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
while self.next_future < self.futures.len() {
let future = match try!(self.stream.poll()) {
Async::Ready(Some(s)) => s.into_future(),
Async::Ready(None) |
Async::NotReady => break,
};
self.active += 1;
self.stack.push(self.next_future);
match mem::replace(&mut self.futures[self.next_future],
Slot::Data(future)) {
Slot::Next(next) => self.next_future = next,
Slot::Data(_) => panic!(),
}
}
if let Some(ret) = self.poll_pending() {
return ret
}
assert!(self.pending.next().is_none());
self.pending = self.stack.drain();
if let Some(ret) = self.poll_pending() {
return ret
}
Ok(if self.active > 0 || !self.stream.is_done() {
Async::NotReady
} else {
Async::Ready(None)
})
}
}
impl<S> ::sink::Sink for BufferUnordered<S>
where S: ::sink::Sink + Stream,
S::Item: IntoFuture,
{
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;
fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
self.stream.start_send(item)
}
fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
self.stream.poll_complete()
}
fn close(&mut self) -> Poll<(), S::SinkError> {
self.stream.close()
}
}