stakker_async_await/futures_core.rs
1use crate::{spawn_future, spawn_future_with_waker, ActorFail};
2use futures_core::Stream;
3use stakker::task::Task;
4use stakker::{fwd, fwd_do, ret_nop, Core, Deferrer, Fwd};
5use std::cell::RefCell;
6use std::collections::VecDeque;
7use std::future::Future;
8use std::pin::Pin;
9use std::rc::Rc;
10use std::task::{Context, Poll};
11
12/// Spawn a `Stream`, forwarding values to a `Fwd`
13///
14/// This call does not support the `std::task::Waker` mechanism. See
15/// [`spawn`] for details.
16///
17/// [`spawn`]: fn.spawn.html
18pub fn spawn_stream<T>(
19 core: &mut Core,
20 stream: impl Stream<Item = T> + 'static,
21 fwd: Fwd<Option<T>>,
22) {
23 spawn_future(
24 core,
25 StreamToFwd {
26 stream: Box::pin(stream),
27 fwd,
28 },
29 ret_nop!(),
30 );
31}
32
33/// Spawn a `Stream`, forwarding values to a `Fwd`, with `Waker` support.
34///
35/// This call does not support the `std::task::Waker` mechanism. See
36/// [`spawn_with_waker`] for details.
37///
38/// [`spawn_with_waker`]: fn.spawn_with_waker.html
39pub fn spawn_stream_with_waker<T>(
40 core: &mut Core,
41 stream: impl Stream<Item = T> + 'static,
42 fwd: Fwd<Option<T>>,
43) {
44 spawn_future_with_waker(
45 core,
46 StreamToFwd {
47 stream: Box::pin(stream),
48 fwd,
49 },
50 ret_nop!(),
51 );
52}
53
54pub struct StreamToFwd<T: 'static> {
55 stream: Pin<Box<dyn Stream<Item = T> + 'static>>,
56 fwd: Fwd<Option<T>>,
57}
58
59impl<T> Future for StreamToFwd<T> {
60 type Output = ();
61
62 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
63 loop {
64 return match self.stream.as_mut().poll_next(cx) {
65 Poll::Ready(Some(v)) => {
66 fwd!([self.fwd], Some(v));
67 continue;
68 }
69 Poll::Ready(None) => {
70 fwd!([self.fwd], None);
71 Poll::Ready(())
72 }
73 Poll::Pending => Poll::Pending,
74 };
75 }
76 }
77}
78
// Queue a deferred call on `$deferrer` which resumes `$task`.
//
// `$task` must name a mutable binding; it is moved into the deferred
// closure, so the task is resumed when the deferrer's queue is run
// rather than immediately at the call site.
//
// No `#[macro_use]` attribute is attached: that attribute only has an
// effect on modules and `extern crate` items, so it does nothing on a
// `macro_rules!` definition.
macro_rules! resume {
    ($deferrer:expr, $task:ident) => {
        $deferrer.defer(move |s| $task.resume(s))
    };
}
85
86/// Create a `Fwd` → `Stream` pipe which drops the task on failure
87///
88/// The values sent to the returned `Fwd` should be any number of
89/// `Some(value)`, then finally a `None` to terminate the stream. If
90/// the stream is not terminated with a `None` before the `Fwd` is
91/// dropped then that counts as irregular termination and the
92/// async/await task will be dropped. This is because this is what
93/// would happen if the actor unexpectedly failed. So in normal
94/// operation you must make sure that the stream is terminated.
95///
96/// There are three ways to operate this pipe:
97///
98/// - Send all the data immediately to the returned `Fwd`. It will be
99/// queued until the stream owner is ready to handle it. `more` can
100/// be passed as `fwd_nop!()` as it is not required.
101///
102/// - For each message received on `more`, send a batch of items
103/// through the returned `Fwd`. When the available data is finished,
104/// send a `None`. This lets both ends operate efficiently.
105///
106/// - For each message received on `more`, send a single item through
107/// the returned `Fwd`. If there is no more data, send a `None`.
108/// This means that data is only sent when the async/await task
109/// requests it.
110///
111/// The `init_capacity` is the initial capacity of the queue. If you
112/// know that you're going to be sending through batches of a certain
113/// size, then pass that size here. Otherwise the queue will expand
114/// as necessary, so any sensible value will be okay.
115pub fn fwd_to_stream<T>(
116 core: &mut Core,
117 more: Fwd<()>,
118 init_capacity: usize,
119) -> (impl Stream<Item = T>, Fwd<Option<T>>) {
120 let spq = Rc::new(RefCell::new(StreamQueue::new(
121 core.deferrer(),
122 more,
123 init_capacity,
124 )));
125
126 let stream = StreamPipe { spq: spq.clone() };
127 let guard = StreamGuardDrop(spq);
128 let fwd = fwd_do!(move |m| {
129 let mut spq = guard.0.borrow_mut();
130 if !spq.end {
131 if let Some(m) = m {
132 spq.queue.push_back(m);
133 } else {
134 spq.end = true;
135 }
136 if let Some(mut task) = spq.task.take() {
137 resume!(spq.deferrer, task);
138 }
139 }
140 });
141
142 (stream, fwd)
143}
144
145/// Create a `Fwd` → `Stream` pipe which passes through failure
146///
147/// The values sent to the returned `Fwd` should be any number of
148/// `Some(value)`, then finally a `None` to terminate the stream.
149/// These values come out of the stream as `Some(Ok(value))` and
150/// `None`. If the stream is not terminated with a `None` before the
151/// `Fwd` is dropped then that counts as irregular termination and the
152/// values `Some(Err(ActorFail))` and `None` will come out of the
153/// stream to allow it to handle the failure. This usually indicates
154/// that an actor unexpectedly failed. So in normal operation you
155/// must make sure that the stream is terminated.
156///
157/// There are three ways to operate this pipe:
158///
159/// - Send all the data immediately to the returned `Fwd`. It will be
160/// queued until the stream owner is ready to handle it. `more` can
161/// be passed as `fwd_nop!()` as it is not required.
162///
163/// - For each message received on `more`, send a batch of items
164/// through the returned `Fwd`. When the available data is finished,
165/// send a `None`. This lets both ends operate efficiently.
166///
167/// - For each message received on `more`, send a single item through
168/// the returned `Fwd`. If there is no more data, send a `None`.
169/// This means that data is only sent when the async/await task
170/// requests it.
171///
172/// The `init_capacity` is the initial capacity of the queue. If you
173/// know that you're going to be sending through batches of a certain
174/// size, then pass that size here. Otherwise the queue will expand
175/// as necessary, so any sensible value will be okay.
176pub fn fwd_to_stream_result<T>(
177 core: &mut Core,
178 more: Fwd<()>,
179 init_capacity: usize,
180) -> (impl Stream<Item = Result<T, ActorFail>>, Fwd<Option<T>>) {
181 let spq = Rc::new(RefCell::new(StreamQueue::new(
182 core.deferrer(),
183 more,
184 init_capacity,
185 )));
186
187 let stream = StreamPipe { spq: spq.clone() };
188 let guard = StreamGuardFail(spq);
189 let fwd = fwd_do!(move |m| {
190 let mut spq = guard.0.borrow_mut();
191 if !spq.end {
192 if let Some(m) = m {
193 spq.queue.push_back(Ok(m));
194 } else {
195 spq.end = true;
196 }
197 if let Some(mut task) = spq.task.take() {
198 resume!(spq.deferrer, task);
199 }
200 }
201 });
202
203 (stream, fwd)
204}
205
206struct StreamQueue<T> {
207 deferrer: Deferrer,
208 more: Fwd<()>,
209 queue: VecDeque<T>,
210 end: bool,
211 drop: bool,
212 task: Option<Task>,
213}
214
215impl<T> StreamQueue<T> {
216 fn new(deferrer: Deferrer, more: Fwd<()>, init_capacity: usize) -> Self {
217 Self {
218 deferrer,
219 more,
220 queue: VecDeque::with_capacity(init_capacity),
221 end: false,
222 drop: false,
223 task: None,
224 }
225 }
226}
227
228// Guard that when dropped, drops the task if the stream was not
229// terminated nicely
230struct StreamGuardDrop<T>(Rc<RefCell<StreamQueue<T>>>);
231
232impl<T> Drop for StreamGuardDrop<T> {
233 fn drop(&mut self) {
234 let mut spq = self.0.borrow_mut();
235 if !spq.end {
236 // If not ended nicely, drop the task
237 spq.drop = true;
238 spq.task = None;
239 }
240 }
241}
242
243// Guard that when dropped, adds an error to the stream if it was not
244// terminated nicely
245struct StreamGuardFail<T>(Rc<RefCell<StreamQueue<Result<T, ActorFail>>>>);
246
247impl<T> Drop for StreamGuardFail<T> {
248 fn drop(&mut self) {
249 let mut spq = self.0.borrow_mut();
250 if !spq.end {
251 // If not ended nicely, append an error
252 spq.queue.push_back(Err(ActorFail));
253 spq.end = true;
254 if let Some(mut task) = spq.task.take() {
255 resume!(spq.deferrer, task);
256 }
257 }
258 }
259}
260
261/// `Stream` used to implement `stream` and `stream_result`
262pub struct StreamPipe<T> {
263 spq: Rc<RefCell<StreamQueue<T>>>,
264}
265
266impl<T> Stream for StreamPipe<T> {
267 type Item = T;
268
269 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
270 let mut spq = self.spq.borrow_mut();
271 if spq.drop {
272 spq.task = None;
273 return Poll::Pending;
274 }
275 if let Some(item) = spq.queue.pop_front() {
276 return Poll::Ready(Some(item));
277 }
278 if spq.end {
279 return Poll::Ready(None);
280 }
281 spq.task = crate::current_task(&mut spq.deferrer);
282
283 // Notify sender that we need some more data
284 let more = spq.more.clone();
285 drop(spq);
286 fwd!([more]);
287
288 Poll::Pending
289 }
290}