use core::pin::Pin;
use futures::stream::{Fuse, FuturesUnordered, StreamExt};
use futures::task::{Context, Poll};
use futures::{Future, Stream};
use pin_project::pin_project;
// Blanket implementation: every type implementing `StreamExt` (sized or not)
// automatically gets the extension trait. The body is empty because the
// trait's only method has a default implementation.
impl<T: ?Sized> StreamExtBufferUnorderedWithBreaker for T where T: StreamExt {}
/// Extension trait adding [`buffered_unordered_with_breaker`] to streams whose
/// items are futures.
///
/// [`buffered_unordered_with_breaker`]: StreamExtBufferUnorderedWithBreaker::buffered_unordered_with_breaker
pub trait StreamExtBufferUnorderedWithBreaker: StreamExt {
/// Wraps this stream of futures in a [`BufferUnorderedWithBreaker`] adapter.
///
/// * `n` - limit on how many futures the adapter keeps in its in-progress
///   queue at once.
/// * `breaker` - predicate called with a reference to each completed output;
///   when it returns `true`, that output is still yielded and the adapter
///   then ends the stream.
///
/// The adapter yields the futures' outputs in completion order rather than
/// in the order the futures were produced by the underlying stream.
fn buffered_unordered_with_breaker(
self,
n: usize,
breaker: Box<dyn Fn(&<Self::Item as Future>::Output) -> bool + Send>,
) -> BufferUnorderedWithBreaker<Self>
where
Self: Sized,
Self::Item: Future,
{
BufferUnorderedWithBreaker::new(self, n, breaker)
}
}
/// Stream adapter that drives a bounded number of futures concurrently and
/// stops after the first output accepted by the `breaker` predicate.
///
/// Futures are pulled from the wrapped stream into an unordered queue; their
/// outputs are yielded as they complete. Once `breaker` returns `true` for an
/// output, that output is yielded and every later poll returns `None`.
#[pin_project(project = BufferUnorderedWithBreakerProj)]
#[must_use = "streams do nothing unless polled"]
pub struct BufferUnorderedWithBreaker<St>
where
St: Stream,
St::Item: Future,
{
/// The wrapped source of futures, fused so exhaustion can be queried with
/// `is_done()`.
#[pin]
stream: Fuse<St>,
/// Futures taken from `stream` that have not yet completed.
in_progress_queue: FuturesUnordered<St::Item>,
/// New futures are only pulled from `stream` while the queue holds fewer
/// than this many entries.
max: usize,
/// Predicate evaluated on each completed output; `true` trips the breaker.
breaker: Box<dyn Fn(&<St::Item as Future>::Output) -> bool + Send>,
/// Set once `breaker` has returned `true`; makes all further polls return
/// `Poll::Ready(None)`.
abort: bool,
}
impl<St> BufferUnorderedWithBreaker<St>
where
    St: Stream,
    St::Item: Future,
{
    /// Builds the adapter around `stream`.
    ///
    /// * `stream` - source of futures; it is fused so that its exhaustion can
    ///   be observed after it has returned `None`.
    /// * `n` - maximum number of futures held in the in-progress queue. A
    ///   value of `0` is raised to `1` (see below).
    /// * `breaker` - predicate run on every completed output; once it returns
    ///   `true` the stream ends after yielding that output.
    ///
    /// A capacity of zero can never make progress: `poll_next` would not pull
    /// anything from the source, would find the queue empty and the source not
    /// yet finished, and would return `Poll::Pending` without any waker having
    /// been registered, so the task would never be woken again. Clamping to
    /// one keeps such a call usable instead of hanging forever.
    pub(crate) fn new(
        stream: St,
        n: usize,
        breaker: Box<dyn Fn(&<St::Item as Future>::Output) -> bool + Send>,
    ) -> Self {
        Self {
            stream: stream.fuse(),
            in_progress_queue: FuturesUnordered::new(),
            // Never allow a zero-sized buffer; it would stall the stream.
            max: n.max(1),
            breaker,
            abort: false,
        }
    }
}
impl<St> Stream for BufferUnorderedWithBreaker<St>
where
    St: Stream,
    St::Item: Future,
{
    type Item = <St::Item as Future>::Output;

    /// Yields the next completed output, or `None` once the breaker has
    /// tripped or both the source stream and the queue are exhausted.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // The breaker fired on an earlier poll: the stream is over.
        if *this.abort {
            return Poll::Ready(None);
        }

        // Top up the queue from the source until it is full or the source has
        // nothing to hand out right now (pending or exhausted).
        while this.in_progress_queue.len() < *this.max {
            if let Poll::Ready(Some(fut)) = this.stream.as_mut().poll_next(cx) {
                this.in_progress_queue.push(fut);
            } else {
                break;
            }
        }

        // Drive the queued futures and report whichever finishes first.
        match this.in_progress_queue.poll_next_unpin(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Some(output)) => {
                // The tripping output is still delivered; only later polls
                // observe the end of the stream.
                if (this.breaker)(&output) {
                    *this.abort = true;
                }
                Poll::Ready(Some(output))
            }
            // Queue is empty: finished only if the source is exhausted too,
            // otherwise more futures may still arrive.
            Poll::Ready(None) if this.stream.is_done() => Poll::Ready(None),
            Poll::Ready(None) => Poll::Pending,
        }
    }
}