webdav_handler/
async_stream.rs

1//! Use an [async block][async] to produce items for a stream.
2//!
3//! Example:
4//!
5//! ```rust ignore
6//! use futures::StreamExt;
7//! use futures::executor::block_on;
8//! # use webdav_handler::async_stream;
9//! use async_stream::AsyncStream;
10//!
11//! let mut strm = AsyncStream::<u8, std::io::Error>::new(|mut tx| async move {
12//!     for i in 0u8..10 {
13//!         tx.send(i).await;
14//!     }
15//!     Ok(())
16//! });
17//!
18//! let fut = async {
19//!     let mut count = 0;
20//!     while let Some(item) = strm.next().await {
21//!         println!("{:?}", item);
22//!         count += 1;
23//!     }
24//!     assert!(count == 10);
25//! };
26//! block_on(fut);
27//!
28//! ```
29//!
30//! The stream will produce a `Result<Item, Error>` where the `Item`
31//! is an item sent with [tx.send(item)][send]. Any errors returned by
32//! the async closure will be returned as an error value on
33//! the stream.
34//!
35//! On success the async closure should return `Ok(())`.
36//!
37//! [async]: https://rust-lang.github.io/async-book/getting_started/async_await_primer.html
38//! [send]: async_stream/struct.Sender.html#method.send
39//!
40use std::cell::Cell;
41use std::future::Future;
42use std::marker::PhantomData;
43use std::pin::Pin;
44use std::rc::Rc;
45use std::task::{Context, Poll};
46
47use futures::Stream;
48
/// Future returned by the Sender.send() method.
///
/// Completes when the item is sent: the first poll returns
/// `Poll::Pending` (handing control back to whoever is polling the
/// enclosing future, so the item can be picked up), the second poll
/// returns `Poll::Ready(())`.
#[must_use]
#[derive(Debug)]
pub struct SenderFuture {
    // false until the future has been polled once.
    is_ready: bool,
}

impl SenderFuture {
    /// Create a future that has not been polled yet.
    fn new() -> SenderFuture {
        SenderFuture { is_ready: false }
    }
}

impl Future for SenderFuture {
    type Output = ();

    /// Yield exactly once.
    ///
    /// Returns `Poll::Pending` on the first call and `Poll::Ready(())`
    /// on every later call.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.is_ready {
            return Poll::Ready(());
        }
        self.is_ready = true;
        // A future that returns Pending must arrange for its waker to be
        // notified, otherwise a combinator that hands out its own waker
        // would never poll us again. We can make progress immediately, so
        // wake right away.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
75
76// Only internally used by one AsyncStream and never shared
77// in any other way, so we don't have to use Arc<Mutex<..>>.
78/// Type of the sender passed as first argument into the async closure.
79pub struct Sender<I, E>(Rc<Cell<Option<I>>>, PhantomData<E>);
80unsafe impl<I: Sync, E: Sync> Sync for Sender<I, E> {}
81unsafe impl<I: Send, E: Send> Send for Sender<I, E> {}
82
83impl<I, E> Sender<I, E> {
84    fn new(item_opt: Option<I>) -> Sender<I, E> {
85        Sender(Rc::new(Cell::new(item_opt)), PhantomData::<E>)
86    }
87
88    // note that this is NOT impl Clone for Sender, it's private.
89    fn clone(&self) -> Sender<I, E> {
90        Sender(self.0.clone(), PhantomData::<E>)
91    }
92
93    /// Send one item to the stream.
94    pub fn send<T>(&mut self, item: T) -> SenderFuture
95    where T: Into<I> {
96        self.0.set(Some(item.into()));
97        SenderFuture::new()
98    }
99}
100
101/// An abstraction around a future, where the
102/// future can internally loop and yield items.
103///
104/// AsyncStream::new() takes a [Future][Future] ([async closure][async], usually)
105/// and AsyncStream then implements a [futures 0.3 Stream][Stream].
106///
107/// [async]: https://rust-lang.github.io/async-book/getting_started/async_await_primer.html
108/// [Future]: https://doc.rust-lang.org/std/future/trait.Future.html
109/// [Stream]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
110#[must_use]
111pub struct AsyncStream<Item, Error> {
112    item: Sender<Item, Error>,
113    fut:  Option<Pin<Box<dyn Future<Output = Result<(), Error>> + 'static + Send>>>,
114}
115
116impl<Item, Error: 'static + Send> AsyncStream<Item, Error> {
117    /// Create a new stream from a closure returning a Future 0.3,
118    /// or an "async closure" (which is the same).
119    ///
120    /// The closure is passed one argument, the sender, which has a
121    /// method "send" that can be called to send a item to the stream.
122    pub fn new<F, R>(f: F) -> Self
123    where
124        F: FnOnce(Sender<Item, Error>) -> R,
125        R: Future<Output = Result<(), Error>> + Send + 'static,
126        Item: 'static,
127    {
128        let sender = Sender::new(None);
129        AsyncStream::<Item, Error> {
130            item: sender.clone(),
131            fut:  Some(Box::pin(f(sender))),
132        }
133    }
134}
135
136/// Stream implementation for Futures 0.3.
137impl<I, E: Unpin> Stream for AsyncStream<I, E> {
138    type Item = Result<I, E>;
139
140    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<I, E>>> {
141        let pollres = {
142            let fut = self.fut.as_mut().unwrap();
143            fut.as_mut().poll(cx)
144        };
145        match pollres {
146            // If the future returned Poll::Ready, that signals the end of the stream.
147            Poll::Ready(Ok(_)) => Poll::Ready(None),
148            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
149            Poll::Pending => {
150                // Pending means that some sub-future returned pending. That sub-future
151                // _might_ have been the SenderFuture returned by Sender.send, so
152                // check if there is an item available in self.item.
153                let mut item = self.item.0.replace(None);
154                if item.is_none() {
155                    Poll::Pending
156                } else {
157                    Poll::Ready(Some(Ok(item.take().unwrap())))
158                }
159            },
160        }
161    }
162}