use std::io::{BufRead, Read};
use bytes::Bytes;
/// Default size in bytes (2 MiB) of each of the converter's input and output
/// buffers, used by `JsonArrayToNdjsonReader::new`.
const DEFAULT_BUF_SIZE: usize = 2 << 20;
/// Where the scanner is relative to the single top-level JSON array.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum JsonArrayState {
    /// Before the opening `[`; bytes seen here are never emitted.
    Start,
    /// Between the outer `[` and its closing `]`; element bytes are emitted.
    InArray,
    /// After the closing `]`; bytes seen here are never emitted.
    Done,
}
/// Streaming adapter that turns one top-level JSON array into newline-delimited
/// JSON (NDJSON).
///
/// The outer `[` / `]` are dropped, whitespace between top-level elements is
/// dropped, and each top-level `,` becomes `\n`; bytes inside elements are
/// passed through unchanged. Structural problems are not reported while
/// reading — call `validate_complete` after EOF to check them.
pub struct JsonArrayToNdjsonReader<R: Read> {
    /// Source of the raw JSON array bytes.
    inner: R,

    // ---- scanner state ----
    /// Position relative to the outer array (before `[`, inside, after `]`).
    state: JsonArrayState,
    /// Count of currently open `{` / `[` inside the outer array; 0 means
    /// "between top-level elements". Signed so stray closers push it below 0.
    depth: i32,
    /// True while inside a JSON string literal, where brackets and commas are data.
    in_string: bool,
    /// True when the previous string byte was `\`, so the next byte is literal.
    escape_next: bool,
    /// Set when non-whitespace appears before the opening `[`.
    has_leading_content: bool,
    /// Set when non-whitespace appears after the closing `]`.
    has_trailing_content: bool,

    // ---- raw input staging: `input_buffer[input_pos..input_filled]` is unprocessed ----
    input_buffer: Vec<u8>,
    input_pos: usize,
    input_filled: usize,

    // ---- converted output: `output_buffer[output_pos..output_filled]` is not yet returned ----
    output_buffer: Vec<u8>,
    output_pos: usize,
    output_filled: usize,
}
impl<R: Read> JsonArrayToNdjsonReader<R> {
    /// Wraps `reader`, allocating input and output buffers of
    /// [`DEFAULT_BUF_SIZE`] bytes each.
    pub fn new(reader: R) -> Self {
        Self::with_capacity(reader, DEFAULT_BUF_SIZE)
    }

    /// Wraps `reader`, allocating input and output buffers of `capacity`
    /// bytes each.
    ///
    /// A `capacity` of 0 is treated as 1. With a zero-length input buffer
    /// every `inner.read` would return `Ok(0)`, which is indistinguishable
    /// from EOF, so the converter would silently produce no output.
    pub fn with_capacity(reader: R, capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            inner: reader,
            state: JsonArrayState::Start,
            depth: 0,
            in_string: false,
            escape_next: false,
            input_buffer: vec![0; capacity],
            input_pos: 0,
            input_filled: 0,
            output_buffer: vec![0; capacity],
            output_pos: 0,
            output_filled: 0,
            has_trailing_content: false,
            has_leading_content: false,
        }
    }

    /// Checks that everything consumed so far formed one complete top-level
    /// JSON array. Intended to be called after the reader has reported EOF.
    ///
    /// The checks are structural only: element contents are not parsed, and
    /// because `{`/`[` and `}`/`]` are only counted, a mismatched pair such as
    /// `{]` is caught only if it leaves the counts unbalanced or the array
    /// unclosed.
    ///
    /// # Errors
    ///
    /// Returns `ErrorKind::InvalidData` for the first problem found, checked
    /// in this order:
    /// 1. non-whitespace before the opening `[`;
    /// 2. a string literal still open at end of input;
    /// 3. unbalanced opening/closing bracket counts;
    /// 4. no closing `]` for the outer array;
    /// 5. non-whitespace after the closing `]`.
    pub fn validate_complete(&self) -> std::io::Result<()> {
        if self.has_leading_content {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Malformed JSON: unexpected leading content before '['",
            ));
        }
        // An open string swallows every bracket after its opening quote, so a
        // depth imbalance would only be a symptom; report the root cause first.
        if self.in_string {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Malformed JSON array: unterminated string",
            ));
        }
        if self.depth != 0 {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Malformed JSON array: unbalanced braces or brackets",
            ));
        }
        if self.state != JsonArrayState::Done {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Incomplete JSON array: expected closing bracket ']'",
            ));
        }
        if self.has_trailing_content {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Malformed JSON: unexpected trailing content after ']'",
            ));
        }
        Ok(())
    }

    /// Feeds one input byte through the array-unwrapping state machine.
    ///
    /// Returns the byte to emit (possibly rewritten), or `None` when the byte
    /// is dropped.
    ///
    /// Outside the outer array nothing is emitted. Non-whitespace there is
    /// only recorded in `has_leading_content` / `has_trailing_content` for
    /// [`Self::validate_complete`].
    ///
    /// Inside the array, string literals are copied verbatim (honouring `\`
    /// escapes), a top-level `,` becomes `\n`, and top-level whitespace is
    /// dropped.
    #[inline]
    fn process_byte(&mut self, byte: u8) -> Option<u8> {
        match self.state {
            JsonArrayState::Start => {
                if byte == b'[' {
                    self.state = JsonArrayState::InArray;
                } else if !byte.is_ascii_whitespace() {
                    self.has_leading_content = true;
                }
                None
            }
            JsonArrayState::InArray => {
                // The byte after a backslash inside a string is data, even `"` or `\`.
                if self.escape_next {
                    self.escape_next = false;
                    return Some(byte);
                }
                if self.in_string {
                    match byte {
                        b'\\' => self.escape_next = true,
                        b'"' => self.in_string = false,
                        _ => {}
                    }
                    return Some(byte);
                }
                match byte {
                    b'"' => {
                        self.in_string = true;
                        Some(byte)
                    }
                    b'{' | b'[' => {
                        self.depth += 1;
                        Some(byte)
                    }
                    b'}' => {
                        self.depth -= 1;
                        Some(byte)
                    }
                    // A `]` at depth 0 closes the outer array and is not emitted.
                    b']' if self.depth == 0 => {
                        self.state = JsonArrayState::Done;
                        None
                    }
                    b']' => {
                        self.depth -= 1;
                        Some(byte)
                    }
                    // A top-level comma separates elements: one element per output line.
                    b',' if self.depth == 0 => Some(b'\n'),
                    _ if self.depth == 0 && byte.is_ascii_whitespace() => None,
                    _ => Some(byte),
                }
            }
            JsonArrayState::Done => {
                if !byte.is_ascii_whitespace() {
                    self.has_trailing_content = true;
                }
                None
            }
        }
    }

    /// Ensures the input buffer holds unprocessed bytes, reading from the
    /// inner reader if it has been drained.
    ///
    /// Returns `Ok(false)` when the inner reader reports EOF (`Ok(0)`).
    /// `ErrorKind::Interrupted` is retried, as the `std::io::Read` contract
    /// recommends. Any other error is propagated.
    fn refill_input_if_needed(&mut self) -> std::io::Result<bool> {
        if self.input_pos < self.input_filled {
            return Ok(true);
        }
        loop {
            match self.inner.read(&mut self.input_buffer) {
                Ok(0) => return Ok(false),
                Ok(n) => {
                    self.input_pos = 0;
                    self.input_filled = n;
                    return Ok(true);
                }
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
    }

    /// Produces the next batch of converted bytes into the output buffer.
    ///
    /// Must only be called once the previous batch has been fully handed out
    /// (`output_pos >= output_filled`), because it overwrites the buffer from
    /// the start.
    ///
    /// It stops when the output buffer is full, or as soon as the input buffer
    /// is drained while at least one byte has been produced. As a result it
    /// returns `Ok` with an empty batch only at EOF.
    ///
    /// # Errors
    ///
    /// Propagates errors from the inner reader. Such an error can only occur
    /// while the batch is still empty, so no converted bytes are lost.
    fn fill_output_buffer(&mut self) -> std::io::Result<()> {
        // Commit each produced byte directly into `output_filled` so the
        // buffer state stays consistent even if we return early.
        self.output_pos = 0;
        self.output_filled = 0;
        while self.output_filled < self.output_buffer.len() {
            if self.input_pos >= self.input_filled {
                // Return what we already have rather than blocking on the
                // inner reader with converted bytes in hand.
                if self.output_filled > 0 || !self.refill_input_if_needed()? {
                    break;
                }
            }
            while self.input_pos < self.input_filled
                && self.output_filled < self.output_buffer.len()
            {
                let byte = self.input_buffer[self.input_pos];
                self.input_pos += 1;
                if let Some(out) = self.process_byte(byte) {
                    self.output_buffer[self.output_filled] = out;
                    self.output_filled += 1;
                }
            }
        }
        Ok(())
    }
}
impl<R: Read> Read for JsonArrayToNdjsonReader<R> {
    /// Copies converted NDJSON bytes into `buf`, producing a new batch from
    /// the inner reader once the current batch has been fully handed out.
    ///
    /// Returns `Ok(0)` when no more converted bytes can be produced, or when
    /// `buf` is empty.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let needs_refill = self.output_pos >= self.output_filled;
        if needs_refill {
            self.fill_output_buffer()?;
            if self.output_filled == 0 {
                // Nothing was produced: the inner reader is exhausted.
                return Ok(0);
            }
        }
        let pending = &self.output_buffer[self.output_pos..self.output_filled];
        let count = pending.len().min(buf.len());
        buf[..count].copy_from_slice(&pending[..count]);
        self.output_pos += count;
        Ok(count)
    }
}
impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
    /// Returns the converted bytes not yet consumed, producing a fresh batch
    /// from the inner reader when the previous one has been used up.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        let drained = self.output_pos >= self.output_filled;
        if drained {
            self.fill_output_buffer()?;
        }
        let window = self.output_pos..self.output_filled;
        Ok(&self.output_buffer[window])
    }

    /// Marks `amt` converted bytes as used, never advancing past the end of
    /// the current batch.
    fn consume(&mut self, amt: usize) {
        let requested = self.output_pos + amt;
        self.output_pos = requested.min(self.output_filled);
    }
}
/// Synchronous [`Read`] adapter over a tokio mpsc receiver of [`Bytes`] chunks.
pub struct ChannelReader {
    /// Channel the chunks arrive on; the stream ends when it yields `None`.
    rx: tokio::sync::mpsc::Receiver<Bytes>,
    /// Chunk currently being handed out, if any.
    current: Option<Bytes>,
    /// Offset of the next unread byte within `current`.
    pos: usize,
}
impl ChannelReader {
    /// Wraps `rx`; nothing is received until the first `read`.
    pub fn new(rx: tokio::sync::mpsc::Receiver<Bytes>) -> Self {
        let (current, pos) = (None, 0);
        Self { rx, current, pos }
    }
}
impl Read for ChannelReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
loop {
if let Some(ref chunk) = self.current {
let remaining = chunk.len() - self.pos;
if remaining > 0 {
let to_copy = std::cmp::min(remaining, buf.len());
buf[..to_copy].copy_from_slice(&chunk[self.pos..self.pos + to_copy]);
self.pos += to_copy;
return Ok(to_copy);
}
}
match self.rx.blocking_recv() {
Some(bytes) => {
self.current = Some(bytes);
self.pos = 0;
}
None => return Ok(0), }
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_json_array_to_ndjson_simple() {
        let input = r#"[{"a":1}, {"a":2}, {"a":3}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}");
    }

    #[test]
    fn test_json_array_to_ndjson_nested() {
        let input = r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}");
    }

    #[test]
    fn test_json_array_to_ndjson_strings_with_special_chars() {
        let input = r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}");
    }

    #[test]
    fn test_json_array_to_ndjson_escaped_quotes() {
        let input = r#"[{"a":"say \"hello\""}, {"b":1}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":\"say \\\"hello\\\"\"}\n{\"b\":1}");
    }

    #[test]
    fn test_json_array_to_ndjson_empty() {
        let input = r#"[]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "");
    }

    #[test]
    fn test_json_array_to_ndjson_single_element() {
        let input = r#"[{"a":1}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}");
    }

    #[test]
    fn test_json_array_to_ndjson_scalars_and_nested_arrays() {
        let input = r#"[1, "a,b", true, null, [2,[3]], {"k":[4]}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "1\n\"a,b\"\ntrue\nnull\n[2,[3]]\n{\"k\":[4]}");
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_json_array_to_ndjson_bufread() {
        let input = r#"[{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        // Copy the first window out so the borrow ends before `consume`.
        let first = reader.fill_buf().unwrap().to_vec();
        assert!(!first.is_empty());
        reader.consume(first.len());
        let mut rest = String::new();
        reader.read_to_string(&mut rest).unwrap();
        let combined = String::from_utf8(first).unwrap() + &rest;
        assert_eq!(combined, "{\"a\":1}\n{\"a\":2}");
    }

    #[test]
    fn test_json_array_to_ndjson_whitespace() {
        let input = r#" [ {"a":1} , {"a":2} ] "#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
    }

    #[test]
    fn test_tiny_capacity_matches_default() {
        let input = r#"[ {"a":"x\"y"}, [1, 2] , 3 ]"#;
        let mut expected = String::new();
        JsonArrayToNdjsonReader::new(input.as_bytes())
            .read_to_string(&mut expected)
            .unwrap();
        let mut tiny = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 1);
        let mut output = String::new();
        tiny.read_to_string(&mut output).unwrap();
        assert_eq!(output, expected);
        assert_eq!(output, "{\"a\":\"x\\\"y\"}\n[1, 2]\n3");
        tiny.validate_complete().unwrap();
    }

    #[test]
    fn test_validate_complete_valid_json() {
        let valid_json = r#"[{"a":1},{"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(valid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_json_array_with_trailing_junk() {
        let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("trailing content") || err_msg.contains("Unexpected trailing"),
            "Expected trailing content error, got: {err_msg}"
        );
    }

    #[test]
    fn test_validate_complete_incomplete_array() {
        // Missing the closing `]` of the outer array.
        let invalid_json = r#"[{"a":1},{"a":2}"#;
        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("expected closing bracket") || err_msg.contains("missing closing"),
            "Expected missing bracket error, got: {err_msg}"
        );
    }

    #[test]
    fn test_validate_complete_unbalanced_braces() {
        // The second object is closed with `]` instead of `}`.
        let invalid_json = r#"[{"a":1},{"a":2]"#;
        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("unbalanced") || err_msg.contains("expected closing bracket"),
            "Expected unbalanced or missing bracket error, got: {err_msg}"
        );
    }

    #[test]
    fn test_json_array_with_leading_junk() {
        let input = r#"junk[{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("leading content"),
            "Expected leading content error, got: {err_msg}"
        );
    }

    #[test]
    fn test_json_array_with_leading_whitespace_ok() {
        let input = "\n[{\"a\":1}, {\"a\":2}]";
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_validate_complete_valid_with_trailing_whitespace() {
        // A trailing newline after the closing `]` is allowed.
        let input = "[{\"a\":1},{\"a\":2}]\n";
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_buffer_boundary_no_data_loss() {
        let large_value = "x".repeat(9000);
        let mut objects = vec![];
        for i in 0..10 {
            objects.push(format!(r#"{{"id":{i},"data":"{large_value}"}}"#));
        }
        let input = format!("[{}]", objects.join(","));
        let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        let newline_count = output.matches('\n').count();
        assert_eq!(
            newline_count, 9,
            "Expected 9 newlines separating 10 objects, got {newline_count}"
        );
        for (i, line) in output.lines().enumerate() {
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
            assert!(
                parsed.is_ok(),
                "Line {} is not valid JSON: {}...",
                i,
                &line[..100.min(line.len())]
            );
            let value = parsed.unwrap();
            assert_eq!(
                value["id"].as_i64(),
                Some(i as i64),
                "Object {i} has wrong id"
            );
        }
    }

    #[test]
    fn test_real_world_format_large() {
        let large_value = "x".repeat(8000);
        let mut objects = vec![];
        for i in 0..10 {
            objects.push(format!(r#"  {{"id":{i},"data":"{large_value}"}}"#));
        }
        let input = format!("[\n{}\n]", objects.join(",\n"));
        let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), 10, "Expected 10 objects");
        for (i, line) in lines.iter().enumerate() {
            assert!(
                line.starts_with("{\"id\""),
                "Line {} should start with object, got: {}...",
                i,
                &line[..50.min(line.len())]
            );
        }
    }

    #[test]
    fn test_channel_reader() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);
        tx.try_send(Bytes::from("Hello, ")).unwrap();
        tx.try_send(Bytes::from("World!")).unwrap();
        drop(tx);
        let mut reader = ChannelReader::new(rx);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "Hello, World!");
    }

    #[test]
    fn test_channel_reader_small_reads() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);
        tx.try_send(Bytes::from("ABCDEFGHIJ")).unwrap();
        drop(tx);
        let mut reader = ChannelReader::new(rx);
        let mut buf = [0u8; 3];
        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"ABC");
        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"DEF");
        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"GHI");
        assert_eq!(reader.read(&mut buf).unwrap(), 1);
        assert_eq!(&buf[..1], b"J");
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }
}