actix_web_lab/
ndjson.rs

1use std::{convert::Infallible, error::Error as StdError, io::Write as _, sync::LazyLock};
2
3use actix_web::{
4    HttpResponse, Responder,
5    body::{BodyStream, MessageBody},
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use pin_project_lite::pin_project;
12use serde::Serialize;
13
14use crate::util::{InfallibleStream, MutWriter};
15
16static NDJSON_MIME: LazyLock<Mime> = LazyLock::new(|| "application/x-ndjson".parse().unwrap());
17
18pin_project! {
19    /// A buffered [NDJSON] serializing body stream.
20    ///
21    /// This has significant memory efficiency advantages over returning an array of JSON objects
22    /// when the data set is very large because it avoids buffering the entire response.
23    ///
24    /// # Examples
25    /// ```
26    /// # use actix_web::Responder;
27    /// # use actix_web_lab::respond::NdJson;
28    /// # use futures_core::Stream;
29    /// fn streaming_data_source() -> impl Stream<Item = serde_json::Value> {
30    ///     // get item stream from source
31    ///     # futures_util::stream::empty()
32    /// }
33    ///
34    /// async fn handler() -> impl Responder {
35    ///     let data_stream = streaming_data_source();
36    ///
37    ///     NdJson::new_infallible(data_stream)
38    ///         .into_responder()
39    /// }
40    /// ```
41    ///
42    /// [NDJSON]: https://github.com/ndjson/ndjson-spec
43    pub struct NdJson<S> {
44        // The wrapped item stream.
45        #[pin]
46        stream: S,
47    }
48}
49
50impl<S> NdJson<S> {
51    /// Constructs a new `NdJson` from a stream of items.
52    pub fn new(stream: S) -> Self {
53        Self { stream }
54    }
55}
56
57impl<S> NdJson<S> {
58    /// Constructs a new `NdJson` from an infallible stream of items.
59    pub fn new_infallible(stream: S) -> NdJson<InfallibleStream<S>> {
60        NdJson::new(InfallibleStream::new(stream))
61    }
62}
63
64impl<S, T, E> NdJson<S>
65where
66    S: Stream<Item = Result<T, E>>,
67    T: Serialize,
68    E: Into<Box<dyn StdError>> + 'static,
69{
70    /// Creates a chunked body stream that serializes as NDJSON on-the-fly.
71    pub fn into_body_stream(self) -> impl MessageBody {
72        BodyStream::new(self.into_chunk_stream())
73    }
74
75    /// Creates a `Responder` type with a serializing stream and correct Content-Type header.
76    pub fn into_responder(self) -> impl Responder
77    where
78        S: 'static,
79        T: 'static,
80        E: 'static,
81    {
82        HttpResponse::Ok()
83            .content_type(NDJSON_MIME.clone())
84            .message_body(self.into_body_stream())
85            .unwrap()
86    }
87
88    /// Creates a stream of serialized chunks.
89    pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
90        self.stream.map_ok(serialize_json_line)
91    }
92}
93
94impl NdJson<Infallible> {
95    /// Returns the NDJSON MIME type (`application/x-ndjson`).
96    pub fn mime() -> Mime {
97        NDJSON_MIME.clone()
98    }
99}
100
101fn serialize_json_line(item: impl Serialize) -> Bytes {
102    let mut buf = BytesMut::new();
103    let mut wrt = MutWriter(&mut buf);
104
105    // serialize JSON line to buffer
106    serde_json::to_writer(&mut wrt, &item).unwrap();
107
108    // add line break to buffer
109    wrt.write_all(b"\n").unwrap();
110
111    buf.freeze()
112}
113
114#[cfg(test)]
115mod tests {
116    use std::error::Error as StdError;
117
118    use actix_web::body;
119    use futures_util::stream;
120    use serde_json::json;
121
122    use super::*;
123
124    #[actix_web::test]
125    async fn serializes_into_body() {
126        let ndjson_body = NdJson::new_infallible(stream::iter(vec![
127            json!(null),
128            json!(1u32),
129            json!("123"),
130            json!({ "abc": "123" }),
131            json!(["abc", 123u32]),
132        ]))
133        .into_body_stream();
134
135        let body_bytes = body::to_bytes(ndjson_body)
136            .await
137            .map_err(Into::<Box<dyn StdError>>::into)
138            .unwrap();
139
140        const EXP_BYTES: &str = "null\n\
141            1\n\
142            \"123\"\n\
143            {\"abc\":\"123\"}\n\
144            [\"abc\",123]\n";
145
146        assert_eq!(body_bytes, EXP_BYTES);
147    }
148}