1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
use futures::prelude::*;
use futures::future::{BoxFuture};
use futures::channel::oneshot;
use futures::task;
use futures::task::{Poll};

use std::pin::*;
use std::sync::*;

///
/// A generator stream is a stream that runs a Future internally that generates multiple results, which are
/// formatted as a stream. The stream is closed when the future terminates.
///
pub struct GeneratorStream<TFuture, TItem> {
    /// The future that is generating items
    future: Option<Pin<Box<TFuture>>>,

    /// The last item generated by the future
    next_item: Arc<Mutex<Option<TItem>>>,

    /// Future that waits for the yield to complete
    yield_complete: Arc<Mutex<Option<oneshot::Sender<()>>>>
}

impl<'a, TFuture, TItem> GeneratorStream<TFuture, TItem> 
where 
TFuture:    'a+Future<Output=()>,
TItem:      'a+Send {
    ///
    /// Creates a new generator stream
    ///
    /// The function passed in receives the 'yield' function as a parameter
    ///
    pub fn generate<TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> GeneratorStream<TFuture, TItem> {
        // Create the future status items (next item to return from the stream and the 'yield complete' message)
        let next_item           = Arc::new(Mutex::new(None));
        let yield_complete      = Arc::new(Mutex::new(None));

        // Generator function sets these values
        let generator_item      = Arc::clone(&next_item);
        let generator_yield     = Arc::clone(&yield_complete);
        let generator_fn        = move |item| {
            let (yield_send, yield_recv)        = oneshot::channel();
            (*generator_yield.lock().unwrap())  = Some(yield_send);
            (*generator_item.lock().unwrap())   = Some(item);

            yield_recv.map(|_| ()).boxed()
        };

        // Start the future
        let generator_future    = start_future(Box::new(generator_fn));

        // Result is a new generator stream
        GeneratorStream {
            future:         Some(Box::pin(generator_future)),
            next_item:      next_item,
            yield_complete: yield_complete
        }
    }

    ///
    /// If an item is waiting on this stream, move the future back to the 'running' state and returns the item
    ///
    fn try_yield(&self) -> Option<TItem> {
        let mut next_item       = self.next_item.lock().unwrap();
        let mut yield_complete  = self.yield_complete.lock().unwrap();

        if let Some(next_item) = next_item.take() {
            // Signal to the future that it can continue
            yield_complete.take().map(|yield_complete| yield_complete.send(()));

            // Return the generated item
            Some(next_item)
        } else {
            // No item is waiting
            None
        }
    }
}

///
/// Creates a new generator stream: this is a stream where the items are generated by a future, which can yield them to be returned
/// by the stream via the function that's passed in. This is useful for cases where a stream's values are generated by complicated,
/// stateful behaviour.
///
/// For example, here is a simple generator stream that produces the sequence '0, 1, 2':
///
/// ```
/// # use flo_stream::*;
/// let mut generated_stream = generator_stream(|yield_value| async move {
///    for num in 0u32..3 {
///        yield_value(num).await;
///    }
/// });
/// # use futures::prelude::*;
/// # use futures::executor;
/// # executor::block_on(async move {
/// #     assert!(generated_stream.next().await == Some(0));
/// #     assert!(generated_stream.next().await == Some(1));
/// #     assert!(generated_stream.next().await == Some(2));
/// #     assert!(generated_stream.next().await == None);
/// # })
/// ```
///
pub fn generator_stream<'a, TItem, TFuture, TFutureFn: FnOnce(Box<dyn 'a+Send+Sync+Fn(TItem) -> BoxFuture<'static, ()>>) -> TFuture>(start_future: TFutureFn) -> impl Stream<Item=TItem> 
where 
TItem:      'a+Send,
TFuture:    'a+Future<Output=()> {
    GeneratorStream::generate(start_future)
}

impl<TFuture, TItem> Stream for GeneratorStream<TFuture, TItem>
where 
TItem:      Send,
TFuture:    Future<Output=()> {
    type Item = TItem;

    fn poll_next(mut self: Pin<&mut Self>, context: &mut task::Context) -> Poll<Option<Self::Item>> {
        // Return the next item if there is one
        if let Some(next_item) = self.try_yield() { return Poll::Ready(Some(next_item)); }

        // Try polling the future
        if let Some(future) = &mut self.future {
            match TFuture::poll(future.as_mut(), context) {
                Poll::Ready(()) => {
                    // Future has completed
                    self.future = None;

                    // Return the last item, if there is one
                    if let Some(next_item) = self.try_yield() { 
                        return Poll::Ready(Some(next_item)); 
                    } else {
                        // Stream has completed
                        return Poll::Ready(None);
                    }
                }

                Poll::Pending => {
                    // Future is waiting
                }
            }
        } else {
            // No item to yield and no future to run: the stream has completed
            return Poll::Ready(None);
        }

        if let Some(next_item) = self.try_yield() { 
            // If the future generated an item, return that
            return Poll::Ready(Some(next_item)); 
        } else {
            // Future is waiting to process more data
            return Poll::Pending;
        }
    }
}