rocket_community/response/stream/
reader.rs

1use std::pin::Pin;
2use std::task::{Context, Poll};
3use std::{fmt, io};
4
5use futures::stream::Stream;
6use pin_project_lite::pin_project;
7use tokio::io::{AsyncRead, ReadBuf};
8
9use crate::request::Request;
10use crate::response::stream::One;
11use crate::response::{self, Responder, Response};
12
13pin_project! {
14    /// An async reader that reads from a stream of async readers.
15    ///
16    /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
17    /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
18    /// [`ReaderStream::one()`]. The `AsyncRead` implementation of
19    /// `ReaderStream` progresses the stream forward, returning the contents of
20    /// the inner readers. Thus, a `ReaderStream` can be thought of as a
21    /// _flattening_ of async readers.
22    ///
23    /// `ReaderStream` is designed to be used as a building-block for
24    /// stream-based responders by acting as the `streamed_body` of a
25    /// `Response`, though it may also be used as a responder itself.
26    ///
27    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
28    ///
29    /// ```rust
30    /// # extern crate rocket_community as rocket;
31    /// use std::io::Cursor;
32    ///
33    /// use rocket::{Request, Response};
34    /// use rocket::futures::stream::{Stream, StreamExt};
35    /// use rocket::response::{self, Responder, stream::ReaderStream};
36    /// use rocket::http::ContentType;
37    ///
38    /// struct MyStream<S>(S);
39    ///
40    /// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
41    ///     where S: Send + 'r
42    /// {
43    ///     fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
44    ///         Response::build()
45    ///             .header(ContentType::Text)
46    ///             .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
47    ///             .ok()
48    ///     }
49    /// }
50    /// ```
51    ///
52    /// # Responder
53    ///
54    /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
55    /// is set. The body is [unsized](crate::response::Body#unsized), and values
56    /// are sent as soon as they are yielded by the internal stream.
57    ///
58    /// # Example
59    ///
60    /// ```rust
61    /// # extern crate rocket_community as rocket;
62    /// # use rocket::*;
63    /// use rocket::response::stream::ReaderStream;
64    /// use rocket::futures::stream::{repeat, StreamExt};
65    /// use rocket::tokio::time::{self, Duration};
66    /// use rocket::tokio::fs::File;
67    ///
68    /// // Stream the contents of `safe/path` followed by `another/safe/path`.
69    /// #[get("/reader/stream")]
70    /// fn stream() -> ReaderStream![File] {
71    ///     ReaderStream! {
72    ///         let paths = &["safe/path", "another/safe/path"];
73    ///         for path in paths {
74    ///             if let Ok(file) = File::open(path).await {
75    ///                 yield file;
76    ///             }
77    ///         }
78    ///     }
79    /// }
80    ///
81    /// // Stream the contents of the file `safe/path`. This is identical to
82    /// // returning `File` directly; Rocket responders stream and never buffer.
83    /// #[get("/reader/stream/one")]
84    /// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
85    ///     let file = File::open("safe/path").await?;
86    ///     Ok(ReaderStream::one(file))
87    /// }
88    /// ```
89    ///
90    /// The syntax of [`ReaderStream!`] as an expression is identical to that of
91    /// [`stream!`](crate::response::stream::stream).
92    pub struct ReaderStream<S: Stream> {
93        #[pin]
94        stream: S,
95        #[pin]
96        state: State<S::Item>,
97    }
98}
99
100pin_project! {
101    #[project = StateProjection]
102    #[derive(Debug)]
103    enum State<R> {
104        Pending,
105        Reading { #[pin] reader: R },
106        Done,
107    }
108}
109
110impl<R: Unpin> ReaderStream<One<R>> {
111    /// Create a `ReaderStream` that yields exactly one reader, streaming the
112    /// contents of the reader itself.
113    ///
114    /// # Example
115    ///
116    /// Stream the bytes from a remote TCP connection:
117    ///
118    /// ```rust
119    /// # extern crate rocket_community as rocket;
120    /// # use rocket::*;
121    /// use std::io;
122    /// use std::net::SocketAddr;
123    ///
124    /// use rocket::tokio::net::TcpStream;
125    /// use rocket::response::stream::ReaderStream;
126    ///
127    /// #[get("/stream")]
128    /// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
129    ///     let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
130    ///     let stream = TcpStream::connect(addr).await?;
131    ///     Ok(ReaderStream::one(stream))
132    /// }
133    /// ```
134    pub fn one(reader: R) -> Self {
135        ReaderStream::from(One::from(reader))
136    }
137}
138
139impl<S: Stream> From<S> for ReaderStream<S> {
140    fn from(stream: S) -> Self {
141        ReaderStream {
142            stream,
143            state: State::Pending,
144        }
145    }
146}
147
148impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream<S>
149where
150    S: Send + 'r,
151    S::Item: AsyncRead + Send,
152{
153    fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
154        Response::build().streamed_body(self).ok()
155    }
156}
157
158impl<S: Stream> AsyncRead for ReaderStream<S>
159where
160    S::Item: AsyncRead + Send,
161{
162    fn poll_read(
163        self: Pin<&mut Self>,
164        cx: &mut Context<'_>,
165        buf: &mut ReadBuf<'_>,
166    ) -> Poll<io::Result<()>> {
167        let mut me = self.project();
168        loop {
169            match me.state.as_mut().project() {
170                StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
171                    Poll::Pending => return Poll::Pending,
172                    Poll::Ready(None) => me.state.set(State::Done),
173                    Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
174                },
175                StateProjection::Reading { reader } => {
176                    let init = buf.filled().len();
177                    match reader.poll_read(cx, buf) {
178                        Poll::Ready(Ok(())) if buf.filled().len() == init => {
179                            me.state.set(State::Pending);
180                        }
181                        Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
182                        Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
183                        Poll::Pending => return Poll::Pending,
184                    }
185                }
186                StateProjection::Done => return Poll::Ready(Ok(())),
187            }
188        }
189    }
190}
191
192impl<S: Stream + fmt::Debug> fmt::Debug for ReaderStream<S>
193where
194    S::Item: fmt::Debug,
195{
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        f.debug_struct("ReaderStream")
198            .field("stream", &self.stream)
199            .field("state", &self.state)
200            .finish()
201    }
202}
203
204crate::export! {
205    /// Type and stream expression macro for [`struct@ReaderStream`].
206    ///
207    /// See [`stream!`](crate::response::stream::stream) for the syntax
208    /// supported by this macro.
209    ///
210    /// See [`struct@ReaderStream`] and the [module level
211    /// docs](crate::response::stream#typed-streams) for usage details.
212    macro_rules! ReaderStream {
213        ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
214    }
215}