1use std::marker::PhantomData;
68use std::pin::Pin;
69use std::task::{Context, Poll};
70
71use asupersync::stream::Stream;
72use serde::Serialize;
73
74use crate::response::{Response, ResponseBody, StatusCode};
75
/// `Content-Type` value used for NDJSON responses when the configuration does
/// not supply its own (this is the fallback of `NdjsonConfig::get_content_type`).
pub const NDJSON_CONTENT_TYPE: &[u8] = b"application/x-ndjson";

/// Alternative content type for line-delimited JSON.
///
/// Nothing in the code visible here selects it automatically; a caller can
/// opt in by passing it to `NdjsonConfig::content_type`.
pub const NDJSON_CONTENT_TYPE_ALT: &[u8] = b"application/jsonlines";
81
/// Options controlling how an NDJSON stream / response is produced.
///
/// Values are usually assembled with the chained setters, e.g.
/// `NdjsonConfig::new().pretty(true)`.
#[derive(Debug, Clone)]
pub struct NdjsonConfig {
    /// Trailing-newline flag (defaults to `true`).
    ///
    /// NOTE(review): the `poll_next` implementation of `NdjsonStream` in this
    /// file appends `\n` to every item unconditionally and never reads this
    /// field — confirm the intended meaning before relying on it.
    pub trailing_newline: bool,
    /// When `true`, items are encoded with `serde_json::to_vec_pretty`
    /// instead of `serde_json::to_vec` (defaults to `false`).
    pub pretty: bool,
    /// Overrides the `Content-Type` header value; `None` falls back to
    /// `NDJSON_CONTENT_TYPE`.
    pub content_type: Option<Vec<u8>>,
}
92
93impl Default for NdjsonConfig {
94 fn default() -> Self {
95 Self {
96 trailing_newline: true,
97 pretty: false,
98 content_type: None,
99 }
100 }
101}
102
103impl NdjsonConfig {
104 #[must_use]
106 pub fn new() -> Self {
107 Self::default()
108 }
109
110 #[must_use]
112 pub fn trailing_newline(mut self, enabled: bool) -> Self {
113 self.trailing_newline = enabled;
114 self
115 }
116
117 #[must_use]
119 pub fn pretty(mut self, enabled: bool) -> Self {
120 self.pretty = enabled;
121 self
122 }
123
124 #[must_use]
126 pub fn content_type(mut self, content_type: impl Into<Vec<u8>>) -> Self {
127 self.content_type = Some(content_type.into());
128 self
129 }
130
131 #[must_use]
133 pub fn get_content_type(&self) -> &[u8] {
134 self.content_type.as_deref().unwrap_or(NDJSON_CONTENT_TYPE)
135 }
136}
137
/// Stream adapter that turns each item of an inner stream into one
/// newline-terminated JSON byte chunk (see its `Stream` implementation).
///
/// `S` is the wrapped stream and `T` its item type. `T` is only tracked
/// through `PhantomData`; no value of it is stored.
pub struct NdjsonStream<S, T> {
    /// The wrapped source stream that is polled for items.
    inner: S,
    /// Serialization options; `pretty` is consulted on every poll.
    config: NdjsonConfig,
    /// Ties the item type `T` to the struct without storing one.
    _marker: PhantomData<T>,
}
171
172impl<S, T> NdjsonStream<S, T> {
173 pub fn new(stream: S) -> Self {
175 Self {
176 inner: stream,
177 config: NdjsonConfig::default(),
178 _marker: PhantomData,
179 }
180 }
181
182 pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
184 Self {
185 inner: stream,
186 config,
187 _marker: PhantomData,
188 }
189 }
190}
191
192impl<S, T> Stream for NdjsonStream<S, T>
193where
194 S: Stream<Item = T> + Unpin,
195 T: Serialize + Unpin,
196{
197 type Item = Vec<u8>;
198
199 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200 let this = self.get_mut();
201 match Pin::new(&mut this.inner).poll_next(cx) {
202 Poll::Ready(Some(item)) => {
203 let mut bytes = if this.config.pretty {
204 match serde_json::to_vec_pretty(&item) {
205 Ok(b) => b,
206 Err(e) => {
207 let error = serde_json::json!({
209 "error": format!("serialization failed: {}", e)
210 });
211 serde_json::to_vec(&error)
212 .unwrap_or_else(|_| br#"{"error":"serialization failed"}"#.to_vec())
213 }
214 }
215 } else {
216 match serde_json::to_vec(&item) {
217 Ok(b) => b,
218 Err(e) => {
219 let error = serde_json::json!({
221 "error": format!("serialization failed: {}", e)
222 });
223 serde_json::to_vec(&error)
224 .unwrap_or_else(|_| br#"{"error":"serialization failed"}"#.to_vec())
225 }
226 }
227 };
228
229 bytes.push(b'\n');
231
232 Poll::Ready(Some(bytes))
233 }
234 Poll::Ready(None) => Poll::Ready(None),
235 Poll::Pending => Poll::Pending,
236 }
237 }
238}
239
/// Builder that pairs an item stream with an `NdjsonConfig` and converts the
/// pair into an HTTP `Response` via `into_response`.
///
/// `S` is the source stream and `T` its item type; `T` is tracked through
/// `PhantomData` only.
pub struct NdjsonResponse<S, T> {
    /// Source of the items to serialize.
    stream: S,
    /// Serialization and content-type options for the response.
    config: NdjsonConfig,
    /// Ties the item type `T` to the struct without storing one.
    _marker: PhantomData<T>,
}
277
278impl<S, T> NdjsonResponse<S, T>
279where
280 S: Stream<Item = T> + Send + Unpin + 'static,
281 T: Serialize + Send + Unpin + 'static,
282{
283 pub fn new(stream: S) -> Self {
285 Self {
286 stream,
287 config: NdjsonConfig::default(),
288 _marker: PhantomData,
289 }
290 }
291
292 pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
294 Self {
295 stream,
296 config,
297 _marker: PhantomData,
298 }
299 }
300
301 #[must_use]
308 pub fn into_response(self) -> Response {
309 let ndjson_stream = NdjsonStream::with_config(self.stream, self.config.clone());
310
311 Response::with_status(StatusCode::OK)
312 .header("Content-Type", self.config.get_content_type().to_vec())
313 .header("Cache-Control", b"no-cache".to_vec())
314 .header("X-Accel-Buffering", b"no".to_vec()) .body(ResponseBody::stream(ndjson_stream))
316 }
317}
318
319pub fn ndjson_response<S, T>(stream: S) -> Response
334where
335 S: Stream<Item = T> + Send + Unpin + 'static,
336 T: Serialize + Send + Unpin + 'static,
337{
338 NdjsonResponse::new(stream).into_response()
339}
340
341pub fn ndjson_iter<I, T>(iter: I) -> Response
362where
363 I: IntoIterator<Item = T>,
364 I::IntoIter: Send + 'static,
365 T: Serialize + Send + Unpin + 'static,
366{
367 ndjson_response(asupersync::stream::iter(iter))
368}
369
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    /// Waker whose `wake` does nothing; the tests drive streams by hand.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    fn noop_waker() -> Waker {
        Arc::new(NoopWaker).into()
    }

    #[derive(Serialize, Clone)]
    struct TestItem {
        id: i64,
        name: String,
    }

    /// Shorthand constructor for a `TestItem`.
    fn item(id: i64, name: &str) -> TestItem {
        TestItem {
            id,
            name: name.to_string(),
        }
    }

    /// Response built from a stream holding the single item `{1, "Test"}`.
    fn single_item_response() -> Response {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        NdjsonResponse::new(stream).into_response()
    }

    /// Value of the first header called `wanted`, if the response has one.
    fn header_value(response: &Response, wanted: &str) -> Option<Vec<u8>> {
        let found = response
            .headers()
            .iter()
            .find(|(name, _)| name == wanted)
            .map(|(_, value)| value.clone());
        found
    }

    /// Polls once and returns the chunk; panics unless the stream is ready
    /// with an item.
    fn expect_chunk<S>(stream: &mut S, cx: &mut Context<'_>) -> Vec<u8>
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        match Pin::new(stream).poll_next(cx) {
            Poll::Ready(Some(bytes)) => bytes,
            _ => panic!("Expected Ready(Some(...))"),
        }
    }

    /// Polls `stream` until it ends, collecting every chunk; panics if the
    /// stream ever reports `Pending`.
    fn drain<S>(mut stream: S) -> Vec<Vec<u8>>
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut chunks = Vec::new();
        loop {
            match Pin::new(&mut stream).poll_next(&mut cx) {
                Poll::Ready(Some(bytes)) => chunks.push(bytes),
                Poll::Ready(None) => return chunks,
                Poll::Pending => panic!("Unexpected Pending"),
            }
        }
    }

    #[test]
    fn ndjson_stream_serializes_items() {
        let stream = asupersync::stream::iter(vec![item(1, "Alice"), item(2, "Bob")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::new(stream);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // First item.
        let bytes = expect_chunk(&mut ndjson, &mut cx);
        let line = String::from_utf8_lossy(&bytes);
        assert!(line.contains(r#""id":1"#));
        assert!(line.contains(r#""name":"Alice""#));
        assert!(line.ends_with('\n'));

        // Second item.
        let bytes = expect_chunk(&mut ndjson, &mut cx);
        let line = String::from_utf8_lossy(&bytes);
        assert!(line.contains(r#""id":2"#));
        assert!(line.contains(r#""name":"Bob""#));
        assert!(line.ends_with('\n'));

        // Stream is exhausted afterwards.
        let finished = Pin::new(&mut ndjson).poll_next(&mut cx);
        assert!(matches!(finished, Poll::Ready(None)));
    }

    #[test]
    fn ndjson_stream_each_line_is_valid_json() {
        let stream = asupersync::stream::iter(vec![item(1, "Test"), item(2, "Item")]);
        let ndjson = NdjsonStream::<_, TestItem>::new(stream);

        for bytes in drain(ndjson) {
            let json_str = String::from_utf8_lossy(&bytes);
            let json_str = json_str.trim_end();
            let parsed = serde_json::from_str::<serde_json::Value>(json_str);
            assert!(parsed.is_ok(), "Line should be valid JSON: {}", json_str);
        }
    }

    #[test]
    fn ndjson_stream_empty() {
        let stream = asupersync::stream::iter(Vec::<TestItem>::new());
        let mut ndjson = NdjsonStream::<_, TestItem>::new(stream);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let first = Pin::new(&mut ndjson).poll_next(&mut cx);
        assert!(matches!(first, Poll::Ready(None)));
    }

    #[test]
    fn ndjson_config_defaults() {
        let NdjsonConfig {
            trailing_newline,
            pretty,
            content_type,
        } = NdjsonConfig::default();

        assert!(trailing_newline);
        assert!(!pretty);
        assert!(content_type.is_none());
    }

    #[test]
    fn ndjson_config_custom() {
        let config = NdjsonConfig::new()
            .trailing_newline(false)
            .pretty(true)
            .content_type(b"application/jsonlines".to_vec());

        assert!(!config.trailing_newline);
        assert!(config.pretty);
        assert_eq!(
            config.get_content_type(),
            b"application/jsonlines".as_slice()
        );
    }

    #[test]
    fn ndjson_response_sets_content_type() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_response_sets_cache_control() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "Cache-Control"),
            Some(b"no-cache".to_vec())
        );
    }

    #[test]
    fn ndjson_response_disables_nginx_buffering() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "X-Accel-Buffering"),
            Some(b"no".to_vec())
        );
    }

    #[test]
    fn ndjson_response_status_200() {
        let stream = asupersync::stream::iter(Vec::<TestItem>::new());
        let response = NdjsonResponse::new(stream).into_response();

        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_response_with_custom_content_type() {
        let config = NdjsonConfig::new().content_type(b"application/jsonlines".to_vec());
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::with_config(stream, config).into_response();

        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/jsonlines".to_vec())
        );
    }

    #[test]
    fn ndjson_helper_function() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = ndjson_response(stream);

        assert_eq!(response.status().as_u16(), 200);
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_iter_helper() {
        let response = ndjson_iter(vec![item(1, "Alice"), item(2, "Bob")]);

        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_handles_special_characters() {
        #[derive(Serialize)]
        struct SpecialItem {
            text: String,
        }

        // Embedded newline, tab, double quotes and non-ASCII text.
        let samples = [
            "Hello\nWorld",
            "Tab\there",
            r#"Quote: "test""#,
            "Unicode: 你好",
        ];
        let items: Vec<SpecialItem> = samples
            .iter()
            .map(|text| SpecialItem {
                text: (*text).to_string(),
            })
            .collect();

        let stream = asupersync::stream::iter(items);
        let ndjson = NdjsonStream::<_, SpecialItem>::new(stream);

        for bytes in drain(ndjson) {
            let json_str = String::from_utf8_lossy(&bytes);
            let json_str = json_str.trim_end();
            let parsed = serde_json::from_str::<serde_json::Value>(json_str);
            assert!(
                parsed.is_ok(),
                "Line should be valid JSON even with special chars: {}",
                json_str
            );
        }
    }

    #[test]
    fn ndjson_pretty_print() {
        let config = NdjsonConfig::new().pretty(true);
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::with_config(stream, config);

        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        let bytes = expect_chunk(&mut ndjson, &mut cx);
        let line = String::from_utf8_lossy(&bytes);
        // Pretty output spans several lines and still ends with the
        // record-terminating newline.
        assert!(line.contains('\n'));
        assert!(line.ends_with('\n'));
    }

    #[test]
    fn ndjson_content_type_constant() {
        assert_eq!(NDJSON_CONTENT_TYPE, b"application/x-ndjson");
        assert_eq!(NDJSON_CONTENT_TYPE_ALT, b"application/jsonlines");
    }
}