Skip to main content

umbral_core/web/
streaming.rs

1//! Streaming HTTP response bodies (feature #70).
2//!
3//! A normal umbral handler returns a fully-buffered `String` / `Html` /
4//! `Json`. That's fine for a page, but a 200 MB CSV export or a file
5//! download shouldn't sit in memory all at once. [`StreamingResponse`]
6//! sends the body chunk-by-chunk from an async [`Stream`], so memory stays
7//! flat regardless of payload size and the client starts receiving bytes
8//! before the last row is generated.
9//!
10//! Pairs with [`AppBuilder::compression`](crate::app::AppBuilder::compression):
11//! a streamed body is gzip/brotli-compressed on the fly by the same layer,
12//! so `?format=csv` over a million rows streams *and* compresses without
13//! buffering.
14
15use axum::body::{Body, Bytes};
16use axum::http::{StatusCode, header};
17use axum::response::{IntoResponse, Response};
18use futures_util::{Stream, StreamExt, TryStream};
19
20/// A response whose body is produced incrementally from an async stream.
21///
22/// Build it from a stream, set the content type and (optionally) a
23/// download disposition, then return it from a handler — it implements
24/// [`IntoResponse`].
25///
26/// ```ignore
27/// use umbral::web::StreamingResponse;
28/// use futures_util::stream;
29///
30/// async fn export() -> StreamingResponse {
31///     // each item is a chunk of the body; generate them lazily
32///     let rows = stream::iter((0..1_000_000).map(|i| format!("row {i}\n")));
33///     StreamingResponse::from_chunks(rows)
34///         .content_type("text/csv; charset=utf-8")
35///         .attachment("export.csv")
36/// }
37/// ```
38pub struct StreamingResponse {
39    body: Body,
40    content_type: String,
41    content_disposition: Option<String>,
42    status: StatusCode,
43}
44
45impl StreamingResponse {
46    /// Build from a **fallible** byte stream: each item is
47    /// `Result<impl Into<Bytes>, impl Into<BoxError>>`. Use this when
48    /// producing a chunk can fail (a DB row read, a file `read` call). A
49    /// stream error aborts the response mid-flight — the client sees a
50    /// truncated body, the honest outcome for an already-started stream
51    /// (the status line and headers have already been sent).
52    pub fn new<S>(stream: S) -> Self
53    where
54        S: TryStream + Send + 'static,
55        S::Ok: Into<Bytes>,
56        S::Error: Into<axum::BoxError>,
57    {
58        Self {
59            body: Body::from_stream(stream),
60            content_type: "application/octet-stream".to_string(),
61            content_disposition: None,
62            status: StatusCode::OK,
63        }
64    }
65
66    /// Build from an **infallible** chunk stream: each item is
67    /// `impl Into<Bytes>` (`String`, `Bytes`, `Vec<u8>`, `&'static str`).
68    /// The common "generate rows, never error" case.
69    pub fn from_chunks<S, T>(stream: S) -> Self
70    where
71        S: Stream<Item = T> + Send + 'static,
72        T: Into<Bytes>,
73    {
74        let try_stream = stream.map(|chunk| Ok::<Bytes, std::convert::Infallible>(chunk.into()));
75        Self::new(try_stream)
76    }
77
78    /// Set the `Content-Type`. Defaults to `application/octet-stream`.
79    pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
80        self.content_type = content_type.into();
81        self
82    }
83
84    /// Mark the body as a file download:
85    /// `Content-Disposition: attachment; filename="<name>"`. The browser
86    /// offers a save dialog instead of rendering the body.
87    pub fn attachment(mut self, filename: impl Into<String>) -> Self {
88        self.content_disposition = Some(format!(
89            "attachment; filename=\"{}\"",
90            sanitize_filename(&filename.into())
91        ));
92        self
93    }
94
95    /// `Content-Disposition: inline; filename="<name>"` — display in the
96    /// browser when it can, with a suggested name for "save as".
97    pub fn inline(mut self, filename: impl Into<String>) -> Self {
98        self.content_disposition = Some(format!(
99            "inline; filename=\"{}\"",
100            sanitize_filename(&filename.into())
101        ));
102        self
103    }
104
105    /// Override the status code (defaults to `200 OK`).
106    pub fn status(mut self, status: StatusCode) -> Self {
107        self.status = status;
108        self
109    }
110}
111
112impl IntoResponse for StreamingResponse {
113    fn into_response(self) -> Response {
114        let mut builder = Response::builder()
115            .status(self.status)
116            .header(header::CONTENT_TYPE, self.content_type);
117        if let Some(cd) = self.content_disposition {
118            builder = builder.header(header::CONTENT_DISPOSITION, cd);
119        }
120        builder
121            .body(self.body)
122            .expect("content-type / content-disposition are always valid header values")
123    }
124}
125
/// Strip characters from a filename that are unsafe inside the quoted
/// `filename="…"` parameter of a `Content-Disposition` header.
///
/// Removed:
/// - every control character (CR, LF, NUL, TAB, ESC, DEL, …): CR / LF
///   could inject extra response headers, and control bytes in general are
///   not accepted as header-value content, so leaving them in would make
///   building the response fail;
/// - `"`, which would end the quoted value early;
/// - `\`, the quoted-string escape character — a trailing backslash would
///   escape the closing quote.
///
/// Every other character, including non-ASCII text, is kept unchanged and
/// in its original order.
fn sanitize_filename(name: &str) -> String {
    name.chars()
        .filter(|&c| !c.is_control() && !matches!(c, '"' | '\\'))
        .collect()
}
132}