use super::Error;
use futures::{try_ready, Async, Future, Poll, Stream};
use tower_service::Service;
/// Shared state for the "call all" combinators: pulls requests from `stream`,
/// dispatches each one to `service`, and parks the resulting response futures
/// in `queue` until they complete.
///
/// The `Stream` implementation on this type yields `Svc::Response` values.
#[derive(Debug)]
pub(crate) struct CallAll<Svc, S, Q> {
// Service that every request taken from `stream` is passed to via `call`.
service: Svc,
// Source of requests.
stream: S,
// Container for in-flight response futures (see the `Drive` trait).
queue: Q,
// Starts as `false`; set to `true` once `stream` has yielded `None`, after
// which `stream` is not polled again.
eof: bool,
}
/// Abstraction over the container that `CallAll` uses to hold and drive its
/// in-flight futures of type `T`.
///
/// `CallAll` pushes one future per dispatched request and polls the container
/// for completed results; how (and in which order) the pending futures are
/// driven is left to the implementor.
pub(crate) trait Drive<T>
where
    T: Future,
{
    /// Reports whether the container currently holds no futures.
    ///
    /// `CallAll` consults this once the request stream is exhausted to decide
    /// whether the response stream is finished.
    fn is_empty(&self) -> bool;

    /// Hands a new future to the container.
    fn push(&mut self, future: T);

    /// Polls the container for the next available result.
    ///
    /// `CallAll` forwards a `Ready(Some(item))` result to its consumer and
    /// propagates errors; any other outcome makes it go on to look for more
    /// requests (or to check for completion).
    fn poll(&mut self) -> Poll<Option<T::Item>, T::Error>;
}
impl<Svc, S, Q> CallAll<Svc, S, Q>
where
    S: Stream,
    S::Error: Into<Error>,
    Svc: Service<S::Item>,
    Svc::Error: Into<Error>,
    Q: Drive<Svc::Future>,
{
    /// Builds a combinator that feeds requests from `stream` into `service`,
    /// using `queue` to hold the in-flight response futures.
    ///
    /// The end-of-stream flag starts out cleared.
    pub(crate) fn new(service: Svc, stream: S, queue: Q) -> Self {
        Self {
            service,
            stream,
            queue,
            eof: false,
        }
    }

    /// Consumes the combinator and hands back the wrapped service.
    ///
    /// The request stream and the queue are dropped.
    pub(crate) fn into_inner(self) -> Svc {
        let CallAll { service, .. } = self;
        service
    }

    /// Converts this combinator into a `CallAllUnordered` over the same
    /// service and request stream; the current queue is discarded.
    ///
    /// # Panics
    ///
    /// Panics if the queue already holds futures or if the request stream has
    /// already been observed to end.
    pub(crate) fn unordered(self) -> super::CallAllUnordered<Svc, S> {
        assert!(self.queue.is_empty() && !self.eof);
        let CallAll {
            service, stream, ..
        } = self;
        super::CallAllUnordered::new(service, stream)
    }
}
/// Yields the responses produced by calling the service with every request
/// taken from the request stream.
///
/// Errors from the queue, the service's readiness check, or the request
/// stream are converted into the crate-level `Error` and returned.
impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
where
    Svc: Service<S::Item>,
    Svc::Error: Into<Error>,
    S: Stream,
    S::Error: Into<Error>,
    Q: Drive<Svc::Future>,
{
    type Item = Svc::Response;
    type Error = Error;

    /// Drives the combinator one step.
    ///
    /// Returns `Ready(Some(rsp))` as soon as the queue has a response,
    /// `Ready(None)` once the request stream has ended and the queue is
    /// empty, and `NotReady` when no progress can be made right now.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            let res = self.queue.poll().map_err(Into::into);

            // First, see if there is a response ready to hand out.
            if let Async::Ready(Some(rsp)) = res? {
                return Ok(Async::Ready(Some(rsp)));
            }

            // No more requests are coming: finished once the queue drains,
            // otherwise wait for the outstanding futures.
            if self.eof {
                if self.queue.is_empty() {
                    return Ok(Async::Ready(None));
                } else {
                    return Ok(Async::NotReady);
                }
            }

            // Make sure the service can accept another request before taking
            // one off the stream.
            try_ready!(self.service.poll_ready().map_err(Into::into));

            // Gather the next request, if there is one.
            match self.stream.poll().map_err(Into::into)? {
                Async::Ready(Some(req)) => {
                    self.queue.push(self.service.call(req));
                }
                Async::Ready(None) => {
                    // Loop once more so the eof branch above decides between
                    // "done" and "still waiting on in-flight requests".
                    self.eof = true;
                }
                Async::NotReady => {
                    // No request is available yet. Return to the caller
                    // instead of looping: spinning here would re-poll the
                    // queue, service and stream forever without ever giving
                    // control back to the executor.
                    return Ok(Async::NotReady);
                }
            }
        }
    }
}