webdav_handler/async_stream.rs
1//! Use an [async block][async] to produce items for a stream.
2//!
3//! Example:
4//!
5//! ```rust ignore
6//! use futures::StreamExt;
7//! use futures::executor::block_on;
8//! # use webdav_handler::async_stream;
9//! use async_stream::AsyncStream;
10//!
11//! let mut strm = AsyncStream::<u8, std::io::Error>::new(|mut tx| async move {
12//! for i in 0u8..10 {
13//! tx.send(i).await;
14//! }
15//! Ok(())
16//! });
17//!
18//! let fut = async {
19//! let mut count = 0;
20//! while let Some(item) = strm.next().await {
21//! println!("{:?}", item);
22//! count += 1;
23//! }
24//! assert!(count == 10);
25//! };
26//! block_on(fut);
27//!
28//! ```
29//!
30//! The stream will produce a `Result<Item, Error>` where the `Item`
31//! is an item sent with [tx.send(item)][send]. Any errors returned by
32//! the async closure will be returned as an error value on
33//! the stream.
34//!
35//! On success the async closure should return `Ok(())`.
36//!
37//! [async]: https://rust-lang.github.io/async-book/getting_started/async_await_primer.html
38//! [send]: async_stream/struct.Sender.html#method.send
39//!
40use std::cell::Cell;
41use std::future::Future;
42use std::marker::PhantomData;
43use std::pin::Pin;
44use std::rc::Rc;
45use std::task::{Context, Poll};
46
47use futures::Stream;
48
/// Future returned by the Sender.send() method.
///
/// Completes when the item is sent: the first poll returns `Poll::Pending`
/// (handing control back to whoever polls the surrounding future, so the
/// item placed in the sender's slot can be picked up), every later poll
/// returns `Poll::Ready(())`.
#[must_use]
pub struct SenderFuture {
    // `false` until the future has been polled once.
    is_ready: bool,
}

impl SenderFuture {
    /// Creates a future that has not been polled yet.
    fn new() -> SenderFuture {
        SenderFuture { is_ready: false }
    }
}

impl Future for SenderFuture {
    type Output = ();

    /// Yields exactly once: `Poll::Pending` on the first call and
    /// `Poll::Ready(())` on all subsequent calls.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.is_ready {
            return Poll::Ready(());
        }
        self.is_ready = true;
        // A future that returns `Pending` has to arrange for a wake-up.
        // We are ready again right away, so wake the current waker before
        // yielding. Without this, a wake-driven combinator wrapping this
        // future would never poll it a second time.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
75
// The item slot is a plain `Rc<Cell<..>>` rather than an `Arc<Mutex<..>>`:
// it is only used internally by one AsyncStream and never shared in any
// other way.
/// Type of the sender passed as first argument into the async closure.
///
/// Field `0` is the single-item slot that `send` writes into; the `E`
/// parameter is carried purely as a marker via `PhantomData`.
pub struct Sender<I, E>(Rc<Cell<Option<I>>>, PhantomData<E>);

// SAFETY: `Rc<Cell<..>>` is neither `Send` nor `Sync` on its own. These impls
// lean on the invariant described above, i.e. that both handles to the slot
// stay inside one AsyncStream and are never used from two threads at once.
// NOTE(review): nothing visible in this file prevents the closure from moving
// its `Sender` to another thread - confirm callers never do that.
unsafe impl<I: Send, E: Send> Send for Sender<I, E> {}
unsafe impl<I: Sync, E: Sync> Sync for Sender<I, E> {}
82
83impl<I, E> Sender<I, E> {
84 fn new(item_opt: Option<I>) -> Sender<I, E> {
85 Sender(Rc::new(Cell::new(item_opt)), PhantomData::<E>)
86 }
87
88 // note that this is NOT impl Clone for Sender, it's private.
89 fn clone(&self) -> Sender<I, E> {
90 Sender(self.0.clone(), PhantomData::<E>)
91 }
92
93 /// Send one item to the stream.
94 pub fn send<T>(&mut self, item: T) -> SenderFuture
95 where T: Into<I> {
96 self.0.set(Some(item.into()));
97 SenderFuture::new()
98 }
99}
100
101/// An abstraction around a future, where the
102/// future can internally loop and yield items.
103///
104/// AsyncStream::new() takes a [Future][Future] ([async closure][async], usually)
105/// and AsyncStream then implements a [futures 0.3 Stream][Stream].
106///
107/// [async]: https://rust-lang.github.io/async-book/getting_started/async_await_primer.html
108/// [Future]: https://doc.rust-lang.org/std/future/trait.Future.html
109/// [Stream]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
110#[must_use]
111pub struct AsyncStream<Item, Error> {
112 item: Sender<Item, Error>,
113 fut: Option<Pin<Box<dyn Future<Output = Result<(), Error>> + 'static + Send>>>,
114}
115
116impl<Item, Error: 'static + Send> AsyncStream<Item, Error> {
117 /// Create a new stream from a closure returning a Future 0.3,
118 /// or an "async closure" (which is the same).
119 ///
120 /// The closure is passed one argument, the sender, which has a
121 /// method "send" that can be called to send a item to the stream.
122 pub fn new<F, R>(f: F) -> Self
123 where
124 F: FnOnce(Sender<Item, Error>) -> R,
125 R: Future<Output = Result<(), Error>> + Send + 'static,
126 Item: 'static,
127 {
128 let sender = Sender::new(None);
129 AsyncStream::<Item, Error> {
130 item: sender.clone(),
131 fut: Some(Box::pin(f(sender))),
132 }
133 }
134}
135
136/// Stream implementation for Futures 0.3.
137impl<I, E: Unpin> Stream for AsyncStream<I, E> {
138 type Item = Result<I, E>;
139
140 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<I, E>>> {
141 let pollres = {
142 let fut = self.fut.as_mut().unwrap();
143 fut.as_mut().poll(cx)
144 };
145 match pollres {
146 // If the future returned Poll::Ready, that signals the end of the stream.
147 Poll::Ready(Ok(_)) => Poll::Ready(None),
148 Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
149 Poll::Pending => {
150 // Pending means that some sub-future returned pending. That sub-future
151 // _might_ have been the SenderFuture returned by Sender.send, so
152 // check if there is an item available in self.item.
153 let mut item = self.item.0.replace(None);
154 if item.is_none() {
155 Poll::Pending
156 } else {
157 Poll::Ready(Some(Ok(item.take().unwrap())))
158 }
159 },
160 }
161 }
162}