flo_stream 0.7.1

Pubsub and related streams for Rust futures
Documentation
use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::channel::oneshot;
use futures::task;
use futures::task::{Poll};

use std::pin::*;
use std::sync::*;

///
/// A generator stream is a stream that runs a Future internally that generates multiple results, which are
/// formatted as a stream. The stream is closed when the future terminates.
///
pub struct GeneratorStream<TFuture, TItem> {
    /// The future that is generating items
    future: Option<Pin<Box<TFuture>>>,

    /// The last item generated by the future
    next_item: Arc<Mutex<Option<TItem>>>,

    /// Future that waits for the yield to complete
    yield_complete: Arc<Mutex<Option<oneshot::Sender<()>>>>
}

impl<'a, TFuture, TItem> GeneratorStream<TFuture, TItem> 
where 
TFuture:    'a+Future<Output=()>,
TItem:      'a+Send {
    ///
    /// Creates a new generator stream
    ///
    /// The function passed in receives the 'yield' function as a parameter
    ///
    pub fn generate<TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> GeneratorStream<TFuture, TItem> {
        // Create the future status items (next item to return from the stream and the 'yield complete' message)
        let next_item           = Arc::new(Mutex::new(None));
        let yield_complete      = Arc::new(Mutex::new(None));

        // Generator function sets these values
        let generator_item      = Arc::clone(&next_item);
        let generator_yield     = Arc::clone(&yield_complete);
        let generator_fn        = move |item| {
            let (yield_send, yield_recv)        = oneshot::channel();
            (*generator_yield.lock().unwrap())  = Some(yield_send);
            (*generator_item.lock().unwrap())   = Some(item);

            yield_recv.map(|_| ()).boxed()
        };

        // Start the future
        let generator_future    = start_future(Box::new(generator_fn));

        // Result is a new generator stream
        GeneratorStream {
            future:         Some(Box::pin(generator_future)),
            next_item:      next_item,
            yield_complete: yield_complete
        }
    }

    ///
    /// If an item is waiting on this stream, move the future back to the 'running' state and returns the item
    ///
    fn try_yield(&self) -> Option<TItem> {
        let mut next_item       = self.next_item.lock().unwrap();
        let mut yield_complete  = self.yield_complete.lock().unwrap();

        if let Some(next_item) = next_item.take() {
            // Signal to the future that it can continue
            yield_complete.take().map(|yield_complete| yield_complete.send(()));

            // Return the generated item
            Some(next_item)
        } else {
            // No item is waiting
            None
        }
    }
}

///
/// Creates a new generator stream: this is a stream where the items are generated by a future, which can yield them to be returned
/// by the stream via the function that's passed in. This is useful for cases where a stream's values are generated by complicated,
/// stateful behaviour.
///
/// For example, here is a simple generator stream that produces the sequence '0, 1, 2':
///
/// ```
/// # use flo_stream::*;
/// let mut generated_stream = generator_stream(|yield_value| async move {
///    for num in 0u32..3 {
///        yield_value(num).await;
///    }
/// });
/// # use futures::prelude::*;
/// # use futures::executor;
/// # executor::block_on(async move {
/// #     assert!(generated_stream.next().await == Some(0));
/// #     assert!(generated_stream.next().await == Some(1));
/// #     assert!(generated_stream.next().await == Some(2));
/// #     assert!(generated_stream.next().await == None);
/// # })
/// ```
///
pub fn generator_stream<'a, TItem, TFuture, TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> impl Stream<Item=TItem> 
where 
TItem:      'a+Send,
TFuture:    'a+Future<Output=()> {
    GeneratorStream::generate(start_future)
}

impl<TFuture, TItem> Stream for GeneratorStream<TFuture, TItem>
where 
TItem:      Send,
TFuture:    Future<Output=()> {
    type Item = TItem;

    fn poll_next(mut self: Pin<&mut Self>, context: &mut task::Context) -> Poll<Option<Self::Item>> {
        // Return the next item if there is one
        if let Some(next_item) = self.try_yield() { return Poll::Ready(Some(next_item)); }

        // Try polling the future
        if let Some(future) = &mut self.future {
            match TFuture::poll(future.as_mut(), context) {
                Poll::Ready(()) => {
                    // Future has completed
                    self.future = None;

                    // Return the last item, if there is one
                    if let Some(next_item) = self.try_yield() { 
                        return Poll::Ready(Some(next_item)); 
                    } else {
                        // Stream has completed
                        return Poll::Ready(None);
                    }
                }

                Poll::Pending => {
                    // Future is waiting
                }
            }
        } else {
            // No item to yield and no future to run: the stream has completed
            return Poll::Ready(None);
        }

        if let Some(next_item) = self.try_yield() { 
            // If the future generated an item, return that
            return Poll::Ready(Some(next_item)); 
        } else {
            // Future is waiting to process more data
            return Poll::Pending;
        }
    }
}