rocket_community/response/stream/reader.rs
1use std::pin::Pin;
2use std::task::{Context, Poll};
3use std::{fmt, io};
4
5use futures::stream::Stream;
6use pin_project_lite::pin_project;
7use tokio::io::{AsyncRead, ReadBuf};
8
9use crate::request::Request;
10use crate::response::stream::One;
11use crate::response::{self, Responder, Response};
12
13pin_project! {
14 /// An async reader that reads from a stream of async readers.
15 ///
16 /// A `ReaderStream` can be constructed from any [`Stream`] of items of type
17 /// `T` where `T: AsyncRead`, or from a single `AsyncRead` type using
18 /// [`ReaderStream::one()`]. The `AsyncRead` implementation of
19 /// `ReaderStream` progresses the stream forward, returning the contents of
20 /// the inner readers. Thus, a `ReaderStream` can be thought of as a
21 /// _flattening_ of async readers.
22 ///
23 /// `ReaderStream` is designed to be used as a building-block for
24 /// stream-based responders by acting as the `streamed_body` of a
25 /// `Response`, though it may also be used as a responder itself.
26 ///
27 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
28 ///
29 /// ```rust
30 /// # extern crate rocket_community as rocket;
31 /// use std::io::Cursor;
32 ///
33 /// use rocket::{Request, Response};
34 /// use rocket::futures::stream::{Stream, StreamExt};
35 /// use rocket::response::{self, Responder, stream::ReaderStream};
36 /// use rocket::http::ContentType;
37 ///
38 /// struct MyStream<S>(S);
39 ///
40 /// impl<'r, S: Stream<Item = String>> Responder<'r, 'r> for MyStream<S>
41 /// where S: Send + 'r
42 /// {
43 /// fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
44 /// Response::build()
45 /// .header(ContentType::Text)
46 /// .streamed_body(ReaderStream::from(self.0.map(Cursor::new)))
47 /// .ok()
48 /// }
49 /// }
50 /// ```
51 ///
52 /// # Responder
53 ///
54 /// `ReaderStream` is a (potentially infinite) responder. No `Content-Type`
55 /// is set. The body is [unsized](crate::response::Body#unsized), and values
56 /// are sent as soon as they are yielded by the internal stream.
57 ///
58 /// # Example
59 ///
60 /// ```rust
61 /// # extern crate rocket_community as rocket;
62 /// # use rocket::*;
63 /// use rocket::response::stream::ReaderStream;
64 /// use rocket::futures::stream::{repeat, StreamExt};
65 /// use rocket::tokio::time::{self, Duration};
66 /// use rocket::tokio::fs::File;
67 ///
68 /// // Stream the contents of `safe/path` followed by `another/safe/path`.
69 /// #[get("/reader/stream")]
70 /// fn stream() -> ReaderStream![File] {
71 /// ReaderStream! {
72 /// let paths = &["safe/path", "another/safe/path"];
73 /// for path in paths {
74 /// if let Ok(file) = File::open(path).await {
75 /// yield file;
76 /// }
77 /// }
78 /// }
79 /// }
80 ///
81 /// // Stream the contents of the file `safe/path`. This is identical to
82 /// // returning `File` directly; Rocket responders stream and never buffer.
83 /// #[get("/reader/stream/one")]
84 /// async fn stream_one() -> std::io::Result<ReaderStream![File]> {
85 /// let file = File::open("safe/path").await?;
86 /// Ok(ReaderStream::one(file))
87 /// }
88 /// ```
89 ///
90 /// The syntax of [`ReaderStream!`] as an expression is identical to that of
91 /// [`stream!`](crate::response::stream::stream).
92 pub struct ReaderStream<S: Stream> {
93 #[pin]
94 stream: S,
95 #[pin]
96 state: State<S::Item>,
97 }
98}
99
100pin_project! {
101 #[project = StateProjection]
102 #[derive(Debug)]
103 enum State<R> {
104 Pending,
105 Reading { #[pin] reader: R },
106 Done,
107 }
108}
109
110impl<R: Unpin> ReaderStream<One<R>> {
111 /// Create a `ReaderStream` that yields exactly one reader, streaming the
112 /// contents of the reader itself.
113 ///
114 /// # Example
115 ///
116 /// Stream the bytes from a remote TCP connection:
117 ///
118 /// ```rust
119 /// # extern crate rocket_community as rocket;
120 /// # use rocket::*;
121 /// use std::io;
122 /// use std::net::SocketAddr;
123 ///
124 /// use rocket::tokio::net::TcpStream;
125 /// use rocket::response::stream::ReaderStream;
126 ///
127 /// #[get("/stream")]
128 /// async fn stream() -> io::Result<ReaderStream![TcpStream]> {
129 /// let addr = SocketAddr::from(([127, 0, 0, 1], 9999));
130 /// let stream = TcpStream::connect(addr).await?;
131 /// Ok(ReaderStream::one(stream))
132 /// }
133 /// ```
134 pub fn one(reader: R) -> Self {
135 ReaderStream::from(One::from(reader))
136 }
137}
138
139impl<S: Stream> From<S> for ReaderStream<S> {
140 fn from(stream: S) -> Self {
141 ReaderStream {
142 stream,
143 state: State::Pending,
144 }
145 }
146}
147
148impl<'r, S: Stream> Responder<'r, 'r> for ReaderStream<S>
149where
150 S: Send + 'r,
151 S::Item: AsyncRead + Send,
152{
153 fn respond_to(self, _: &'r Request<'_>) -> response::Result<'r> {
154 Response::build().streamed_body(self).ok()
155 }
156}
157
158impl<S: Stream> AsyncRead for ReaderStream<S>
159where
160 S::Item: AsyncRead + Send,
161{
162 fn poll_read(
163 self: Pin<&mut Self>,
164 cx: &mut Context<'_>,
165 buf: &mut ReadBuf<'_>,
166 ) -> Poll<io::Result<()>> {
167 let mut me = self.project();
168 loop {
169 match me.state.as_mut().project() {
170 StateProjection::Pending => match me.stream.as_mut().poll_next(cx) {
171 Poll::Pending => return Poll::Pending,
172 Poll::Ready(None) => me.state.set(State::Done),
173 Poll::Ready(Some(reader)) => me.state.set(State::Reading { reader }),
174 },
175 StateProjection::Reading { reader } => {
176 let init = buf.filled().len();
177 match reader.poll_read(cx, buf) {
178 Poll::Ready(Ok(())) if buf.filled().len() == init => {
179 me.state.set(State::Pending);
180 }
181 Poll::Ready(Ok(())) => return Poll::Ready(Ok(())),
182 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
183 Poll::Pending => return Poll::Pending,
184 }
185 }
186 StateProjection::Done => return Poll::Ready(Ok(())),
187 }
188 }
189 }
190}
191
192impl<S: Stream + fmt::Debug> fmt::Debug for ReaderStream<S>
193where
194 S::Item: fmt::Debug,
195{
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 f.debug_struct("ReaderStream")
198 .field("stream", &self.stream)
199 .field("state", &self.state)
200 .finish()
201 }
202}
203
204crate::export! {
205 /// Type and stream expression macro for [`struct@ReaderStream`].
206 ///
207 /// See [`stream!`](crate::response::stream::stream) for the syntax
208 /// supported by this macro.
209 ///
210 /// See [`struct@ReaderStream`] and the [module level
211 /// docs](crate::response::stream#typed-streams) for usage details.
212 macro_rules! ReaderStream {
213 ($($s:tt)*) => ($crate::_typed_stream!(ReaderStream, $($s)*));
214 }
215}