1use std::{convert::Infallible, error::Error as StdError, io::Write as _, sync::LazyLock};
2
3use actix_web::{
4 HttpResponse, Responder,
5 body::{BodyStream, MessageBody},
6};
7use bytes::{Bytes, BytesMut};
8use futures_core::Stream;
9use futures_util::TryStreamExt as _;
10use mime::Mime;
11use pin_project_lite::pin_project;
12use serde::Serialize;
13
14use crate::util::{InfallibleStream, MutWriter};
15
16static NDJSON_MIME: LazyLock<Mime> = LazyLock::new(|| "application/x-ndjson".parse().unwrap());
17
18pin_project! {
19 pub struct NdJson<S> {
44 #[pin]
46 stream: S,
47 }
48}
49
50impl<S> NdJson<S> {
51 pub fn new(stream: S) -> Self {
53 Self { stream }
54 }
55}
56
57impl<S> NdJson<S> {
58 pub fn new_infallible(stream: S) -> NdJson<InfallibleStream<S>> {
60 NdJson::new(InfallibleStream::new(stream))
61 }
62}
63
64impl<S, T, E> NdJson<S>
65where
66 S: Stream<Item = Result<T, E>>,
67 T: Serialize,
68 E: Into<Box<dyn StdError>> + 'static,
69{
70 pub fn into_body_stream(self) -> impl MessageBody {
72 BodyStream::new(self.into_chunk_stream())
73 }
74
75 pub fn into_responder(self) -> impl Responder
77 where
78 S: 'static,
79 T: 'static,
80 E: 'static,
81 {
82 HttpResponse::Ok()
83 .content_type(NDJSON_MIME.clone())
84 .message_body(self.into_body_stream())
85 .unwrap()
86 }
87
88 pub fn into_chunk_stream(self) -> impl Stream<Item = Result<Bytes, E>> {
90 self.stream.map_ok(serialize_json_line)
91 }
92}
93
94impl NdJson<Infallible> {
95 pub fn mime() -> Mime {
97 NDJSON_MIME.clone()
98 }
99}
100
101fn serialize_json_line(item: impl Serialize) -> Bytes {
102 let mut buf = BytesMut::new();
103 let mut wrt = MutWriter(&mut buf);
104
105 serde_json::to_writer(&mut wrt, &item).unwrap();
107
108 wrt.write_all(b"\n").unwrap();
110
111 buf.freeze()
112}
113
114#[cfg(test)]
115mod tests {
116 use std::error::Error as StdError;
117
118 use actix_web::body;
119 use futures_util::stream;
120 use serde_json::json;
121
122 use super::*;
123
124 #[actix_web::test]
125 async fn serializes_into_body() {
126 let ndjson_body = NdJson::new_infallible(stream::iter(vec![
127 json!(null),
128 json!(1u32),
129 json!("123"),
130 json!({ "abc": "123" }),
131 json!(["abc", 123u32]),
132 ]))
133 .into_body_stream();
134
135 let body_bytes = body::to_bytes(ndjson_body)
136 .await
137 .map_err(Into::<Box<dyn StdError>>::into)
138 .unwrap();
139
140 const EXP_BYTES: &str = "null\n\
141 1\n\
142 \"123\"\n\
143 {\"abc\":\"123\"}\n\
144 [\"abc\",123]\n";
145
146 assert_eq!(body_bytes, EXP_BYTES);
147 }
148}