use futures::{ready, stream::FuturesUnordered, Stream};
use pin_project_lite::pin_project;
use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::Service;
pin_project! {
#[derive(Debug)]
pub(super) struct CallAllUnordered<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
#[pin]
inner: CallAll<Svc, S, FuturesUnordered<Svc::Future>>,
}
}
impl<Svc, S> CallAllUnordered<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
pub(super) fn new(service: Svc, stream: S) -> CallAllUnordered<Svc, S> {
CallAllUnordered {
inner: CallAll::new(service, stream, FuturesUnordered::new()),
}
}
}
impl<Svc, S> Stream for CallAllUnordered<Svc, S>
where
Svc: Service<S::Item>,
S: Stream,
{
type Item = Result<Svc::Response, Svc::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().inner.poll_next(cx)
}
}
impl<F: Future> Drive<F> for FuturesUnordered<F> {
fn is_empty(&self) -> bool {
FuturesUnordered::is_empty(self)
}
fn push(&mut self, future: F) {
FuturesUnordered::push(self, future)
}
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
Stream::poll_next(Pin::new(self), cx)
}
}
pin_project! {
pub(crate) struct CallAll<Svc, S, Q>
where
S: Stream,
{
service: Option<Svc>,
#[pin]
stream: S,
queue: Q,
eof: bool,
curr_req: Option<S::Item>
}
}
impl<Svc, S, Q> fmt::Debug for CallAll<Svc, S, Q>
where
Svc: fmt::Debug,
S: Stream + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("CallAll")
.field("service", &self.service)
.field("stream", &self.stream)
.field("eof", &self.eof)
.finish()
}
}
pub(crate) trait Drive<F: Future> {
fn is_empty(&self) -> bool;
fn push(&mut self, future: F);
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Option<F::Output>>;
}
impl<Svc, S, Q> CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
S: Stream,
Q: Drive<Svc::Future>,
{
pub(crate) const fn new(service: Svc, stream: S, queue: Q) -> CallAll<Svc, S, Q> {
CallAll {
service: Some(service),
stream,
queue,
eof: false,
curr_req: None,
}
}
}
impl<Svc, S, Q> Stream for CallAll<Svc, S, Q>
where
Svc: Service<S::Item>,
S: Stream,
Q: Drive<Svc::Future>,
{
type Item = Result<Svc::Response, Svc::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Poll::Ready(r) = this.queue.poll(cx) {
if let Some(rsp) = r.transpose()? {
return Poll::Ready(Some(Ok(rsp)));
}
}
if *this.eof {
if this.queue.is_empty() {
return Poll::Ready(None);
} else {
return Poll::Pending;
}
}
let svc = this
.service
.as_mut()
.expect("Using CallAll after extracting inner Service");
if let Err(e) = ready!(svc.poll_ready(cx)) {
*this.eof = true;
return Poll::Ready(Some(Err(e)));
}
if this.curr_req.is_none() {
*this.curr_req = match ready!(this.stream.as_mut().poll_next(cx)) {
Some(next_req) => Some(next_req),
None => {
*this.eof = true;
continue;
}
};
}
this.queue.push(svc.call(this.curr_req.take().unwrap()));
}
}
}