Skip to main content

fastapi_core/
ndjson.rs

1//! Newline-Delimited JSON (NDJSON) streaming support.
2//!
3//! This module provides types for streaming large datasets as NDJSON, where each
4//! JSON object is on its own line. This format is ideal for:
5//!
6//! - Streaming database query results
7//! - Real-time log output
8//! - Incremental data export
9//! - Large dataset downloads
10//!
11//! # Overview
12//!
13//! NDJSON (also known as JSON Lines) is a convenient format for streaming JSON data.
14//! Each line is a valid JSON value, typically an object, followed by a newline character.
15//!
16//! # Example
17//!
18//! ```ignore
19//! use fastapi_core::ndjson::{NdjsonResponse, ndjson_response};
20//! use fastapi_core::Response;
21//! use asupersync::stream;
22//! use serde::Serialize;
23//!
24//! #[derive(Serialize)]
25//! struct LogEntry {
26//!     timestamp: u64,
27//!     level: String,
28//!     message: String,
29//! }
30//!
31//! async fn stream_logs() -> Response {
32//!     let logs = stream::iter(vec![
33//!         LogEntry { timestamp: 1, level: "INFO".into(), message: "Started".into() },
34//!         LogEntry { timestamp: 2, level: "DEBUG".into(), message: "Processing".into() },
35//!     ]);
36//!
37//!     NdjsonResponse::new(logs).into_response()
38//! }
39//! ```
40//!
41//! # Wire Format
42//!
43//! ```text
44//! {"id":1,"name":"Alice"}
45//! {"id":2,"name":"Bob"}
46//! {"id":3,"name":"Charlie"}
47//! ```
48//!
49//! Each line is a complete, valid JSON value followed by `\n`.
50//!
51//! # Content Type
52//!
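//! The standard content type for NDJSON is `application/x-ndjson`, which is what
//! responses use by default. `application/jsonlines` (see `NDJSON_CONTENT_TYPE_ALT`)
//! and `application/json-lines` are also seen in practice; this module does not
//! parse or negotiate content types, but any value can be set with
//! `NdjsonConfig::content_type`.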
55//!
56//! # Memory Efficiency
57//!
58//! NDJSON streaming does not buffer the entire response. Each item is serialized
59//! and sent immediately, making it suitable for datasets of any size.
60//!
61//! # Error Handling
62//!
63//! If serialization fails for an item, the stream will include an error object
64//! on that line and continue with subsequent items. Clients should be prepared
65//! to handle `{"error": "..."}` entries in the stream.
66
67use std::marker::PhantomData;
68use std::pin::Pin;
69use std::task::{Context, Poll};
70
71use asupersync::stream::Stream;
72use serde::Serialize;
73
74use crate::response::{Response, ResponseBody, StatusCode};
75
/// MIME type used for NDJSON payloads when no custom content type is configured.
pub const NDJSON_CONTENT_TYPE: &[u8] = b"application/x-ndjson";

/// An alternative MIME type that some producers use for the same wire format.
pub const NDJSON_CONTENT_TYPE_ALT: &[u8] = b"application/jsonlines";
81
82/// Configuration for NDJSON responses.
83#[derive(Debug, Clone)]
84pub struct NdjsonConfig {
85    /// Whether to include a trailing newline after the last item.
86    pub trailing_newline: bool,
87    /// Whether to pretty-print each JSON line (not recommended for production).
88    pub pretty: bool,
89    /// Custom content type (defaults to `application/x-ndjson`).
90    pub content_type: Option<Vec<u8>>,
91}
92
93impl Default for NdjsonConfig {
94    fn default() -> Self {
95        Self {
96            trailing_newline: true,
97            pretty: false,
98            content_type: None,
99        }
100    }
101}
102
103impl NdjsonConfig {
104    /// Create a new NDJSON configuration with defaults.
105    #[must_use]
106    pub fn new() -> Self {
107        Self::default()
108    }
109
110    /// Set whether to include a trailing newline.
111    #[must_use]
112    pub fn trailing_newline(mut self, enabled: bool) -> Self {
113        self.trailing_newline = enabled;
114        self
115    }
116
117    /// Enable pretty-printing of JSON (not recommended for production).
118    #[must_use]
119    pub fn pretty(mut self, enabled: bool) -> Self {
120        self.pretty = enabled;
121        self
122    }
123
124    /// Set a custom content type.
125    #[must_use]
126    pub fn content_type(mut self, content_type: impl Into<Vec<u8>>) -> Self {
127        self.content_type = Some(content_type.into());
128        self
129    }
130
131    /// Get the content type to use.
132    #[must_use]
133    pub fn get_content_type(&self) -> &[u8] {
134        self.content_type.as_deref().unwrap_or(NDJSON_CONTENT_TYPE)
135    }
136}
137
138/// A wrapper that converts an async stream of serializable items into NDJSON format.
139///
140/// Each item from the inner stream is serialized to JSON, followed by a newline.
141/// The resulting bytes are suitable for sending over HTTP as a streaming response.
142///
143/// # Type Parameters
144///
145/// - `S`: The underlying stream type
146/// - `T`: The item type that implements `Serialize`
147///
148/// # Example
149///
150/// ```ignore
151/// use fastapi_core::ndjson::NdjsonStream;
152/// use asupersync::stream;
153/// use serde::Serialize;
154///
155/// #[derive(Serialize)]
156/// struct Item { id: i64, name: String }
157///
158/// let items = stream::iter(vec![
159///     Item { id: 1, name: "Alice".into() },
160///     Item { id: 2, name: "Bob".into() },
161/// ]);
162///
163/// let ndjson_stream = NdjsonStream::new(items);
164/// // Yields: b'{"id":1,"name":"Alice"}\n' then b'{"id":2,"name":"Bob"}\n'
165/// ```
166pub struct NdjsonStream<S, T> {
167    inner: S,
168    config: NdjsonConfig,
169    _marker: PhantomData<T>,
170}
171
172impl<S, T> NdjsonStream<S, T> {
173    /// Create a new NDJSON stream wrapper with default configuration.
174    pub fn new(stream: S) -> Self {
175        Self {
176            inner: stream,
177            config: NdjsonConfig::default(),
178            _marker: PhantomData,
179        }
180    }
181
182    /// Create a new NDJSON stream wrapper with custom configuration.
183    pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
184        Self {
185            inner: stream,
186            config,
187            _marker: PhantomData,
188        }
189    }
190}
191
192impl<S, T> Stream for NdjsonStream<S, T>
193where
194    S: Stream<Item = T> + Unpin,
195    T: Serialize + Unpin,
196{
197    type Item = Vec<u8>;
198
199    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200        let this = self.get_mut();
201        match Pin::new(&mut this.inner).poll_next(cx) {
202            Poll::Ready(Some(item)) => {
203                let mut bytes = if this.config.pretty {
204                    match serde_json::to_vec_pretty(&item) {
205                        Ok(b) => b,
206                        Err(e) => {
207                            // Serialize the error instead
208                            let error = serde_json::json!({
209                                "error": format!("serialization failed: {}", e)
210                            });
211                            serde_json::to_vec(&error)
212                                .unwrap_or_else(|_| br#"{"error":"serialization failed"}"#.to_vec())
213                        }
214                    }
215                } else {
216                    match serde_json::to_vec(&item) {
217                        Ok(b) => b,
218                        Err(e) => {
219                            // Serialize the error instead
220                            let error = serde_json::json!({
221                                "error": format!("serialization failed: {}", e)
222                            });
223                            serde_json::to_vec(&error)
224                                .unwrap_or_else(|_| br#"{"error":"serialization failed"}"#.to_vec())
225                        }
226                    }
227                };
228
229                // Add newline
230                bytes.push(b'\n');
231
232                Poll::Ready(Some(bytes))
233            }
234            Poll::Ready(None) => Poll::Ready(None),
235            Poll::Pending => Poll::Pending,
236        }
237    }
238}
239
240/// Builder for creating NDJSON streaming responses.
241///
242/// This wraps a stream of serializable items and converts it to an HTTP response
243/// with the appropriate `Content-Type: application/x-ndjson` header.
244///
245/// # Example
246///
247/// ```ignore
248/// use fastapi_core::ndjson::NdjsonResponse;
249/// use fastapi_core::Response;
250/// use asupersync::stream;
251/// use serde::Serialize;
252///
253/// #[derive(Serialize)]
254/// struct Record { id: i64, value: f64 }
255///
256/// async fn export_data() -> Response {
257///     let records = stream::iter(vec![
258///         Record { id: 1, value: 1.5 },
259///         Record { id: 2, value: 2.7 },
260///         Record { id: 3, value: 3.14 },
261///     ]);
262///
263///     NdjsonResponse::new(records).into_response()
264/// }
265/// ```
266///
267/// # Headers Set
268///
269/// - `Content-Type: application/x-ndjson`
270/// - `Cache-Control: no-cache` (streaming data shouldn't be cached)
271/// - `Transfer-Encoding: chunked` (implicit for streaming)
272pub struct NdjsonResponse<S, T> {
273    stream: S,
274    config: NdjsonConfig,
275    _marker: PhantomData<T>,
276}
277
278impl<S, T> NdjsonResponse<S, T>
279where
280    S: Stream<Item = T> + Send + Unpin + 'static,
281    T: Serialize + Send + Unpin + 'static,
282{
283    /// Create a new NDJSON response from a stream of serializable items.
284    pub fn new(stream: S) -> Self {
285        Self {
286            stream,
287            config: NdjsonConfig::default(),
288            _marker: PhantomData,
289        }
290    }
291
292    /// Create an NDJSON response with custom configuration.
293    pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
294        Self {
295            stream,
296            config,
297            _marker: PhantomData,
298        }
299    }
300
301    /// Convert to an HTTP Response.
302    ///
303    /// Sets the appropriate headers for NDJSON streaming:
304    /// - `Content-Type: application/x-ndjson`
305    /// - `Cache-Control: no-cache`
306    /// - `X-Accel-Buffering: no` (disables nginx buffering)
307    #[must_use]
308    pub fn into_response(self) -> Response {
309        let ndjson_stream = NdjsonStream::with_config(self.stream, self.config.clone());
310
311        Response::with_status(StatusCode::OK)
312            .header("Content-Type", self.config.get_content_type().to_vec())
313            .header("Cache-Control", b"no-cache".to_vec())
314            .header("X-Accel-Buffering", b"no".to_vec()) // Disable nginx buffering
315            .body(ResponseBody::stream(ndjson_stream))
316    }
317}
318
319/// Convenience function to create an NDJSON response from a stream.
320///
321/// # Example
322///
323/// ```ignore
324/// use fastapi_core::ndjson::ndjson_response;
325/// use serde::Serialize;
326///
327/// #[derive(Serialize)]
328/// struct Item { id: i64 }
329///
330/// let items = asupersync::stream::iter(vec![Item { id: 1 }, Item { id: 2 }]);
331/// let response = ndjson_response(items);
332/// ```
333pub fn ndjson_response<S, T>(stream: S) -> Response
334where
335    S: Stream<Item = T> + Send + Unpin + 'static,
336    T: Serialize + Send + Unpin + 'static,
337{
338    NdjsonResponse::new(stream).into_response()
339}
340
341/// Create an NDJSON response from an iterator.
342///
343/// This is a convenience function for when you have an iterator rather than a stream.
344///
345/// # Example
346///
347/// ```ignore
348/// use fastapi_core::ndjson::ndjson_iter;
349/// use serde::Serialize;
350///
351/// #[derive(Serialize)]
352/// struct User { id: i64, name: String }
353///
354/// let users = vec![
355///     User { id: 1, name: "Alice".into() },
356///     User { id: 2, name: "Bob".into() },
357/// ];
358///
359/// let response = ndjson_iter(users);
360/// ```
361pub fn ndjson_iter<I, T>(iter: I) -> Response
362where
363    I: IntoIterator<Item = T>,
364    I::IntoIter: Send + 'static,
365    T: Serialize + Send + Unpin + 'static,
366{
367    ndjson_response(asupersync::stream::iter(iter))
368}
369
370// ============================================================================
371// Tests
372// ============================================================================
373
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    /// Waker that does nothing; the tests poll by hand and never need waking.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    fn noop_waker() -> Waker {
        Waker::from(Arc::new(NoopWaker))
    }

    /// Poll `stream` exactly once with a no-op waker.
    fn poll_once<S>(stream: &mut S) -> Poll<Option<S::Item>>
    where
        S: Stream + Unpin,
    {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        Pin::new(stream).poll_next(&mut cx)
    }

    /// Collect every chunk of a stream that is expected never to return `Pending`.
    fn drain<S>(stream: &mut S) -> Vec<Vec<u8>>
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        let mut chunks = Vec::new();
        loop {
            match poll_once(stream) {
                Poll::Ready(Some(bytes)) => chunks.push(bytes),
                Poll::Ready(None) => return chunks,
                Poll::Pending => panic!("Unexpected Pending"),
            }
        }
    }

    /// Look up the first header called `wanted` on `response`.
    fn header_value(response: &Response, wanted: &str) -> Option<Vec<u8>> {
        response
            .headers()
            .iter()
            .find(|(name, _)| name == wanted)
            .map(|(_, value)| value.clone())
    }

    #[derive(Serialize, Clone)]
    struct TestItem {
        id: i64,
        name: String,
    }

    fn item(id: i64, name: &str) -> TestItem {
        TestItem {
            id,
            name: name.to_string(),
        }
    }

    /// Assert that `chunk` is one newline-terminated line containing both fragments.
    fn assert_line_contains(chunk: &[u8], id_fragment: &str, name_fragment: &str) {
        let line = String::from_utf8_lossy(chunk);
        assert!(line.contains(id_fragment));
        assert!(line.contains(name_fragment));
        assert!(line.ends_with('\n'));
    }

    #[test]
    fn ndjson_stream_serializes_items() {
        let stream = asupersync::stream::iter(vec![item(1, "Alice"), item(2, "Bob")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::new(stream);

        // First item
        match poll_once(&mut ndjson) {
            Poll::Ready(Some(bytes)) => {
                assert_line_contains(&bytes, r#""id":1"#, r#""name":"Alice""#);
            }
            _ => panic!("Expected Ready(Some(...))"),
        }

        // Second item
        match poll_once(&mut ndjson) {
            Poll::Ready(Some(bytes)) => {
                assert_line_contains(&bytes, r#""id":2"#, r#""name":"Bob""#);
            }
            _ => panic!("Expected Ready(Some(...))"),
        }

        // End of stream
        assert!(matches!(poll_once(&mut ndjson), Poll::Ready(None)));
    }

    #[test]
    fn ndjson_stream_each_line_is_valid_json() {
        let stream = asupersync::stream::iter(vec![item(1, "Test"), item(2, "Item")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::new(stream);

        for bytes in drain(&mut ndjson) {
            // Remove trailing newline and parse
            let json_str = String::from_utf8_lossy(&bytes);
            let json_str = json_str.trim_end();
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(json_str);
            assert!(parsed.is_ok(), "Line should be valid JSON: {}", json_str);
        }
    }

    #[test]
    fn ndjson_stream_empty() {
        let items: Vec<TestItem> = vec![];
        let mut ndjson = NdjsonStream::<_, TestItem>::new(asupersync::stream::iter(items));

        assert!(matches!(poll_once(&mut ndjson), Poll::Ready(None)));
    }

    #[test]
    fn ndjson_config_defaults() {
        let config = NdjsonConfig::default();
        assert!(config.trailing_newline);
        assert!(!config.pretty);
        assert!(config.content_type.is_none());
    }

    #[test]
    fn ndjson_config_custom() {
        let config = NdjsonConfig::new()
            .trailing_newline(false)
            .pretty(true)
            .content_type(b"application/jsonlines".to_vec());

        assert!(!config.trailing_newline);
        assert!(config.pretty);
        assert_eq!(
            config.get_content_type(),
            b"application/jsonlines".as_slice()
        );
    }

    #[test]
    fn ndjson_response_sets_content_type() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::new(stream).into_response();

        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_response_sets_cache_control() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::new(stream).into_response();

        assert_eq!(
            header_value(&response, "Cache-Control"),
            Some(b"no-cache".to_vec())
        );
    }

    #[test]
    fn ndjson_response_disables_nginx_buffering() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::new(stream).into_response();

        assert_eq!(
            header_value(&response, "X-Accel-Buffering"),
            Some(b"no".to_vec())
        );
    }

    #[test]
    fn ndjson_response_status_200() {
        let items: Vec<TestItem> = vec![];
        let response = NdjsonResponse::new(asupersync::stream::iter(items)).into_response();

        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_response_with_custom_content_type() {
        let config = NdjsonConfig::new().content_type(b"application/jsonlines".to_vec());
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::with_config(stream, config).into_response();

        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/jsonlines".to_vec())
        );
    }

    #[test]
    fn ndjson_helper_function() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = ndjson_response(stream);

        assert_eq!(response.status().as_u16(), 200);
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_iter_helper() {
        let response = ndjson_iter(vec![item(1, "Alice"), item(2, "Bob")]);

        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_handles_special_characters() {
        #[derive(Serialize)]
        struct SpecialItem {
            text: String,
        }

        let items = vec![
            SpecialItem {
                text: "Hello\nWorld".to_string(), // Embedded newline
            },
            SpecialItem {
                text: "Tab\there".to_string(), // Tab
            },
            SpecialItem {
                text: r#"Quote: "test""#.to_string(), // Quotes
            },
            SpecialItem {
                text: "Unicode: 你好".to_string(), // Unicode
            },
        ];

        let mut ndjson = NdjsonStream::<_, SpecialItem>::new(asupersync::stream::iter(items));

        // Each line should be valid JSON
        for bytes in drain(&mut ndjson) {
            let json_str = String::from_utf8_lossy(&bytes);
            let json_str = json_str.trim_end();
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(json_str);
            assert!(
                parsed.is_ok(),
                "Line should be valid JSON even with special chars: {}",
                json_str
            );
        }
    }

    #[test]
    fn ndjson_pretty_print() {
        let config = NdjsonConfig::new().pretty(true);
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::with_config(stream, config);

        match poll_once(&mut ndjson) {
            Poll::Ready(Some(bytes)) => {
                let line = String::from_utf8_lossy(&bytes);
                // It should still end with a newline...
                assert!(line.ends_with('\n'));
                // ...and pretty-printed JSON must have newlines *besides* that
                // trailing one, so strip the terminator before checking.
                let body = line.trim_end_matches('\n');
                assert!(
                    body.contains('\n'),
                    "Pretty output should span several lines: {}",
                    body
                );
            }
            _ => panic!("Expected Ready(Some(...))"),
        }
    }

    #[test]
    fn ndjson_serialization_failure_emits_error_object() {
        /// Item whose `Serialize` impl always fails.
        struct Failing;

        impl Serialize for Failing {
            fn serialize<Ser>(&self, _serializer: Ser) -> Result<Ser::Ok, Ser::Error>
            where
                Ser: serde::Serializer,
            {
                Err(<Ser::Error as serde::ser::Error>::custom("boom"))
            }
        }

        let stream = asupersync::stream::iter(vec![Failing]);
        let mut ndjson = NdjsonStream::<_, Failing>::new(stream);

        let chunks = drain(&mut ndjson);
        assert_eq!(chunks.len(), 1);
        assert!(chunks[0].ends_with(b"\n"));

        let value: serde_json::Value =
            serde_json::from_slice(&chunks[0]).expect("error line should be valid JSON");
        let message = value["error"]
            .as_str()
            .expect("error line should carry a string `error` field");
        assert!(message.starts_with("serialization failed"));
    }

    #[test]
    fn ndjson_content_type_constant() {
        assert_eq!(NDJSON_CONTENT_TYPE, b"application/x-ndjson");
        assert_eq!(NDJSON_CONTENT_TYPE_ALT, b"application/jsonlines");
    }
}
716}