use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use asupersync::stream::Stream;
use serde::Serialize;
use crate::response::{Response, ResponseBody, StatusCode};
/// Media type used for NDJSON responses when no override is configured
/// (it is the fallback returned by `NdjsonConfig::get_content_type`).
pub const NDJSON_CONTENT_TYPE: &[u8] = b"application/x-ndjson";
/// Alternative media type (`application/jsonlines`). Nothing in this module
/// applies it automatically; pass it to `NdjsonConfig::content_type` to use it.
pub const NDJSON_CONTENT_TYPE_ALT: &[u8] = b"application/jsonlines";
/// Options controlling how records are rendered into an NDJSON response.
///
/// The `Default` implementation yields `trailing_newline = true`,
/// `pretty = false` and no content-type override.
#[derive(Debug, Clone)]
pub struct NdjsonConfig {
/// Newline-terminator flag for the generated output (the name suggests it
/// governs the newline after the last record).
/// NOTE(review): confirm how the stream adapter treats `false`.
pub trailing_newline: bool,
/// When `true`, records are encoded with `serde_json::to_vec_pretty`
/// instead of `serde_json::to_vec`.
pub pretty: bool,
/// Overrides the `Content-Type` header value; `None` falls back to
/// `NDJSON_CONTENT_TYPE`.
pub content_type: Option<Vec<u8>>,
}
impl Default for NdjsonConfig {
fn default() -> Self {
Self {
trailing_newline: true,
pretty: false,
content_type: None,
}
}
}
impl NdjsonConfig {
    /// Creates a configuration holding the default settings; equivalent to
    /// `NdjsonConfig::default()`.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Returns the configuration with the `trailing_newline` flag replaced by
    /// `enabled`.
    #[must_use]
    pub fn trailing_newline(self, enabled: bool) -> Self {
        Self {
            trailing_newline: enabled,
            ..self
        }
    }

    /// Returns the configuration with pretty-printing switched on or off.
    #[must_use]
    pub fn pretty(self, enabled: bool) -> Self {
        Self {
            pretty: enabled,
            ..self
        }
    }

    /// Returns the configuration with a custom `Content-Type` value, replacing
    /// any override that was set before.
    #[must_use]
    pub fn content_type(self, content_type: impl Into<Vec<u8>>) -> Self {
        let custom: Vec<u8> = content_type.into();
        Self {
            content_type: Some(custom),
            ..self
        }
    }

    /// Content type to advertise: the configured override when present,
    /// otherwise `NDJSON_CONTENT_TYPE`.
    #[must_use]
    pub fn get_content_type(&self) -> &[u8] {
        match &self.content_type {
            Some(custom) => custom.as_slice(),
            None => NDJSON_CONTENT_TYPE,
        }
    }
}
/// Stream adapter that turns a stream of serializable items into a stream of
/// NDJSON byte chunks, one chunk per item.
///
/// Framing follows [`NdjsonConfig::trailing_newline`]:
///
/// * `true` (default): every chunk is the encoded record followed by `\n`, so
///   the overall output ends with a newline.
/// * `false`: the newline is emitted as a *separator* in front of every record
///   after the first, so records stay newline-delimited but the output does
///   not end with a newline.
///
/// With `pretty` enabled each record is indented JSON that itself contains
/// newlines, so consumers that split on `\n` will not see one record per line.
pub struct NdjsonStream<S, T> {
    inner: S,
    config: NdjsonConfig,
    /// `true` once at least one chunk has been yielded; only consulted when
    /// `trailing_newline` is disabled, to decide whether a separator is due.
    emitted_any: bool,
    _marker: PhantomData<T>,
}

impl<S, T> NdjsonStream<S, T> {
    /// Wraps `stream` using the default [`NdjsonConfig`].
    pub fn new(stream: S) -> Self {
        Self::with_config(stream, NdjsonConfig::default())
    }

    /// Wraps `stream` using the supplied `config`.
    pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
        Self {
            inner: stream,
            config,
            emitted_any: false,
            _marker: PhantomData,
        }
    }
}

/// Encodes one record as JSON (indented when `pretty` is set).
///
/// A serialization failure does not end the stream: the record is replaced by
/// a JSON object of the form `{"error": "serialization failed: ..."}`, with a
/// fixed literal as the last resort should even that object fail to encode.
fn encode_ndjson_record<T: Serialize>(item: &T, pretty: bool) -> Vec<u8> {
    let encoded = if pretty {
        serde_json::to_vec_pretty(item)
    } else {
        serde_json::to_vec(item)
    };
    encoded.unwrap_or_else(|e| {
        let error = serde_json::json!({
            "error": format!("serialization failed: {}", e)
        });
        serde_json::to_vec(&error)
            .unwrap_or_else(|_| br#"{"error":"serialization failed"}"#.to_vec())
    })
}

impl<S, T> Stream for NdjsonStream<S, T>
where
    S: Stream<Item = T> + Unpin,
    T: Serialize + Unpin,
{
    type Item = Vec<u8>;

    /// Polls the wrapped stream and, when it yields an item, returns that item
    /// as a framed NDJSON chunk. `Pending` and end-of-stream are forwarded
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let item = match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => item,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Pending => return Poll::Pending,
        };

        let mut chunk = encode_ndjson_record(&item, this.config.pretty);
        if this.config.trailing_newline {
            // Terminator style: each record carries its own newline.
            chunk.push(b'\n');
        } else if this.emitted_any {
            // Separator style: the newline belongs to the gap between records,
            // which leaves the final record unterminated.
            chunk.insert(0, b'\n');
        }
        this.emitted_any = true;

        Poll::Ready(Some(chunk))
    }
}
/// Pairs a source stream with an `NdjsonConfig` until it is converted into a
/// `Response` via `into_response`.
pub struct NdjsonResponse<S, T> {
// Source of items to serialize.
stream: S,
// Rendering options and optional content-type override.
config: NdjsonConfig,
// Records the item type `T` without storing a value of it.
_marker: PhantomData<T>,
}
impl<S, T> NdjsonResponse<S, T>
where
    S: Stream<Item = T> + Send + Unpin + 'static,
    T: Serialize + Send + Unpin + 'static,
{
    /// Creates a response builder for `stream` with the default
    /// [`NdjsonConfig`].
    pub fn new(stream: S) -> Self {
        Self::with_config(stream, NdjsonConfig::default())
    }

    /// Creates a response builder for `stream` with an explicit `config`.
    pub fn with_config(stream: S, config: NdjsonConfig) -> Self {
        Self {
            stream,
            config,
            _marker: PhantomData,
        }
    }

    /// Builds a `200 OK` streaming response whose body is the NDJSON encoding
    /// of the wrapped stream.
    ///
    /// Headers set:
    /// * `Content-Type`: the configured override, or `NDJSON_CONTENT_TYPE`.
    /// * `Cache-Control: no-cache`.
    /// * `X-Accel-Buffering: no`, which asks nginx-style proxies not to buffer
    ///   the stream.
    #[must_use]
    pub fn into_response(self) -> Response {
        let Self { stream, config, .. } = self;
        // Capture the header value before the config moves into the stream
        // adapter, so the whole config does not need to be cloned.
        let content_type = config.get_content_type().to_vec();
        let ndjson_stream = NdjsonStream::with_config(stream, config);
        Response::with_status(StatusCode::OK)
            .header("Content-Type", content_type)
            .header("Cache-Control", b"no-cache".to_vec())
            .header("X-Accel-Buffering", b"no".to_vec())
            .body(ResponseBody::stream(ndjson_stream))
    }
}
/// Shorthand for `NdjsonResponse::new(stream).into_response()`: streams the
/// items of `stream` as an NDJSON response using the default configuration.
pub fn ndjson_response<S, T>(stream: S) -> Response
where
    S: Stream<Item = T> + Send + Unpin + 'static,
    T: Serialize + Send + Unpin + 'static,
{
    let builder = NdjsonResponse::<S, T>::new(stream);
    builder.into_response()
}
/// Builds an NDJSON response from anything iterable by first adapting it into
/// a stream with `asupersync::stream::iter` and then delegating to
/// `ndjson_response`.
pub fn ndjson_iter<I, T>(iter: I) -> Response
where
    I: IntoIterator<Item = T>,
    I::IntoIter: Send + 'static,
    T: Serialize + Send + Unpin + 'static,
{
    let stream = asupersync::stream::iter(iter);
    ndjson_response(stream)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::task::{Wake, Waker};

    /// Waker whose `wake` does nothing; the iterator-backed streams polled in
    /// these tests are expected to be ready on every poll.
    struct NoopWaker;

    impl Wake for NoopWaker {
        fn wake(self: Arc<Self>) {}
    }

    fn noop_waker() -> Waker {
        Arc::new(NoopWaker).into()
    }

    #[derive(Serialize, Clone)]
    struct TestItem {
        id: i64,
        name: String,
    }

    /// Fixture constructor for `TestItem`.
    fn item(id: i64, name: &str) -> TestItem {
        TestItem {
            id,
            name: name.to_string(),
        }
    }

    /// Polls `stream` exactly once with a no-op waker.
    fn poll_once<S>(stream: &mut S) -> Poll<Option<S::Item>>
    where
        S: Stream + Unpin,
    {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        Pin::new(stream).poll_next(&mut cx)
    }

    /// Polls once and returns the yielded chunk as text, panicking if the
    /// stream did not produce an item.
    fn expect_line<S>(stream: &mut S) -> String
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        match poll_once(stream) {
            Poll::Ready(Some(bytes)) => String::from_utf8_lossy(&bytes).into_owned(),
            _ => panic!("Expected Ready(Some(...))"),
        }
    }

    /// Drives `stream` to completion and returns every chunk as text.
    fn drain_lines<S>(mut stream: S) -> Vec<String>
    where
        S: Stream<Item = Vec<u8>> + Unpin,
    {
        let mut lines = Vec::new();
        loop {
            match poll_once(&mut stream) {
                Poll::Ready(Some(bytes)) => {
                    lines.push(String::from_utf8_lossy(&bytes).into_owned());
                }
                Poll::Ready(None) => return lines,
                Poll::Pending => panic!("Unexpected Pending"),
            }
        }
    }

    /// Looks up the first header called `wanted` and returns a copy of its value.
    fn header_value(response: &Response, wanted: &str) -> Option<Vec<u8>> {
        response
            .headers()
            .iter()
            .find(|(name, _)| name == wanted)
            .map(|(_, value)| value.clone())
    }

    /// Response built from a single-item stream with the default configuration.
    fn single_item_response() -> Response {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        NdjsonResponse::new(stream).into_response()
    }

    #[test]
    fn ndjson_stream_serializes_items() {
        let source = asupersync::stream::iter(vec![item(1, "Alice"), item(2, "Bob")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::new(source);

        let expected = [
            (r#""id":1"#, r#""name":"Alice""#),
            (r#""id":2"#, r#""name":"Bob""#),
        ];
        for (id_fragment, name_fragment) in expected {
            let line = expect_line(&mut ndjson);
            assert!(line.contains(id_fragment));
            assert!(line.contains(name_fragment));
            assert!(line.ends_with('\n'));
        }

        assert!(matches!(poll_once(&mut ndjson), Poll::Ready(None)));
    }

    #[test]
    fn ndjson_stream_each_line_is_valid_json() {
        let source = asupersync::stream::iter(vec![item(1, "Test"), item(2, "Item")]);
        let ndjson = NdjsonStream::<_, TestItem>::new(source);

        for line in drain_lines(ndjson) {
            let json_str = line.trim_end();
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(json_str);
            assert!(parsed.is_ok(), "Line should be valid JSON: {}", json_str);
        }
    }

    #[test]
    fn ndjson_stream_empty() {
        let source = asupersync::stream::iter(Vec::<TestItem>::new());
        let mut ndjson = NdjsonStream::<_, TestItem>::new(source);
        assert!(matches!(poll_once(&mut ndjson), Poll::Ready(None)));
    }

    #[test]
    fn ndjson_config_defaults() {
        let NdjsonConfig {
            trailing_newline,
            pretty,
            content_type,
        } = NdjsonConfig::default();
        assert!(trailing_newline);
        assert!(!pretty);
        assert!(content_type.is_none());
    }

    #[test]
    fn ndjson_config_custom() {
        let builder = NdjsonConfig::new();
        let builder = builder.trailing_newline(false);
        let builder = builder.pretty(true);
        let config = builder.content_type(b"application/jsonlines".to_vec());

        assert!(!config.trailing_newline);
        assert!(config.pretty);
        assert_eq!(
            config.get_content_type(),
            b"application/jsonlines".as_slice()
        );
    }

    #[test]
    fn ndjson_response_sets_content_type() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_response_sets_cache_control() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "Cache-Control"),
            Some(b"no-cache".to_vec())
        );
    }

    #[test]
    fn ndjson_response_disables_nginx_buffering() {
        let response = single_item_response();
        assert_eq!(
            header_value(&response, "X-Accel-Buffering"),
            Some(b"no".to_vec())
        );
    }

    #[test]
    fn ndjson_response_status_200() {
        let stream = asupersync::stream::iter(Vec::<TestItem>::new());
        let response = NdjsonResponse::new(stream).into_response();
        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_response_with_custom_content_type() {
        let config = NdjsonConfig::new().content_type(b"application/jsonlines".to_vec());
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = NdjsonResponse::with_config(stream, config).into_response();
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/jsonlines".to_vec())
        );
    }

    #[test]
    fn ndjson_helper_function() {
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let response = ndjson_response(stream);
        assert_eq!(response.status().as_u16(), 200);
        assert_eq!(
            header_value(&response, "Content-Type"),
            Some(b"application/x-ndjson".to_vec())
        );
    }

    #[test]
    fn ndjson_iter_helper() {
        let response = ndjson_iter(vec![item(1, "Alice"), item(2, "Bob")]);
        assert_eq!(response.status().as_u16(), 200);
    }

    #[test]
    fn ndjson_handles_special_characters() {
        #[derive(Serialize)]
        struct SpecialItem {
            text: String,
        }

        // Embedded newline, tab, double quotes and non-ASCII text.
        let items: Vec<SpecialItem> = [
            "Hello\nWorld",
            "Tab\there",
            r#"Quote: "test""#,
            "Unicode: 你好",
        ]
        .iter()
        .map(|text| SpecialItem {
            text: text.to_string(),
        })
        .collect();

        let stream = asupersync::stream::iter(items);
        let ndjson = NdjsonStream::<_, SpecialItem>::new(stream);

        for line in drain_lines(ndjson) {
            let json_str = line.trim_end();
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(json_str);
            assert!(
                parsed.is_ok(),
                "Line should be valid JSON even with special chars: {}",
                json_str
            );
        }
    }

    #[test]
    fn ndjson_pretty_print() {
        let config = NdjsonConfig::new().pretty(true);
        let stream = asupersync::stream::iter(vec![item(1, "Test")]);
        let mut ndjson = NdjsonStream::<_, TestItem>::with_config(stream, config);

        let line = expect_line(&mut ndjson);
        // Pretty output spans several lines and still ends with a newline.
        assert!(line.contains('\n'));
        assert!(line.ends_with('\n'));
    }

    #[test]
    fn ndjson_content_type_constant() {
        assert_eq!(NDJSON_CONTENT_TYPE, b"application/x-ndjson");
        assert_eq!(NDJSON_CONTENT_TYPE_ALT, b"application/jsonlines");
    }
}