use crate::futures::stream::Yielder;
use crate::futures::Stream;
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::{channel, Receiver};
use std::task::{Context, Poll};
pub fn generate_stream<F, B, T>(builder: B) -> impl Stream<Item = T>
where
B: FnOnce(Yielder<T>) -> F,
F: Future<Output = ()> + Unpin,
{
StreamGenerator::new(builder)
}
struct StreamGenerator<T, B, F> {
builder: Option<B>,
yielder: Option<Yielder<T>>,
receiver: Receiver<T>,
future: Option<F>,
done: bool,
}
impl<T, B, F> Unpin for StreamGenerator<T, B, F> {}
impl<T, B, F> StreamGenerator<T, B, F> {
pub fn new(builder: B) -> Self {
let (sender, receiver) = channel();
let builder = Some(builder);
let yielder = Some(Yielder::new(sender));
StreamGenerator {
builder,
yielder,
receiver,
future: None,
done: false,
}
}
}
impl<T, B, F> Stream for StreamGenerator<T, B, F>
where
B: FnOnce(Yielder<T>) -> F,
F: Future<Output = ()> + Unpin,
{
type Item = T;
#[allow(clippy::never_loop)]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let StreamGenerator {
builder,
yielder,
receiver,
future,
done,
} = self.get_mut();
if *done {
while let Ok(data) = receiver.try_recv() {
return Poll::Ready(Some(data));
}
return Poll::Ready(None);
}
let future = {
match future {
Some(f) => f,
None => {
let builder = builder.take().expect("Builder was already called");
let yielder = yielder.take().unwrap();
future.get_or_insert(builder(yielder))
}
}
};
let poll = Pin::new(future).poll(cx);
*done = poll.is_ready();
while let Ok(data) = receiver.try_recv() {
return Poll::Ready(Some(data));
}
match poll {
Poll::Ready(()) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}