1use bytes::Bytes;
22use futures_util::Stream;
23use http::{header, StatusCode};
24use http_body_util::Full;
25
26use crate::response::{IntoResponse, Response};
27
28pub struct StreamBody<S> {
50 #[allow(dead_code)]
51 stream: S,
52 content_type: Option<String>,
53}
54
55impl<S> StreamBody<S> {
56 pub fn new(stream: S) -> Self {
58 Self {
59 stream,
60 content_type: None,
61 }
62 }
63
64 pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
66 self.content_type = Some(content_type.into());
67 self
68 }
69}
70
71impl<S, E> IntoResponse for StreamBody<S>
75where
76 S: Stream<Item = Result<Bytes, E>> + Send + 'static,
77 E: std::error::Error + Send + Sync + 'static,
78{
79 fn into_response(self) -> Response {
80 let content_type = self
84 .content_type
85 .unwrap_or_else(|| "application/octet-stream".to_string());
86
87 http::Response::builder()
88 .status(StatusCode::OK)
89 .header(header::CONTENT_TYPE, content_type)
90 .header(header::TRANSFER_ENCODING, "chunked")
91 .body(Full::new(Bytes::new()))
92 .unwrap()
93 }
94}
95
96pub fn stream_from_iter<I, E>(
100 chunks: I,
101) -> StreamBody<futures_util::stream::Iter<std::vec::IntoIter<Result<Bytes, E>>>>
102where
103 I: IntoIterator<Item = Result<Bytes, E>>,
104{
105 use futures_util::stream;
106 let vec: Vec<_> = chunks.into_iter().collect();
107 StreamBody::new(stream::iter(vec))
108}
109
110pub fn stream_from_strings<I, S, E>(
114 strings: I,
115) -> StreamBody<futures_util::stream::Iter<std::vec::IntoIter<Result<Bytes, E>>>>
116where
117 I: IntoIterator<Item = Result<S, E>>,
118 S: Into<String>,
119{
120 use futures_util::stream;
121 let vec: Vec<_> = strings
122 .into_iter()
123 .map(|r| r.map(|s| Bytes::from(s.into())))
124 .collect();
125 StreamBody::new(stream::iter(vec))
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131 use futures_util::stream;
132
133 #[test]
134 fn test_stream_body_default_content_type() {
135 let chunks: Vec<Result<Bytes, std::convert::Infallible>> = vec![Ok(Bytes::from("chunk 1"))];
136 let stream_body = StreamBody::new(stream::iter(chunks));
137 let response = stream_body.into_response();
138
139 assert_eq!(response.status(), StatusCode::OK);
140 assert_eq!(
141 response.headers().get(header::CONTENT_TYPE).unwrap(),
142 "application/octet-stream"
143 );
144 assert_eq!(
145 response.headers().get(header::TRANSFER_ENCODING).unwrap(),
146 "chunked"
147 );
148 }
149
150 #[test]
151 fn test_stream_body_custom_content_type() {
152 let chunks: Vec<Result<Bytes, std::convert::Infallible>> = vec![Ok(Bytes::from("chunk 1"))];
153 let stream_body = StreamBody::new(stream::iter(chunks)).content_type("text/plain");
154 let response = stream_body.into_response();
155
156 assert_eq!(response.status(), StatusCode::OK);
157 assert_eq!(
158 response.headers().get(header::CONTENT_TYPE).unwrap(),
159 "text/plain"
160 );
161 }
162
163 #[test]
164 fn test_stream_from_iter() {
165 let chunks: Vec<Result<Bytes, std::convert::Infallible>> =
166 vec![Ok(Bytes::from("chunk 1")), Ok(Bytes::from("chunk 2"))];
167 let stream_body = stream_from_iter(chunks);
168 let response = stream_body.into_response();
169
170 assert_eq!(response.status(), StatusCode::OK);
171 }
172
173 #[test]
174 fn test_stream_from_strings() {
175 let strings: Vec<Result<&str, std::convert::Infallible>> = vec![Ok("hello"), Ok("world")];
176 let stream_body = stream_from_strings(strings);
177 let response = stream_body.into_response();
178
179 assert_eq!(response.status(), StatusCode::OK);
180 }
181}