1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155
use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::channel::oneshot;
use futures::task;
use futures::task::{Poll};
use std::pin::*;
use std::sync::*;
///
/// A generator stream is a stream that runs a Future internally that generates multiple results, which are
/// formatted as a stream. The stream is closed when the future terminates.
///
pub struct GeneratorStream<TFuture, TItem> {
/// The future that is generating items
future: Option<Pin<Box<TFuture>>>,
/// The last item generated by the future
next_item: Arc<Mutex<Option<TItem>>>,
/// Future that waits for the yield to complete
yield_complete: Arc<Mutex<Option<oneshot::Sender<()>>>>
}
impl<'a, TFuture, TItem> GeneratorStream<TFuture, TItem>
where
TFuture: 'a+Future<Output=()>,
TItem: 'a+Send {
///
/// Creates a new generator stream
///
/// The function passed in receives the 'yield' function as a parameter
///
pub fn generate<TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> GeneratorStream<TFuture, TItem> {
// Create the future status items (next item to return from the stream and the 'yield complete' message)
let next_item = Arc::new(Mutex::new(None));
let yield_complete = Arc::new(Mutex::new(None));
// Generator function sets these values
let generator_item = Arc::clone(&next_item);
let generator_yield = Arc::clone(&yield_complete);
let generator_fn = move |item| {
let (yield_send, yield_recv) = oneshot::channel();
(*generator_yield.lock().unwrap()) = Some(yield_send);
(*generator_item.lock().unwrap()) = Some(item);
yield_recv.map(|_| ()).boxed()
};
// Start the future
let generator_future = start_future(Box::new(generator_fn));
// Result is a new generator stream
GeneratorStream {
future: Some(Box::pin(generator_future)),
next_item: next_item,
yield_complete: yield_complete
}
}
///
/// If an item is waiting on this stream, move the future back to the 'running' state and returns the item
///
fn try_yield(&self) -> Option<TItem> {
let mut next_item = self.next_item.lock().unwrap();
let mut yield_complete = self.yield_complete.lock().unwrap();
if let Some(next_item) = next_item.take() {
// Signal to the future that it can continue
yield_complete.take().map(|yield_complete| yield_complete.send(()));
// Return the generated item
Some(next_item)
} else {
// No item is waiting
None
}
}
}
///
/// Creates a new generator stream: this is a stream where the items are generated by a future, which can yield them to be returned
/// by the stream via the function that's passed in. This is useful for cases where a stream's values are generated by complicated,
/// stateful behaviour.
///
/// For example, here is a simple generator stream that produces the sequence '0, 1, 2':
///
/// ```
/// # use flo_stream::*;
/// let mut generated_stream = generator_stream(|yield_value| async move {
/// for num in 0u32..3 {
/// yield_value(num).await;
/// }
/// });
/// # use futures::prelude::*;
/// # use futures::executor;
/// # executor::block_on(async move {
/// # assert!(generated_stream.next().await == Some(0));
/// # assert!(generated_stream.next().await == Some(1));
/// # assert!(generated_stream.next().await == Some(2));
/// # assert!(generated_stream.next().await == None);
/// # })
/// ```
///
pub fn generator_stream<'a, TItem, TFuture, TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> impl Stream<Item=TItem>
where
TItem: 'a+Send,
TFuture: 'a+Future<Output=()> {
GeneratorStream::generate(start_future)
}
impl<TFuture, TItem> Stream for GeneratorStream<TFuture, TItem>
where
TItem: Send,
TFuture: Future<Output=()> {
type Item = TItem;
fn poll_next(mut self: Pin<&mut Self>, context: &mut task::Context) -> Poll<Option<Self::Item>> {
// Return the next item if there is one
if let Some(next_item) = self.try_yield() { return Poll::Ready(Some(next_item)); }
// Try polling the future
if let Some(future) = &mut self.future {
match TFuture::poll(future.as_mut(), context) {
Poll::Ready(()) => {
// Future has completed
self.future = None;
// Return the last item, if there is one
if let Some(next_item) = self.try_yield() {
return Poll::Ready(Some(next_item));
} else {
// Stream has completed
return Poll::Ready(None);
}
}
Poll::Pending => {
// Future is waiting
}
}
} else {
// No item to yield and no future to run: the stream has completed
return Poll::Ready(None);
}
if let Some(next_item) = self.try_yield() {
// If the future generated an item, return that
return Poll::Ready(Some(next_item));
} else {
// Future is waiting to process more data
return Poll::Pending;
}
}
}