use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::channel::oneshot;
use futures::task;
use futures::task::{Poll};
use std::pin::*;
use std::sync::*;
pub struct GeneratorStream<TFuture, TItem> {
future: Option<Pin<Box<TFuture>>>,
next_item: Arc<Mutex<Option<TItem>>>,
yield_complete: Arc<Mutex<Option<oneshot::Sender<()>>>>
}
impl<'a, TFuture, TItem> GeneratorStream<TFuture, TItem>
where
TFuture: 'a+Future<Output=()>,
TItem: 'a+Send {
pub fn generate<TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> GeneratorStream<TFuture, TItem> {
let next_item = Arc::new(Mutex::new(None));
let yield_complete = Arc::new(Mutex::new(None));
let generator_item = Arc::clone(&next_item);
let generator_yield = Arc::clone(&yield_complete);
let generator_fn = move |item| {
let (yield_send, yield_recv) = oneshot::channel();
(*generator_yield.lock().unwrap()) = Some(yield_send);
(*generator_item.lock().unwrap()) = Some(item);
yield_recv.map(|_| ()).boxed()
};
let generator_future = start_future(Box::new(generator_fn));
GeneratorStream {
future: Some(Box::pin(generator_future)),
next_item: next_item,
yield_complete: yield_complete
}
}
fn try_yield(&self) -> Option<TItem> {
let mut next_item = self.next_item.lock().unwrap();
let mut yield_complete = self.yield_complete.lock().unwrap();
if let Some(next_item) = next_item.take() {
yield_complete.take().map(|yield_complete| yield_complete.send(()));
Some(next_item)
} else {
None
}
}
}
pub fn generator_stream<'a, TItem, TFuture, TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> impl Stream<Item=TItem>
where
TItem: 'a+Send,
TFuture: 'a+Future<Output=()> {
GeneratorStream::generate(start_future)
}
impl<TFuture, TItem> Stream for GeneratorStream<TFuture, TItem>
where
TItem: Send,
TFuture: Future<Output=()> {
type Item = TItem;
fn poll_next(mut self: Pin<&mut Self>, context: &mut task::Context) -> Poll<Option<Self::Item>> {
if let Some(next_item) = self.try_yield() { return Poll::Ready(Some(next_item)); }
if let Some(future) = &mut self.future {
match TFuture::poll(future.as_mut(), context) {
Poll::Ready(()) => {
self.future = None;
if let Some(next_item) = self.try_yield() {
return Poll::Ready(Some(next_item));
} else {
return Poll::Ready(None);
}
}
Poll::Pending => {
}
}
} else {
return Poll::Ready(None);
}
if let Some(next_item) = self.try_yield() {
return Poll::Ready(Some(next_item));
} else {
return Poll::Pending;
}
}
}