Skip to main content

axum_streams/
text_format.rs

1use crate::stream_body_as::StreamBodyAsOptions;
2use crate::stream_format::StreamingFormat;
3use crate::StreamBodyAs;
4use futures::stream::BoxStream;
5use futures::Stream;
6use futures::StreamExt;
7use http::HeaderMap;
8
/// Marker type selecting the plain-text streaming format.
///
/// The type carries no state, so every value is interchangeable; `new()` and
/// `default()` produce the same thing.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct TextStreamFormat;

impl TextStreamFormat {
    /// Creates the (stateless) text format marker.
    pub fn new() -> Self {
        Self
    }
}
16
17impl StreamingFormat<String> for TextStreamFormat {
18    fn to_bytes_stream<'a, 'b>(
19        &'a self,
20        stream: BoxStream<'b, Result<String, axum::Error>>,
21        _: &'a StreamBodyAsOptions,
22    ) -> BoxStream<'b, Result<axum::body::Bytes, axum::Error>> {
23        fn write_text_record(obj: String) -> Result<Vec<u8>, axum::Error> {
24            let obj_vec = obj.as_bytes().to_vec();
25            Ok(obj_vec)
26        }
27
28        Box::pin(stream.map(move |obj_res| match obj_res {
29            Err(e) => Err(e),
30            Ok(obj) => write_text_record(obj).map(|data| data.into()),
31        }))
32    }
33
34    fn http_response_headers(&self, options: &StreamBodyAsOptions) -> Option<HeaderMap> {
35        let mut header_map = HeaderMap::new();
36        header_map.insert(
37            http::header::CONTENT_TYPE,
38            options.content_type.clone().unwrap_or_else(|| {
39                http::header::HeaderValue::from_static("text/plain; charset=utf-8")
40            }),
41        );
42        Some(header_map)
43    }
44}
45
46impl<'a> StreamBodyAs<'a> {
47    pub fn text<S>(stream: S) -> Self
48    where
49        S: Stream<Item = String> + 'a + Send,
50    {
51        Self::new(
52            TextStreamFormat::new(),
53            stream.map(Ok::<String, axum::Error>),
54        )
55    }
56
57    pub fn text_with_errors<S, E>(stream: S) -> Self
58    where
59        S: Stream<Item = Result<String, E>> + 'a + Send,
60        E: Into<axum::Error>,
61    {
62        Self::new(TextStreamFormat::new(), stream)
63    }
64}
65
66impl StreamBodyAsOptions {
67    pub fn text<'a, S>(self, stream: S) -> StreamBodyAs<'a>
68    where
69        S: Stream<Item = String> + 'a + Send,
70    {
71        StreamBodyAs::with_options(
72            TextStreamFormat::new(),
73            stream.map(Ok::<String, axum::Error>),
74            self,
75        )
76    }
77
78    pub fn text_with_errors<'a, S, E>(self, stream: S) -> StreamBodyAs<'a>
79    where
80        S: Stream<Item = Result<String, E>> + 'a + Send,
81        E: Into<axum::Error>,
82    {
83        StreamBodyAs::with_options(TextStreamFormat::new(), stream, self)
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_client::*;
    use crate::StreamBodyAs;
    use axum::{routing::*, Router};
    use futures::stream;

    /// Serves nine short strings through `TextStreamFormat` and checks that the
    /// response body is exactly the strings' bytes, in order, back to back.
    #[tokio::test]
    async fn serialize_text_stream_format() {
        // "bar1" through "bar9".
        let chunks: Vec<String> = (1..=9).map(|n| format!("bar{}", n)).collect();

        // The body the client should receive: every chunk's bytes concatenated.
        let expected_body: Vec<u8> = chunks.iter().flat_map(|chunk| chunk.bytes()).collect();

        let source = Box::pin(stream::iter(chunks));

        let app = Router::new().route(
            "/",
            get(|| async {
                StreamBodyAs::new(
                    TextStreamFormat::new(),
                    source.map(Ok::<_, axum::Error>),
                )
            }),
        );

        let client = TestClient::new(app).await;
        let response = client.get("/").send().await.unwrap();
        let received = response.bytes().await.unwrap().to_vec();

        assert_eq!(received, expected_body);
    }
}