stakker_async_await/
futures_core.rs

1use crate::{spawn_future, spawn_future_with_waker, ActorFail};
2use futures_core::Stream;
3use stakker::task::Task;
4use stakker::{fwd, fwd_do, ret_nop, Core, Deferrer, Fwd};
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::future::Future;
8use std::pin::Pin;
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12/// Spawn a `Stream`, forwarding values to a `Fwd`
13///
14/// This call does not support the `std::task::Waker` mechanism.  See
15/// [`spawn`] for details.
16///
17/// [`spawn`]: fn.spawn.html
18pub fn spawn_stream<T>(
19    core: &mut Core,
20    stream: impl Stream<Item = T> + 'static,
21    fwd: Fwd<Option<T>>,
22) {
23    spawn_future(
24        core,
25        StreamToFwd {
26            stream: Box::pin(stream),
27            fwd,
28        },
29        ret_nop!(),
30    );
31}
32
33/// Spawn a `Stream`, forwarding values to a `Fwd`, with `Waker` support.
34///
35/// This call does not support the `std::task::Waker` mechanism.  See
36/// [`spawn_with_waker`] for details.
37///
38/// [`spawn_with_waker`]: fn.spawn_with_waker.html
39pub fn spawn_stream_with_waker<T>(
40    core: &mut Core,
41    stream: impl Stream<Item = T> + 'static,
42    fwd: Fwd<Option<T>>,
43) {
44    spawn_future_with_waker(
45        core,
46        StreamToFwd {
47            stream: Box::pin(stream),
48            fwd,
49        },
50        ret_nop!(),
51    );
52}
53
54pub struct StreamToFwd<T: 'static> {
55    stream: Pin<Box<dyn Stream<Item = T> + 'static>>,
56    fwd: Fwd<Option<T>>,
57}
58
59impl<T> Future for StreamToFwd<T> {
60    type Output = ();
61
62    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
63        loop {
64            return match self.stream.as_mut().poll_next(cx) {
65                Poll::Ready(Some(v)) => {
66                    fwd!([self.fwd], Some(v));
67                    continue;
68                }
69                Poll::Ready(None) => {
70                    fwd!([self.fwd], None);
71                    Poll::Ready(())
72                }
73                Poll::Pending => Poll::Pending,
74            };
75        }
76    }
77}
78
79#[macro_use]
80macro_rules! resume {
81    ($deferrer:expr, $task:ident) => {
82        $deferrer.defer(move |s| $task.resume(s))
83    };
84}
85
86/// Create a `Fwd` &rarr; `Stream` pipe which drops the task on failure
87///
88/// The values sent to the returned `Fwd` should be any number of
89/// `Some(value)`, then finally a `None` to terminate the stream.  If
90/// the stream is not terminated with a `None` before the `Fwd` is
91/// dropped then that counts as irregular termination and the
92/// async/await task will be dropped.  This is because this is what
93/// would happen if the actor unexpectedly failed.  So in normal
94/// operation you must make sure that the stream is terminated.
95///
96/// There are three ways to operate this pipe:
97///
98/// - Send all the data immediately to the returned `Fwd`.  It will be
99/// queued until the stream owner is ready to handle it.  `more` can
100/// be passed as `fwd_nop!()` as it is not required.
101///
102/// - For each message received on `more`, send a batch of items
103/// through the returned `Fwd`.  When the available data is finished,
104/// send a `None`.  This lets both ends operate efficiently.
105///
106/// - For each message received on `more`, send a single item through
107/// the returned `Fwd`.  If there is no more data, send a `None`.
108/// This means that data is only sent when the async/await task
109/// requests it.
110///
111/// The `init_capacity` is the initial capacity of the queue.  If you
112/// know that you're going to be sending through batches of a certain
113/// size, then pass that size here.  Otherwise the queue will expand
114/// as necessary, so any sensible value will be okay.
115pub fn fwd_to_stream<T>(
116    core: &mut Core,
117    more: Fwd<()>,
118    init_capacity: usize,
119) -> (impl Stream<Item = T>, Fwd<Option<T>>) {
120    let spq = Rc::new(RefCell::new(StreamQueue::new(
121        core.deferrer(),
122        more,
123        init_capacity,
124    )));
125
126    let stream = StreamPipe { spq: spq.clone() };
127    let guard = StreamGuardDrop(spq);
128    let fwd = fwd_do!(move |m| {
129        let mut spq = guard.0.borrow_mut();
130        if !spq.end {
131            if let Some(m) = m {
132                spq.queue.push_back(m);
133            } else {
134                spq.end = true;
135            }
136            if let Some(mut task) = spq.task.take() {
137                resume!(spq.deferrer, task);
138            }
139        }
140    });
141
142    (stream, fwd)
143}
144
145/// Create a `Fwd` &rarr; `Stream` pipe which passes through failure
146///
147/// The values sent to the returned `Fwd` should be any number of
148/// `Some(value)`, then finally a `None` to terminate the stream.
149/// These values come out of the stream as `Some(Ok(value))` and
150/// `None`.  If the stream is not terminated with a `None` before the
151/// `Fwd` is dropped then that counts as irregular termination and the
152/// values `Some(Err(ActorFail))` and `None` will come out of the
153/// stream to allow it to handle the failure.  This usually indicates
154/// that an actor unexpectedly failed.  So in normal operation you
155/// must make sure that the stream is terminated.
156///
157/// There are three ways to operate this pipe:
158///
159/// - Send all the data immediately to the returned `Fwd`.  It will be
160/// queued until the stream owner is ready to handle it.  `more` can
161/// be passed as `fwd_nop!()` as it is not required.
162///
163/// - For each message received on `more`, send a batch of items
164/// through the returned `Fwd`.  When the available data is finished,
165/// send a `None`.  This lets both ends operate efficiently.
166///
167/// - For each message received on `more`, send a single item through
168/// the returned `Fwd`.  If there is no more data, send a `None`.
169/// This means that data is only sent when the async/await task
170/// requests it.
171///
172/// The `init_capacity` is the initial capacity of the queue.  If you
173/// know that you're going to be sending through batches of a certain
174/// size, then pass that size here.  Otherwise the queue will expand
175/// as necessary, so any sensible value will be okay.
176pub fn fwd_to_stream_result<T>(
177    core: &mut Core,
178    more: Fwd<()>,
179    init_capacity: usize,
180) -> (impl Stream<Item = Result<T, ActorFail>>, Fwd<Option<T>>) {
181    let spq = Rc::new(RefCell::new(StreamQueue::new(
182        core.deferrer(),
183        more,
184        init_capacity,
185    )));
186
187    let stream = StreamPipe { spq: spq.clone() };
188    let guard = StreamGuardFail(spq);
189    let fwd = fwd_do!(move |m| {
190        let mut spq = guard.0.borrow_mut();
191        if !spq.end {
192            if let Some(m) = m {
193                spq.queue.push_back(Ok(m));
194            } else {
195                spq.end = true;
196            }
197            if let Some(mut task) = spq.task.take() {
198                resume!(spq.deferrer, task);
199            }
200        }
201    });
202
203    (stream, fwd)
204}
205
206struct StreamQueue<T> {
207    deferrer: Deferrer,
208    more: Fwd<()>,
209    queue: VecDeque<T>,
210    end: bool,
211    drop: bool,
212    task: Option<Task>,
213}
214
215impl<T> StreamQueue<T> {
216    fn new(deferrer: Deferrer, more: Fwd<()>, init_capacity: usize) -> Self {
217        Self {
218            deferrer,
219            more,
220            queue: VecDeque::with_capacity(init_capacity),
221            end: false,
222            drop: false,
223            task: None,
224        }
225    }
226}
227
228// Guard that when dropped, drops the task if the stream was not
229// terminated nicely
230struct StreamGuardDrop<T>(Rc<RefCell<StreamQueue<T>>>);
231
232impl<T> Drop for StreamGuardDrop<T> {
233    fn drop(&mut self) {
234        let mut spq = self.0.borrow_mut();
235        if !spq.end {
236            // If not ended nicely, drop the task
237            spq.drop = true;
238            spq.task = None;
239        }
240    }
241}
242
243// Guard that when dropped, adds an error to the stream if it was not
244// terminated nicely
245struct StreamGuardFail<T>(Rc<RefCell<StreamQueue<Result<T, ActorFail>>>>);
246
247impl<T> Drop for StreamGuardFail<T> {
248    fn drop(&mut self) {
249        let mut spq = self.0.borrow_mut();
250        if !spq.end {
251            // If not ended nicely, append an error
252            spq.queue.push_back(Err(ActorFail));
253            spq.end = true;
254            if let Some(mut task) = spq.task.take() {
255                resume!(spq.deferrer, task);
256            }
257        }
258    }
259}
260
261/// `Stream` used to implement `stream` and `stream_result`
262pub struct StreamPipe<T> {
263    spq: Rc<RefCell<StreamQueue<T>>>,
264}
265
266impl<T> Stream for StreamPipe<T> {
267    type Item = T;
268
269    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
270        let mut spq = self.spq.borrow_mut();
271        if spq.drop {
272            spq.task = None;
273            return Poll::Pending;
274        }
275        if let Some(item) = spq.queue.pop_front() {
276            return Poll::Ready(Some(item));
277        }
278        if spq.end {
279            return Poll::Ready(None);
280        }
281        spq.task = crate::current_task(&mut spq.deferrer);
282
283        // Notify sender that we need some more data
284        let more = spq.more.clone();
285        drop(spq);
286        fwd!([more]);
287
288        Poll::Pending
289    }
290}