rocket_community/response/stream/mod.rs
1//! Potentially infinite async [`Stream`] response types.
2//!
3//! A [`Stream<Item = T>`] is the async analog of an `Iterator<Item = T>`: it
4//! generates a sequence of values asynchronously, otherwise known as an async
5//! _generator_. Types in this module allow for returning responses that are
6//! streams.
7//!
8//! [`Stream<Item = T>`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
9//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
10//!
11//! # Raw Streams
12//!
13//! Rust does not yet natively support syntax for creating arbitrary generators,
14//! and as such, for creating streams. To ameliorate this, Rocket exports
15//! [`stream!`], which retrofits generator syntax, allowing raw `impl Stream`s to
16//! be defined using `yield` and `for await` syntax:
17//!
18//! ```rust
19//! # extern crate rocket_community as rocket;
20//! use rocket::futures::stream::Stream;
21//! use rocket::response::stream::stream;
22//!
23//! fn make_stream() -> impl Stream<Item = u8> {
24//! stream! {
25//! for i in 0..3 {
26//! yield i;
27//! }
28//! }
29//! }
30//! ```
31//!
32//! See [`stream!`] for full usage details.
33//!
34//! # Typed Streams
35//!
36//! A raw stream is not a `Responder`, so it cannot be directly returned from a
37//! route handler. Instead, one of four _typed_ streams may be used. Each typed
38//! stream places type bounds on the `Item` of the stream, allowing for
39//! `Responder` implementation on the stream itself.
40//!
41//! Each typed stream exists both as a type and as a macro. They are:
42//!
43//! * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
44//! * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
45//! * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
46//! * [`struct@EventStream`] ([`EventStream!`]) - Server-Sent [`Event`] stream
47//!
48//! Each type implements `Responder`; each macro can be invoked to generate a
49//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
50//! also a _type_ macro, expanding to a wrapped `impl Stream<Item = $T>`, where
51//! `$T` is the input to the macro.
52//!
53//! As a concrete example, the route below produces an infinite series of
54//! `"hello"`s, one per second:
55//!
56//! ```rust
57//! # extern crate rocket_community as rocket;
58//! # use rocket::get;
59//! use rocket::tokio::time::{self, Duration};
60//! use rocket::response::stream::TextStream;
61//!
62//! /// Produce an infinite series of `"hello"`s, one per second.
63//! #[get("/infinite-hellos")]
64//! fn hello() -> TextStream![&'static str] {
65//! TextStream! {
66//! let mut interval = time::interval(Duration::from_secs(1));
67//! loop {
68//! yield "hello";
69//! interval.tick().await;
70//! }
71//! }
72//! }
73//! ```
74//!
75//! The `TextStream![&'static str]` invocation expands to:
76//!
77//! ```rust
78//! # extern crate rocket_community as rocket;
79//! # use rocket::response::stream::TextStream;
80//! # use rocket::futures::stream::Stream;
81//! # use rocket::response::stream::stream;
82//! # fn f() ->
83//! TextStream<impl Stream<Item = &'static str>>
84//! # { TextStream::from(stream! { yield "hi" }) }
85//! ```
86//!
87//! While the inner `TextStream! { .. }` invocation expands to:
88//!
89//! ```rust
90//! # extern crate rocket_community as rocket;
91//! # use rocket::response::stream::{TextStream, stream};
92//! TextStream::from(stream! { /* .. */ })
93//! # ;
94//! ```
95//!
96//! The expansions are identical for `ReaderStream` and `ByteStream`, with
97//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively.
98//!
99//! ## Borrowing
100//!
101//! A stream can _yield_ borrowed values with no extra effort:
102//!
103//! ```rust
104//! # extern crate rocket_community as rocket;
105//! # use rocket::get;
106//! use rocket::State;
107//! use rocket::response::stream::TextStream;
108//!
109//! /// Produce a single string borrowed from the request.
110//! #[get("/infinite-hellos")]
111//! fn hello(string: &State<String>) -> TextStream![&str] {
112//! TextStream! {
113//! yield string.as_str();
114//! }
115//! }
116//! ```
117//!
118//! If the stream _contains_ a borrowed value or uses one internally, Rust
119//! requires this fact be explicit with a lifetime annotation:
120//!
121//! ```rust
122//! # extern crate rocket_community as rocket;
123//! # use rocket::get;
124//! use rocket::State;
125//! use rocket::response::stream::TextStream;
126//!
127//! #[get("/")]
128//! fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
129//! TextStream! {
130//! // By using `ctxt` in the stream, the borrow is moved into it. Thus,
131//! // the stream object contains a borrow, prompting the '_ annotation.
132//! if *ctxt.inner() {
133//! yield "hello";
134//! }
135//! }
136//! }
137//!
138//! // Just as before, but yielding an owned value.
139//! #[get("/")]
140//! fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
141//! TextStream! {
142//! if *ctxt.inner() {
143//! yield "hello".to_string();
144//! }
145//! }
146//! }
147//!
148//! // As before but _also_ return a borrowed value. Without it, Rust gives:
149//! // - lifetime `'r` is missing in item created through this procedural macro
150//! #[get("/")]
151//! fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
152//! TextStream! {
153//! if *ctxt.inner() {
154//! yield s.as_str();
155//! }
156//! }
157//! }
158//! ```
159//!
160//! # Graceful Shutdown
161//!
162//! Infinite responders, like the one defined in `hello` above, will prolong
163//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for
164//! the defined grace period. After the grace period has elapsed, Rocket will
165//! abruptly terminate the responder.
166//!
167//! To avoid abrupt termination, graceful shutdown can be detected via the
168//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to
169//! gracefully shut itself down. The following example modifies the previous
170//! `hello` with shutdown detection:
171//!
172//! ```rust
173//! # extern crate rocket_community as rocket;
174//! # use rocket::get;
175//! use rocket::Shutdown;
176//! use rocket::response::stream::TextStream;
177//! use rocket::tokio::select;
178//! use rocket::tokio::time::{self, Duration};
179//!
180//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
181//! #[get("/infinite-hellos")]
182//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
183//! TextStream! {
184//! let mut interval = time::interval(Duration::from_secs(1));
185//! loop {
186//! select! {
187//! _ = interval.tick() => yield "hello",
188//! _ = &mut shutdown => {
189//! yield "goodbye";
190//! break;
191//! }
192//! };
193//! }
194//! }
195//! }
196//! ```
197
198mod bytes;
199mod one;
200mod raw_sse;
201mod reader;
202mod sse;
203mod text;
204
205pub(crate) use self::raw_sse::*;
206
207pub use self::bytes::ByteStream;
208pub use self::one::One;
209pub use self::reader::ReaderStream;
210pub use self::sse::{Event, EventStream};
211pub use self::text::TextStream;
212
213crate::export! {
214 /// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
215 ///
216 /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
217 ///
218 /// This macro takes any series of statements and expands them into an
219 /// expression of type `impl Stream<Item = T>`, a stream that `yield`s
220 /// elements of type `T`. It supports any Rust statement syntax with the
221 /// following extensions:
222 ///
223 /// * `yield expr`
224 ///
225 /// Yields the result of evaluating `expr` to the caller (the stream
226 /// consumer). `expr` must be of type `T`.
227 ///
228 /// * `for await x in stream { .. }`
229 ///
230 /// `await`s the next element in `stream`, binds it to `x`, and
231 /// executes the block with the binding. `stream` must implement
232 /// `Stream<Item = U>`; the type of `x` is `U` (which may differ from `T`).
233 ///
234 /// * `?` short-circuits stream termination on `Err`
235 ///
236 /// # Examples
237 ///
238 /// ```rust
239 /// # extern crate rocket_community as rocket;
240 /// use rocket::response::stream::stream;
241 /// use rocket::futures::stream::Stream;
242 ///
243 /// fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
244 /// stream! {
245 /// for s in &["hi", "there"]{
246 /// yield s.to_string();
247 /// }
248 ///
249 /// for await n in stream {
250 /// yield format!("n: {}", n);
251 /// }
252 /// }
253 /// }
254 ///
255 /// # rocket::async_test(async {
256 /// use rocket::futures::stream::{self, StreamExt};
257 ///
258 /// let stream = f(stream::iter(vec![3, 7, 11]));
259 /// let strings: Vec<_> = stream.collect().await;
260 /// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
261 /// # });
262 /// ```
263 ///
264 /// Using `?` on an `Err` short-circuits stream termination:
265 ///
266 /// ```rust
267 /// # extern crate rocket_community as rocket;
268 /// use std::io;
269 ///
270 /// use rocket::response::stream::stream;
271 /// use rocket::futures::stream::Stream;
272 ///
273 /// fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
274 /// where S: Stream<Item = io::Result<&'static str>>
275 /// {
276 /// stream! {
277 /// for await s in stream {
278 /// let num = s?.parse();
279 /// let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
280 /// yield Ok(num);
281 /// }
282 /// }
283 /// }
284 ///
285 /// # rocket::async_test(async {
286 /// use rocket::futures::stream::{self, StreamExt};
287 ///
288 /// let e = io::Error::last_os_error();
289 /// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
290 /// let results: Vec<_> = stream.collect().await;
291 /// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
292 /// # });
293 /// ```
    // The single rule below accepts any token sequence and forwards it
    // unchanged to `async_stream::stream!`, reached through `$crate`.
294 macro_rules! stream {
295 ($($t:tt)*) => ($crate::async_stream::stream!($($t)*));
296 }
297}
298
// Internal forwarding macro: hidden from the generated docs and exported
// with `#[macro_export]`.
//
// Input is an identifier `$S`, a comma, and then any token sequence. The
// expansion invokes `$crate::__typed_stream!` with, in order:
//   1. the path `$crate::response::stream::$S`,
//   2. the path of the `stream` macro in this module,
//   3. the path of the `Stream` trait,
//   4. the caller's remaining tokens, passed through untouched.
//
// NOTE(review): `__typed_stream!` is defined outside this file; presumably it
// produces the type-position and expression-position expansions described in
// the module docs under "Typed Streams" -- confirm against its definition.
299#[doc(hidden)]
300#[macro_export]
301macro_rules! _typed_stream {
302 ($S:ident, $($t:tt)*) => (
303 $crate::__typed_stream! {
304 $crate::response::stream::$S,
305 $crate::response::stream::stream,
306 $crate::futures::stream::Stream,
307 $($t)*
308 }
309 )
310}