rocket_community/response/stream/
mod.rs

1//! Potentially infinite async [`Stream`] response types.
2//!
3//! A [`Stream<Item = T>`] is the async analog of an `Iterator<Item = T>`: it
4//! generates a sequence of values asynchronously, otherwise known as an async
5//! _generator_. Types in this module allow for returning responses that are
6//! streams.
7//!
8//! [`Stream<Item = T>`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
9//! [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
10//!
11//! # Raw Streams
12//!
13//! Rust does not yet natively support syntax for creating arbitrary generators,
14//! and as such, for creating streams. To ameliorate this, Rocket exports
//! [`stream!`], which retrofits generator syntax, allowing raw `impl Stream`s to
16//! be defined using `yield` and `for await` syntax:
17//!
18//! ```rust
19//! # extern crate rocket_community as rocket;
20//! use rocket::futures::stream::Stream;
21//! use rocket::response::stream::stream;
22//!
23//! fn make_stream() -> impl Stream<Item = u8> {
24//!     stream! {
25//!         for i in 0..3 {
26//!             yield i;
27//!         }
28//!     }
29//! }
30//! ```
31//!
32//! See [`stream!`] for full usage details.
33//!
34//! # Typed Streams
35//!
36//! A raw stream is not a `Responder`, so it cannot be directly returned from a
//! route handler. Instead, one of four _typed_ streams may be used. Each typed
38//! stream places type bounds on the `Item` of the stream, allowing for
39//! `Responder` implementation on the stream itself.
40//!
41//! Each typed stream exists both as a type and as a macro. They are:
42//!
43//!   * [`struct@ReaderStream`] ([`ReaderStream!`]) - streams of `T: AsyncRead`
44//!   * [`struct@ByteStream`] ([`ByteStream!`]) - streams of `T: AsRef<[u8]>`
45//!   * [`struct@TextStream`] ([`TextStream!`]) - streams of `T: AsRef<str>`
46//!   * [`struct@EventStream`] ([`EventStream!`]) - Server-Sent [`Event`] stream
47//!
48//! Each type implements `Responder`; each macro can be invoked to generate a
49//! typed stream, exactly like [`stream!`] above. Additionally, each macro is
50//! also a _type_ macro, expanding to a wrapped `impl Stream<Item = $T>`, where
51//! `$T` is the input to the macro.
52//!
53//! As a concrete example, the route below produces an infinite series of
54//! `"hello"`s, one per second:
55//!
56//! ```rust
57//! # extern crate rocket_community as rocket;
58//! # use rocket::get;
59//! use rocket::tokio::time::{self, Duration};
60//! use rocket::response::stream::TextStream;
61//!
62//! /// Produce an infinite series of `"hello"`s, one per second.
63//! #[get("/infinite-hellos")]
64//! fn hello() -> TextStream![&'static str] {
65//!     TextStream! {
66//!         let mut interval = time::interval(Duration::from_secs(1));
67//!         loop {
68//!             yield "hello";
69//!             interval.tick().await;
70//!         }
71//!     }
72//! }
73//! ```
74//!
75//! The `TextStream![&'static str]` invocation expands to:
76//!
77//! ```rust
78//! # extern crate rocket_community as rocket;
79//! # use rocket::response::stream::TextStream;
80//! # use rocket::futures::stream::Stream;
81//! # use rocket::response::stream::stream;
82//! # fn f() ->
83//! TextStream<impl Stream<Item = &'static str>>
84//! # { TextStream::from(stream! { yield "hi" }) }
85//! ```
86//!
87//! While the inner `TextStream! { .. }` invocation expands to:
88//!
89//! ```rust
90//! # extern crate rocket_community as rocket;
91//! # use rocket::response::stream::{TextStream, stream};
92//! TextStream::from(stream! { /* .. */ })
93//! # ;
94//! ```
95//!
96//! The expansions are identical for `ReaderStream` and `ByteStream`, with
97//! `TextStream` replaced with `ReaderStream` and `ByteStream`, respectively.
98//!
99//! ## Borrowing
100//!
101//! A stream can _yield_ borrowed values with no extra effort:
102//!
103//! ```rust
104//! # extern crate rocket_community as rocket;
105//! # use rocket::get;
106//! use rocket::State;
107//! use rocket::response::stream::TextStream;
108//!
109//! /// Produce a single string borrowed from the request.
110//! #[get("/infinite-hellos")]
111//! fn hello(string: &State<String>) -> TextStream![&str] {
112//!     TextStream! {
113//!         yield string.as_str();
114//!     }
115//! }
116//! ```
117//!
118//! If the stream _contains_ a borrowed value or uses one internally, Rust
119//! requires this fact be explicit with a lifetime annotation:
120//!
121//! ```rust
122//! # extern crate rocket_community as rocket;
123//! # use rocket::get;
124//! use rocket::State;
125//! use rocket::response::stream::TextStream;
126//!
127//! #[get("/")]
128//! fn borrow1(ctxt: &State<bool>) -> TextStream![&'static str + '_] {
129//!     TextStream! {
130//!         // By using `ctxt` in the stream, the borrow is moved into it. Thus,
131//!         // the stream object contains a borrow, prompting the '_ annotation.
132//!         if *ctxt.inner() {
133//!             yield "hello";
134//!         }
135//!     }
136//! }
137//!
//! // Just as before, but yielding an owned value.
139//! #[get("/")]
140//! fn borrow2(ctxt: &State<bool>) -> TextStream![String + '_] {
141//!     TextStream! {
142//!         if *ctxt.inner() {
143//!             yield "hello".to_string();
144//!         }
145//!     }
146//! }
147//!
148//! // As before but _also_ return a borrowed value. Without it, Rust gives:
149//! // - lifetime `'r` is missing in item created through this procedural macro
150//! #[get("/")]
151//! fn borrow3<'r>(ctxt: &'r State<bool>, s: &'r State<String>) -> TextStream![&'r str + 'r] {
152//!     TextStream! {
153//!         if *ctxt.inner() {
154//!             yield s.as_str();
155//!         }
156//!     }
157//! }
158//! ```
159//!
160//! # Graceful Shutdown
161//!
162//! Infinite responders, like the one defined in `hello` above, will prolong
163//! shutdown initiated via [`Shutdown::notify()`](crate::Shutdown::notify()) for
164//! the defined grace period. After the grace period has elapsed, Rocket will
165//! abruptly terminate the responder.
166//!
167//! To avoid abrupt termination, graceful shutdown can be detected via the
168//! [`Shutdown`](crate::Shutdown) future, allowing the infinite responder to
169//! gracefully shut itself down. The following example modifies the previous
170//! `hello` with shutdown detection:
171//!
172//! ```rust
173//! # extern crate rocket_community as rocket;
174//! # use rocket::get;
175//! use rocket::Shutdown;
176//! use rocket::response::stream::TextStream;
177//! use rocket::tokio::select;
178//! use rocket::tokio::time::{self, Duration};
179//!
180//! /// Produce an infinite series of `"hello"`s, 1/second, until shutdown.
181//! #[get("/infinite-hellos")]
182//! fn hello(mut shutdown: Shutdown) -> TextStream![&'static str] {
183//!     TextStream! {
184//!         let mut interval = time::interval(Duration::from_secs(1));
185//!         loop {
186//!             select! {
187//!                 _ = interval.tick() => yield "hello",
188//!                 _ = &mut shutdown => {
189//!                     yield "goodbye";
190//!                     break;
191//!                 }
192//!             };
193//!         }
194//!     }
195//! }
196//! ```
197
198mod bytes;
199mod one;
200mod raw_sse;
201mod reader;
202mod sse;
203mod text;
204
205pub(crate) use self::raw_sse::*;
206
207pub use self::bytes::ByteStream;
208pub use self::one::One;
209pub use self::reader::ReaderStream;
210pub use self::sse::{Event, EventStream};
211pub use self::text::TextStream;
212
213crate::export! {
214    /// Retrofitted support for [`Stream`]s with `yield`, `for await` syntax.
215    ///
216    /// [`Stream`]: https://docs.rs/futures/0.3/futures/stream/trait.Stream.html
217    ///
218    /// This macro takes any series of statements and expands them into an
219    /// expression of type `impl Stream<Item = T>`, a stream that `yield`s
220    /// elements of type `T`. It supports any Rust statement syntax with the
221    /// following extensions:
222    ///
223    ///   * `yield expr`
224    ///
225    ///      Yields the result of evaluating `expr` to the caller (the stream
226    ///      consumer). `expr` must be of type `T`.
227    ///
228    ///   * `for await x in stream { .. }`
229    ///
230    ///      `await`s the next element in `stream`, binds it to `x`, and
231    ///      executes the block with the binding. `stream` must implement
232    ///      `Stream<Item = T>`; the type of `x` is `T`.
233    ///
234    ///   * `?` short-circuits stream termination on `Err`
235    ///
236    /// # Examples
237    ///
238    /// ```rust
239    /// # extern crate rocket_community as rocket;
240    /// use rocket::response::stream::stream;
241    /// use rocket::futures::stream::Stream;
242    ///
243    /// fn f(stream: impl Stream<Item = u8>) -> impl Stream<Item = String> {
244    ///     stream! {
245    ///         for s in &["hi", "there"]{
246    ///             yield s.to_string();
247    ///         }
248    ///
249    ///         for await n in stream {
250    ///             yield format!("n: {}", n);
251    ///         }
252    ///     }
253    /// }
254    ///
255    /// # rocket::async_test(async {
256    /// use rocket::futures::stream::{self, StreamExt};
257    ///
258    /// let stream = f(stream::iter(vec![3, 7, 11]));
259    /// let strings: Vec<_> = stream.collect().await;
260    /// assert_eq!(strings, ["hi", "there", "n: 3", "n: 7", "n: 11"]);
261    /// # });
262    /// ```
263    ///
264    /// Using `?` on an `Err` short-circuits stream termination:
265    ///
266    /// ```rust
267    /// # extern crate rocket_community as rocket;
268    /// use std::io;
269    ///
270    /// use rocket::response::stream::stream;
271    /// use rocket::futures::stream::Stream;
272    ///
273    /// fn g<S>(stream: S) -> impl Stream<Item = io::Result<u8>>
274    ///     where S: Stream<Item = io::Result<&'static str>>
275    /// {
276    ///     stream! {
277    ///         for await s in stream {
278    ///             let num = s?.parse();
279    ///             let num = num.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
280    ///             yield Ok(num);
281    ///         }
282    ///     }
283    /// }
284    ///
285    /// # rocket::async_test(async {
286    /// use rocket::futures::stream::{self, StreamExt};
287    ///
288    /// let e = io::Error::last_os_error();
289    /// let stream = g(stream::iter(vec![Ok("3"), Ok("four"), Err(e), Ok("2")]));
290    /// let results: Vec<_> = stream.collect().await;
291    /// assert!(matches!(results.as_slice(), &[Ok(3), Err(_)]));
292    /// # });
293    /// ```
294    macro_rules! stream {
295        ($($t:tt)*) => ($crate::async_stream::stream!($($t)*));
296    }
297}
298
// Hidden forwarding macro: takes the identifier of a stream type exported from
// `response::stream` followed by arbitrary tokens, and invokes
// `$crate::__typed_stream!` with three fully-qualified paths placed in front of
// those tokens:
//
//   1. the named stream type (`$crate::response::stream::$Kind`),
//   2. the `stream` macro exported by this module,
//   3. the `futures` `Stream` trait.
//
// Spelling the paths through `$crate` lets them resolve regardless of what the
// caller has imported. NOTE(review): presumably invoked by the typed stream
// macros (`TextStream!` and friends); the callers and `__typed_stream!` itself
// are defined outside this file, so confirm there.
#[doc(hidden)]
#[macro_export]
macro_rules! _typed_stream {
    ($Kind:ident, $($rest:tt)*) => {
        $crate::__typed_stream! {
            $crate::response::stream::$Kind,
            $crate::response::stream::stream,
            $crate::futures::stream::Stream,
            $($rest)*
        }
    };
}