// datafusion_datasource_json/utils.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility types for JSON processing
19
20use std::io::{BufRead, Read};
21
22use bytes::Bytes;
23
24// ============================================================================
25// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter
26// ============================================================================
27//
28// Architecture:
29//
30// ```text
31// ┌─────────────────────────────────────────────────────────────┐
32// │ JSON Array File (potentially very large, e.g. 33GB) │
33// │ [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}] │
34// └─────────────────────────────────────────────────────────────┘
35// │
36// ▼ read chunks via ChannelReader
37// ┌───────────────────┐
38// │ JsonArrayToNdjson │ ← character substitution only:
39// │ Reader │ '[' skip, ',' → '\n', ']' stop
40// └───────────────────┘
41// │
42// ▼ outputs NDJSON format
43// ┌───────────────────┐
44// │ Arrow Reader │ ← internal buffer, batch parsing
45// │ batch_size=8192 │
46// └───────────────────┘
47// │
48// ▼ outputs RecordBatch
49// ┌───────────────────┐
50// │ RecordBatch │
51// └───────────────────┘
52// ```
53//
54// Memory Efficiency:
55//
56// | Approach | Memory for 33GB file | Parse count |
57// |---------------------------------------|----------------------|-------------|
58// | Load entire file + serde_json | ~100GB+ | 3x |
59// | Streaming with JsonArrayToNdjsonReader| ~32MB (configurable) | 1x |
60//
61// Design Note:
62//
63// This implementation uses `inner: R` directly (not `BufReader<R>`) and manages
64// its own input buffer. This is critical for compatibility with `SyncIoBridge`
65// and `ChannelReader` in `spawn_blocking` contexts.
66//
67
/// Default buffer size for JsonArrayToNdjsonReader (2MB for better throughput)
///
/// Total memory per reader is roughly twice this value, since separate
/// input and output buffers of this capacity are allocated.
const DEFAULT_BUF_SIZE: usize = 2 * 1024 * 1024;
70
/// Parser state for JSON array streaming
///
/// Transitions are strictly forward: `Start` → `InArray` → `Done`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonArrayState {
    /// Initial state, looking for opening '['
    Start,
    /// Inside the JSON array, processing objects
    InArray,
    /// Reached the closing ']', finished
    Done,
}
81
/// A streaming reader that converts JSON array format to NDJSON format.
///
/// This reader wraps an underlying reader containing JSON array data
/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited
/// JSON format that Arrow's JSON reader can process.
///
/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's
/// `ReaderBuilder::build()` which requires `BufRead`.
///
/// # Transformation Rules
///
/// - Skip leading `[` and whitespace before it
/// - Convert top-level `,` (between objects) to `\n`
/// - Skip whitespace at top level (between objects)
/// - Stop at trailing `]`
/// - Preserve everything inside objects (including nested `[`, `]`, `,`)
/// - Properly handle strings (ignore special chars inside quotes)
///
/// # Example
///
/// ```text
/// Input: [{"a":1}, {"b":[1,2]}, {"c":"x,y"}]
/// Output: {"a":1}
/// {"b":[1,2]}
/// {"c":"x,y"}
/// ```
pub struct JsonArrayToNdjsonReader<R: Read> {
    /// Inner reader - we use R directly (not `BufReader<R>`) for SyncIoBridge compatibility
    inner: R,
    /// Where the parser is relative to the outer `[` ... `]` pair
    state: JsonArrayState,
    /// Tracks nesting depth of `{` and `[` to identify top-level commas.
    /// May go negative on malformed input; `validate_complete` reports that.
    depth: i32,
    /// Whether we're currently inside a JSON string
    in_string: bool,
    /// Whether the next character is escaped (after `\`)
    escape_next: bool,
    /// Input buffer - stores raw bytes read from inner reader
    input_buffer: Vec<u8>,
    /// Current read position in input buffer
    input_pos: usize,
    /// Number of valid bytes in input buffer
    input_filled: usize,
    /// Output buffer - stores transformed NDJSON bytes
    output_buffer: Vec<u8>,
    /// Current read position in output buffer
    output_pos: usize,
    /// Number of valid bytes in output buffer
    output_filled: usize,
    /// Whether trailing non-whitespace content was detected after ']'
    has_trailing_content: bool,
    /// Whether leading non-whitespace content was detected before '['
    has_leading_content: bool,
}
135
136impl<R: Read> JsonArrayToNdjsonReader<R> {
137 /// Create a new streaming reader that converts JSON array to NDJSON.
138 pub fn new(reader: R) -> Self {
139 Self::with_capacity(reader, DEFAULT_BUF_SIZE)
140 }
141
142 /// Create a new streaming reader with custom buffer size.
143 ///
144 /// Larger buffers improve throughput but use more memory.
145 /// Total memory usage is approximately 2 * capacity (input + output buffers).
146 pub fn with_capacity(reader: R, capacity: usize) -> Self {
147 Self {
148 inner: reader,
149 state: JsonArrayState::Start,
150 depth: 0,
151 in_string: false,
152 escape_next: false,
153 input_buffer: vec![0; capacity],
154 input_pos: 0,
155 input_filled: 0,
156 output_buffer: vec![0; capacity],
157 output_pos: 0,
158 output_filled: 0,
159 has_trailing_content: false,
160 has_leading_content: false,
161 }
162 }
163
164 /// Check if the JSON array was properly terminated.
165 ///
166 /// This should be called after all data has been read.
167 ///
168 /// Returns an error if:
169 /// - Unbalanced braces/brackets (depth != 0)
170 /// - Unterminated string
171 /// - Missing closing `]`
172 /// - Unexpected trailing content after `]`
173 pub fn validate_complete(&self) -> std::io::Result<()> {
174 if self.has_leading_content {
175 return Err(std::io::Error::new(
176 std::io::ErrorKind::InvalidData,
177 "Malformed JSON: unexpected leading content before '['",
178 ));
179 }
180 if self.depth != 0 {
181 return Err(std::io::Error::new(
182 std::io::ErrorKind::InvalidData,
183 "Malformed JSON array: unbalanced braces or brackets",
184 ));
185 }
186 if self.in_string {
187 return Err(std::io::Error::new(
188 std::io::ErrorKind::InvalidData,
189 "Malformed JSON array: unterminated string",
190 ));
191 }
192 if self.state != JsonArrayState::Done {
193 return Err(std::io::Error::new(
194 std::io::ErrorKind::InvalidData,
195 "Incomplete JSON array: expected closing bracket ']'",
196 ));
197 }
198 if self.has_trailing_content {
199 return Err(std::io::Error::new(
200 std::io::ErrorKind::InvalidData,
201 "Malformed JSON: unexpected trailing content after ']'",
202 ));
203 }
204 Ok(())
205 }
206
207 /// Process a single byte and return the transformed byte (if any)
208 #[inline]
209 fn process_byte(&mut self, byte: u8) -> Option<u8> {
210 match self.state {
211 JsonArrayState::Start => {
212 // Looking for the opening '[', skip whitespace
213 if byte == b'[' {
214 self.state = JsonArrayState::InArray;
215 } else if !byte.is_ascii_whitespace() {
216 self.has_leading_content = true;
217 }
218 None
219 }
220 JsonArrayState::InArray => {
221 // Handle escape sequences in strings
222 if self.escape_next {
223 self.escape_next = false;
224 return Some(byte);
225 }
226
227 if self.in_string {
228 // Inside a string: handle escape and closing quote
229 match byte {
230 b'\\' => self.escape_next = true,
231 b'"' => self.in_string = false,
232 _ => {}
233 }
234 Some(byte)
235 } else {
236 // Outside strings: track depth and transform
237 match byte {
238 b'"' => {
239 self.in_string = true;
240 Some(byte)
241 }
242 b'{' | b'[' => {
243 self.depth += 1;
244 Some(byte)
245 }
246 b'}' => {
247 self.depth -= 1;
248 Some(byte)
249 }
250 b']' => {
251 if self.depth == 0 {
252 // Top-level ']' means end of array
253 self.state = JsonArrayState::Done;
254 None
255 } else {
256 // Nested ']' inside an object
257 self.depth -= 1;
258 Some(byte)
259 }
260 }
261 b',' if self.depth == 0 => {
262 // Top-level comma between objects → newline
263 Some(b'\n')
264 }
265 _ => {
266 // At depth 0, skip whitespace between objects
267 if self.depth == 0 && byte.is_ascii_whitespace() {
268 None
269 } else {
270 Some(byte)
271 }
272 }
273 }
274 }
275 }
276 JsonArrayState::Done => {
277 // After ']', check for non-whitespace trailing content
278 if !byte.is_ascii_whitespace() {
279 self.has_trailing_content = true;
280 }
281 None
282 }
283 }
284 }
285
286 /// Refill input buffer from inner reader if needed.
287 /// Returns true if there's data available, false on EOF.
288 fn refill_input_if_needed(&mut self) -> std::io::Result<bool> {
289 if self.input_pos >= self.input_filled {
290 // Input buffer exhausted, read more from inner
291 let bytes_read = self.inner.read(&mut self.input_buffer)?;
292 if bytes_read == 0 {
293 return Ok(false); // EOF
294 }
295 self.input_pos = 0;
296 self.input_filled = bytes_read;
297 }
298 Ok(true)
299 }
300
301 /// Fill the output buffer with transformed data.
302 ///
303 /// This method manages its own input buffer, reading from the inner reader
304 /// as needed. When the output buffer is full, we stop processing but preserve
305 /// the current position in the input buffer for the next call.
306 fn fill_output_buffer(&mut self) -> std::io::Result<()> {
307 let mut write_pos = 0;
308
309 while write_pos < self.output_buffer.len() {
310 // Refill input buffer if exhausted
311 if !self.refill_input_if_needed()? {
312 break; // EOF
313 }
314
315 // Process bytes from input buffer
316 while self.input_pos < self.input_filled
317 && write_pos < self.output_buffer.len()
318 {
319 let byte = self.input_buffer[self.input_pos];
320 self.input_pos += 1;
321
322 if let Some(transformed) = self.process_byte(byte) {
323 self.output_buffer[write_pos] = transformed;
324 write_pos += 1;
325 }
326 }
327 }
328
329 self.output_pos = 0;
330 self.output_filled = write_pos;
331 Ok(())
332 }
333}
334
335impl<R: Read> Read for JsonArrayToNdjsonReader<R> {
336 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
337 // If output buffer is empty, fill it
338 if self.output_pos >= self.output_filled {
339 self.fill_output_buffer()?;
340 if self.output_filled == 0 {
341 return Ok(0); // EOF
342 }
343 }
344
345 // Copy from output buffer to caller's buffer
346 let available = self.output_filled - self.output_pos;
347 let to_copy = std::cmp::min(available, buf.len());
348 buf[..to_copy].copy_from_slice(
349 &self.output_buffer[self.output_pos..self.output_pos + to_copy],
350 );
351 self.output_pos += to_copy;
352 Ok(to_copy)
353 }
354}
355
356impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
357 fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
358 if self.output_pos >= self.output_filled {
359 self.fill_output_buffer()?;
360 }
361 Ok(&self.output_buffer[self.output_pos..self.output_filled])
362 }
363
364 fn consume(&mut self, amt: usize) {
365 self.output_pos = std::cmp::min(self.output_pos + amt, self.output_filled);
366 }
367}
368
369// ============================================================================
370// ChannelReader - Sync reader that receives bytes from async channel
371// ============================================================================
372//
373// Architecture:
374//
375// ```text
376// ┌─────────────────────────────────────────────────────────────────────────┐
377// │ S3 / MinIO (async) │
378// │ (33GB JSON Array File) │
379// └─────────────────────────────────────────────────────────────────────────┘
380// │
381// ▼ async stream (Bytes chunks)
382// ┌─────────────────────────────────────────────────────────────────────────┐
383// │ Async Task (tokio runtime) │
384// │ while let Some(chunk) = stream.next().await │
385// │ byte_tx.send(chunk) │
386// └─────────────────────────────────────────────────────────────────────────┘
387// │
388// ▼ tokio::sync::mpsc::channel<Bytes>
389// │ (bounded, ~32MB buffer)
390// ▼
391// ┌─────────────────────────────────────────────────────────────────────────┐
392// │ Blocking Task (spawn_blocking) │
393// │ ┌──────────────┐ ┌────────────────────────┐ ┌──────────────────┐ │
394// │ │ChannelReader │ → │JsonArrayToNdjsonReader │ → │ Arrow JsonReader │ │
395// │ │ (Read) │ │ [{},...] → {}\n{} │ │ (RecordBatch) │ │
396// │ └──────────────┘ └────────────────────────┘ └──────────────────┘ │
397// └─────────────────────────────────────────────────────────────────────────┘
398// │
399// ▼ tokio::sync::mpsc::channel<RecordBatch>
400// ┌─────────────────────────────────────────────────────────────────────────┐
401// │ ReceiverStream (async) │
402// │ → DataFusion execution engine │
403// └─────────────────────────────────────────────────────────────────────────┘
404// ```
405//
406// Memory Budget (~32MB total):
407// - sync_channel buffer: 128 chunks × ~128KB = ~16MB
408// - JsonArrayToNdjsonReader: 2 × 2MB = 4MB
409// - Arrow JsonReader internal: ~8MB
410// - Miscellaneous: ~4MB
411//
412
/// A synchronous `Read` implementation that receives bytes from an async channel.
///
/// This enables true streaming between async and sync contexts without
/// loading the entire file into memory. Uses `tokio::sync::mpsc::Receiver`
/// with `blocking_recv()` so the async producer never blocks a tokio worker
/// thread, while the sync consumer (running in `spawn_blocking`) safely blocks.
pub struct ChannelReader {
    /// Channel end supplying `Bytes` chunks; a closed channel signals EOF
    rx: tokio::sync::mpsc::Receiver<Bytes>,
    /// Chunk currently being consumed, if any
    current: Option<Bytes>,
    /// Read offset into `current`
    pos: usize,
}
424
425impl ChannelReader {
426 /// Create a new ChannelReader from a tokio mpsc receiver.
427 pub fn new(rx: tokio::sync::mpsc::Receiver<Bytes>) -> Self {
428 Self {
429 rx,
430 current: None,
431 pos: 0,
432 }
433 }
434}
435
436impl Read for ChannelReader {
437 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
438 loop {
439 // If we have current chunk with remaining data, read from it
440 if let Some(ref chunk) = self.current {
441 let remaining = chunk.len() - self.pos;
442 if remaining > 0 {
443 let to_copy = std::cmp::min(remaining, buf.len());
444 buf[..to_copy].copy_from_slice(&chunk[self.pos..self.pos + to_copy]);
445 self.pos += to_copy;
446 return Ok(to_copy);
447 }
448 }
449
450 // Current chunk exhausted, get next from channel
451 match self.rx.blocking_recv() {
452 Some(bytes) => {
453 self.current = Some(bytes);
454 self.pos = 0;
455 // Loop back to read from new chunk
456 }
457 None => return Ok(0), // Channel closed = EOF
458 }
459 }
460 }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic comma-to-newline rewrite across a flat array.
    #[test]
    fn test_json_array_to_ndjson_simple() {
        let input = r#"[{"a":1}, {"a":2}, {"a":3}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}");
    }

    /// Nested objects and arrays must pass through untouched.
    #[test]
    fn test_json_array_to_ndjson_nested() {
        let input = r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}");
    }

    /// Brackets and commas inside string literals must not be rewritten.
    #[test]
    fn test_json_array_to_ndjson_strings_with_special_chars() {
        let input = r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}");
    }

    /// Escaped quotes must not end the in-string state early.
    #[test]
    fn test_json_array_to_ndjson_escaped_quotes() {
        let input = r#"[{"a":"say \"hello\""}, {"b":1}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":\"say \\\"hello\\\"\"}\n{\"b\":1}");
    }

    /// An empty array yields empty NDJSON output.
    #[test]
    fn test_json_array_to_ndjson_empty() {
        let input = r#"[]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "");
    }

    /// A single element produces one line with no trailing newline.
    #[test]
    fn test_json_array_to_ndjson_single_element() {
        let input = r#"[{"a":1}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}");
    }

    /// Exercises the BufRead path (fill_buf/consume) then finishes via Read.
    #[test]
    fn test_json_array_to_ndjson_bufread() {
        let input = r#"[{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());

        let buf = reader.fill_buf().unwrap();
        assert!(!buf.is_empty());

        let first_len = buf.len();
        reader.consume(first_len);

        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
    }

    /// Top-level whitespace around and between elements is dropped.
    #[test]
    fn test_json_array_to_ndjson_whitespace() {
        let input = r#" [ {"a":1} , {"a":2} ] "#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        // Top-level whitespace is skipped, internal whitespace preserved
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
    }

    /// A well-formed array passes post-read validation.
    #[test]
    fn test_validate_complete_valid_json() {
        let valid_json = r#"[{"a":1},{"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(valid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        reader.validate_complete().unwrap();
    }

    /// Non-whitespace content after ']' is tolerated during streaming but
    /// flagged by validate_complete.
    #[test]
    fn test_json_array_with_trailing_junk() {
        let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        // Should extract the valid array content
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // But validation should catch the trailing junk
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("trailing content")
                || err_msg.contains("Unexpected trailing"),
            "Expected trailing content error, got: {err_msg}"
        );
    }

    /// A truncated array (no closing ']') fails validation.
    #[test]
    fn test_validate_complete_incomplete_array() {
        let invalid_json = r#"[{"a":1},{"a":2}"#; // Missing closing ]
        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("expected closing bracket")
                || err_msg.contains("missing closing"),
            "Expected missing bracket error, got: {err_msg}"
        );
    }

    /// A ']' closing a '{' leaves depth unbalanced and fails validation.
    #[test]
    fn test_validate_complete_unbalanced_braces() {
        let invalid_json = r#"[{"a":1},{"a":2]"#; // Wrong closing bracket
        let mut reader = JsonArrayToNdjsonReader::new(invalid_json.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("unbalanced")
                || err_msg.contains("expected closing bracket"),
            "Expected unbalanced or missing bracket error, got: {err_msg}"
        );
    }

    /// Non-whitespace content before '[' is flagged by validate_complete.
    #[test]
    fn test_json_array_with_leading_junk() {
        let input = r#"junk[{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        // Should still extract the valid array content
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // But validation should catch the leading junk
        let result = reader.validate_complete();
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("leading content"),
            "Expected leading content error, got: {err_msg}"
        );
    }

    /// Whitespace before '[' is allowed.
    #[test]
    fn test_json_array_with_leading_whitespace_ok() {
        let input = r#"
        [{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // Leading whitespace should be fine
        reader.validate_complete().unwrap();
    }

    /// Whitespace after ']' is allowed.
    #[test]
    fn test_validate_complete_valid_with_trailing_whitespace() {
        let input = r#"[{"a":1},{"a":2}]
        "#; // Trailing whitespace is OK
        let mut reader = JsonArrayToNdjsonReader::new(input.as_bytes());
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        // Whitespace after ] should be allowed
        reader.validate_complete().unwrap();
    }

    /// Test that data is not lost at buffer boundaries.
    ///
    /// This test creates input larger than the internal buffer to verify
    /// that newline characters are not dropped when they occur at buffer boundaries.
    #[test]
    fn test_buffer_boundary_no_data_loss() {
        // Create objects ~9KB each, so 10 objects = ~90KB
        let large_value = "x".repeat(9000);

        let mut objects = vec![];
        for i in 0..10 {
            objects.push(format!(r#"{{"id":{i},"data":"{large_value}"}}"#));
        }

        let input = format!("[{}]", objects.join(","));

        // Use small buffer to force multiple fill cycles
        let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        // Verify correct number of newlines (9 newlines separate 10 objects)
        let newline_count = output.matches('\n').count();
        assert_eq!(
            newline_count, 9,
            "Expected 9 newlines separating 10 objects, got {newline_count}"
        );

        // Verify each line is valid JSON
        for (i, line) in output.lines().enumerate() {
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
            assert!(
                parsed.is_ok(),
                "Line {} is not valid JSON: {}...",
                i,
                &line[..100.min(line.len())]
            );

            // Verify the id field matches expected value
            let value = parsed.unwrap();
            assert_eq!(
                value["id"].as_i64(),
                Some(i as i64),
                "Object {i} has wrong id"
            );
        }
    }

    /// Test with real-world-like data format (with leading whitespace and newlines)
    #[test]
    fn test_real_world_format_large() {
        let large_value = "x".repeat(8000);

        // Format similar to real files: opening bracket on its own line,
        // each object indented with 2 spaces
        let mut objects = vec![];
        for i in 0..10 {
            objects.push(format!(r#"  {{"id":{i},"data":"{large_value}"}}"#));
        }

        let input = format!("[\n{}\n]", objects.join(",\n"));

        let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), 10, "Expected 10 objects");

        for (i, line) in lines.iter().enumerate() {
            assert!(
                line.starts_with("{\"id\""),
                "Line {} should start with object, got: {}...",
                i,
                &line[..50.min(line.len())]
            );
        }
    }

    /// Test ChannelReader
    #[test]
    fn test_channel_reader() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);

        // Send some chunks (try_send is non-async)
        tx.try_send(Bytes::from("Hello, ")).unwrap();
        tx.try_send(Bytes::from("World!")).unwrap();
        drop(tx); // Close channel

        let mut reader = ChannelReader::new(rx);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        assert_eq!(output, "Hello, World!");
    }

    /// Test ChannelReader with small reads
    #[test]
    fn test_channel_reader_small_reads() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);

        tx.try_send(Bytes::from("ABCDEFGHIJ")).unwrap();
        drop(tx);

        let mut reader = ChannelReader::new(rx);
        let mut buf = [0u8; 3];

        // Read in small chunks
        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"ABC");

        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"DEF");

        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"GHI");

        assert_eq!(reader.read(&mut buf).unwrap(), 1);
        assert_eq!(&buf[..1], b"J");

        // EOF
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }
}
778}