umbral_core/web/streaming.rs
1//! Streaming HTTP response bodies (feature #70).
2//!
3//! A normal umbral handler returns a fully-buffered `String` / `Html` /
4//! `Json`. That's fine for a page, but a 200 MB CSV export or a file
5//! download shouldn't sit in memory all at once. [`StreamingResponse`]
6//! sends the body chunk-by-chunk from an async [`Stream`], so memory stays
7//! flat regardless of payload size and the client starts receiving bytes
8//! before the last row is generated.
9//!
10//! Pairs with [`AppBuilder::compression`](crate::app::AppBuilder::compression):
11//! a streamed body is gzip/brotli-compressed on the fly by the same layer,
12//! so `?format=csv` over a million rows streams *and* compresses without
13//! buffering.
14
15use axum::body::{Body, Bytes};
16use axum::http::{StatusCode, header};
17use axum::response::{IntoResponse, Response};
18use futures_util::{Stream, StreamExt, TryStream};
19
/// A response whose body is produced incrementally from an async stream.
///
/// Build it from a stream, set the content type and (optionally) a
/// download disposition, then return it from a handler — it implements
/// [`IntoResponse`].
///
/// ```ignore
/// use umbral::web::StreamingResponse;
/// use futures_util::stream;
///
/// async fn export() -> StreamingResponse {
///     // each item is a chunk of the body; generate them lazily
///     let rows = stream::iter((0..1_000_000).map(|i| format!("row {i}\n")));
///     StreamingResponse::from_chunks(rows)
///         .content_type("text/csv; charset=utf-8")
///         .attachment("export.csv")
/// }
/// ```
pub struct StreamingResponse {
    /// The response body, already wrapped around the caller's stream.
    body: Body,
    /// Value sent as the `Content-Type` header.
    content_type: String,
    /// Complete `Content-Disposition` header value, or `None` to omit the
    /// header entirely.
    content_disposition: Option<String>,
    /// Status code of the response.
    status: StatusCode,
}
44
45impl StreamingResponse {
46 /// Build from a **fallible** byte stream: each item is
47 /// `Result<impl Into<Bytes>, impl Into<BoxError>>`. Use this when
48 /// producing a chunk can fail (a DB row read, a file `read` call). A
49 /// stream error aborts the response mid-flight — the client sees a
50 /// truncated body, the honest outcome for an already-started stream
51 /// (the status line and headers have already been sent).
52 pub fn new<S>(stream: S) -> Self
53 where
54 S: TryStream + Send + 'static,
55 S::Ok: Into<Bytes>,
56 S::Error: Into<axum::BoxError>,
57 {
58 Self {
59 body: Body::from_stream(stream),
60 content_type: "application/octet-stream".to_string(),
61 content_disposition: None,
62 status: StatusCode::OK,
63 }
64 }
65
66 /// Build from an **infallible** chunk stream: each item is
67 /// `impl Into<Bytes>` (`String`, `Bytes`, `Vec<u8>`, `&'static str`).
68 /// The common "generate rows, never error" case.
69 pub fn from_chunks<S, T>(stream: S) -> Self
70 where
71 S: Stream<Item = T> + Send + 'static,
72 T: Into<Bytes>,
73 {
74 let try_stream = stream.map(|chunk| Ok::<Bytes, std::convert::Infallible>(chunk.into()));
75 Self::new(try_stream)
76 }
77
78 /// Set the `Content-Type`. Defaults to `application/octet-stream`.
79 pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
80 self.content_type = content_type.into();
81 self
82 }
83
84 /// Mark the body as a file download:
85 /// `Content-Disposition: attachment; filename="<name>"`. The browser
86 /// offers a save dialog instead of rendering the body.
87 pub fn attachment(mut self, filename: impl Into<String>) -> Self {
88 self.content_disposition = Some(format!(
89 "attachment; filename=\"{}\"",
90 sanitize_filename(&filename.into())
91 ));
92 self
93 }
94
95 /// `Content-Disposition: inline; filename="<name>"` — display in the
96 /// browser when it can, with a suggested name for "save as".
97 pub fn inline(mut self, filename: impl Into<String>) -> Self {
98 self.content_disposition = Some(format!(
99 "inline; filename=\"{}\"",
100 sanitize_filename(&filename.into())
101 ));
102 self
103 }
104
105 /// Override the status code (defaults to `200 OK`).
106 pub fn status(mut self, status: StatusCode) -> Self {
107 self.status = status;
108 self
109 }
110}
111
112impl IntoResponse for StreamingResponse {
113 fn into_response(self) -> Response {
114 let mut builder = Response::builder()
115 .status(self.status)
116 .header(header::CONTENT_TYPE, self.content_type);
117 if let Some(cd) = self.content_disposition {
118 builder = builder.header(header::CONTENT_DISPOSITION, cd);
119 }
120 builder
121 .body(self.body)
122 .expect("content-type / content-disposition are always valid header values")
123 }
124}
125
/// Strip characters that are unsafe inside a quoted `Content-Disposition`
/// filename.
///
/// Removed:
///
/// * every control character (CR and LF, which could inject extra
///   response headers, plus NUL, ESC, DEL and the rest, which are not
///   legal in a header value);
/// * `"`, which would close the quoted value early;
/// * `\`, the quoted-string escape character — a trailing backslash would
///   otherwise escape the closing quote.
///
/// All other characters, including non-ASCII ones, are kept in their
/// original order.
fn sanitize_filename(name: &str) -> String {
    name.chars()
        .filter(|c| !(c.is_control() || matches!(c, '"' | '\\')))
        .collect()
}
132}