rustapi_core/
stream.rs

//! Streaming response types for RustAPI
//!
//! This module provides types for streaming response bodies.
//!
//! # Example
//!
//! ```rust,ignore
//! use rustapi_core::stream::StreamBody;
//! use futures_util::{stream, Stream};
//! use bytes::Bytes;
//!
//! async fn stream_data() -> StreamBody<impl Stream<Item = Result<Bytes, std::convert::Infallible>>> {
//!     let stream = stream::iter(vec![
//!         Ok(Bytes::from("chunk 1")),
//!         Ok(Bytes::from("chunk 2")),
//!     ]);
//!     StreamBody::new(stream)
//! }
//! ```
20
21use bytes::Bytes;
22use futures_util::Stream;
23use http::{header, StatusCode};
24use http_body_util::Full;
25
26use crate::response::{IntoResponse, Response};
27
/// A streaming body wrapper for HTTP responses.
///
/// `StreamBody` pairs a stream of byte chunks with an optional `Content-Type`
/// override so a handler can return it wherever a response is expected.
///
/// Note that the `IntoResponse` impl in this file currently emits only the
/// response headers together with an empty body; the wrapped stream is not
/// forwarded to the client yet.
///
/// # Example
///
/// ```rust,ignore
/// use rustapi_core::stream::StreamBody;
/// use futures_util::{stream, Stream};
/// use bytes::Bytes;
///
/// async fn stream_data() -> StreamBody<impl Stream<Item = Result<Bytes, std::convert::Infallible>>> {
///     let stream = stream::iter(vec![
///         Ok(Bytes::from("chunk 1")),
///         Ok(Bytes::from("chunk 2")),
///     ]);
///     StreamBody::new(stream)
/// }
/// ```
#[derive(Debug)]
pub struct StreamBody<S> {
    /// The wrapped chunk stream.
    // Nothing in this module reads the stream (the response conversion only
    // emits headers), so without this attribute the field trips `dead_code`.
    #[allow(dead_code)]
    stream: S,
    /// `Content-Type` override; `None` means the response conversion falls
    /// back to `application/octet-stream`.
    content_type: Option<String>,
}
54
55impl<S> StreamBody<S> {
56    /// Create a new streaming body from a stream
57    pub fn new(stream: S) -> Self {
58        Self {
59            stream,
60            content_type: None,
61        }
62    }
63
64    /// Set the content type for the response
65    pub fn content_type(mut self, content_type: impl Into<String>) -> Self {
66        self.content_type = Some(content_type.into());
67        self
68    }
69}
70
71// For now, we'll implement IntoResponse by returning a response with appropriate headers
72// The actual streaming would require changes to the Response type to support streaming bodies
73// This is a simplified implementation that works with the current Response type (Full<Bytes>)
74impl<S, E> IntoResponse for StreamBody<S>
75where
76    S: Stream<Item = Result<Bytes, E>> + Send + 'static,
77    E: std::error::Error + Send + Sync + 'static,
78{
79    fn into_response(self) -> Response {
80        // For the initial implementation, we return a response with streaming headers
81        // and an empty body. The actual streaming would require a different body type.
82
83        let content_type = self
84            .content_type
85            .unwrap_or_else(|| "application/octet-stream".to_string());
86
87        http::Response::builder()
88            .status(StatusCode::OK)
89            .header(header::CONTENT_TYPE, content_type)
90            .header(header::TRANSFER_ENCODING, "chunked")
91            .body(Full::new(Bytes::new()))
92            .unwrap()
93    }
94}
95
96/// Helper function to create a streaming body from an iterator of byte chunks
97///
98/// This is useful for simple cases where you have a fixed set of chunks.
99pub fn stream_from_iter<I, E>(
100    chunks: I,
101) -> StreamBody<futures_util::stream::Iter<std::vec::IntoIter<Result<Bytes, E>>>>
102where
103    I: IntoIterator<Item = Result<Bytes, E>>,
104{
105    use futures_util::stream;
106    let vec: Vec<_> = chunks.into_iter().collect();
107    StreamBody::new(stream::iter(vec))
108}
109
110/// Helper function to create a streaming body from a string iterator
111///
112/// Converts each string to bytes automatically.
113pub fn stream_from_strings<I, S, E>(
114    strings: I,
115) -> StreamBody<futures_util::stream::Iter<std::vec::IntoIter<Result<Bytes, E>>>>
116where
117    I: IntoIterator<Item = Result<S, E>>,
118    S: Into<String>,
119{
120    use futures_util::stream;
121    let vec: Vec<_> = strings
122        .into_iter()
123        .map(|r| r.map(|s| Bytes::from(s.into())))
124        .collect();
125    StreamBody::new(stream::iter(vec))
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use futures_util::stream;

    /// Error type for fixtures that can never actually fail.
    type Never = std::convert::Infallible;

    /// One-chunk fixture shared by the content-type tests.
    fn single_chunk() -> Vec<Result<Bytes, Never>> {
        vec![Ok(Bytes::from("chunk 1"))]
    }

    // With no explicit content type, the response carries the default type and chunked
    // encoding headers.
    #[test]
    fn test_stream_body_default_content_type() {
        let response = StreamBody::new(stream::iter(single_chunk())).into_response();
        let headers = response.headers();

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(headers.get(header::CONTENT_TYPE).unwrap(), "application/octet-stream");
        assert_eq!(headers.get(header::TRANSFER_ENCODING).unwrap(), "chunked");
    }

    // An explicitly configured content type is used in the response.
    #[test]
    fn test_stream_body_custom_content_type() {
        let response = StreamBody::new(stream::iter(single_chunk()))
            .content_type("text/plain")
            .into_response();

        assert_eq!(response.status(), StatusCode::OK);
        assert_eq!(response.headers().get(header::CONTENT_TYPE).unwrap(), "text/plain");
    }

    // `stream_from_iter` yields a body that converts into a successful response.
    #[test]
    fn test_stream_from_iter() {
        let chunks: Vec<Result<Bytes, Never>> =
            vec![Ok(Bytes::from("chunk 1")), Ok(Bytes::from("chunk 2"))];
        let response = stream_from_iter(chunks).into_response();

        assert_eq!(response.status(), StatusCode::OK);
    }

    // `stream_from_strings` accepts string slices and converts into a successful response.
    #[test]
    fn test_stream_from_strings() {
        let strings: Vec<Result<&str, Never>> = vec![Ok("hello"), Ok("world")];
        let response = stream_from_strings(strings).into_response();

        assert_eq!(response.status(), StatusCode::OK);
    }
}