Skip to main content

datafusion_datasource_json/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utility types for JSON processing
19
20use std::io::{BufRead, Read};
21
22use bytes::Bytes;
23
24// ============================================================================
25// JsonArrayToNdjsonReader - Streaming JSON Array to NDJSON Converter
26// ============================================================================
27//
28// Architecture:
29//
30// ```text
31// ┌─────────────────────────────────────────────────────────────┐
32// │  JSON Array File (potentially very large, e.g. 33GB)       │
33// │  [{"a":1}, {"a":2}, {"a":3}, ...... {"a":1000000}]         │
34// └─────────────────────────────────────────────────────────────┘
35//                           │
36//                           ▼ read chunks via ChannelReader
37//                 ┌───────────────────┐
38//                 │ JsonArrayToNdjson │  ← character substitution only:
39//                 │      Reader       │    '[' skip, ',' → '\n', ']' stop
40//                 └───────────────────┘
41//                           │
42//                           ▼ outputs NDJSON format
43//                 ┌───────────────────┐
44//                 │   Arrow Reader    │  ← internal buffer, batch parsing
45//                 │  batch_size=8192  │
46//                 └───────────────────┘
47//                           │
48//                           ▼ outputs RecordBatch
49//                 ┌───────────────────┐
50//                 │   RecordBatch     │
51//                 └───────────────────┘
52// ```
53//
54// Memory Efficiency:
55//
56// | Approach                              | Memory for 33GB file | Parse count |
57// |---------------------------------------|----------------------|-------------|
58// | Load entire file + serde_json         | ~100GB+              | 3x          |
59// | Streaming with JsonArrayToNdjsonReader| ~32MB (configurable) | 1x          |
60//
61// Design Note:
62//
63// This implementation uses `inner: R` directly (not `BufReader<R>`) and manages
64// its own input buffer. This is critical for compatibility with `SyncIoBridge`
65// and `ChannelReader` in `spawn_blocking` contexts.
66//
67
/// Default buffer size for JsonArrayToNdjsonReader (2MB for better throughput)
const DEFAULT_BUF_SIZE: usize = 2 * 1024 * 1024;

/// Parser state for JSON array streaming
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JsonArrayState {
    /// Initial state, looking for opening '['
    Start,
    /// Inside the JSON array, processing objects
    InArray,
    /// Reached the closing ']', finished
    Done,
}

/// A streaming reader that converts JSON array format to NDJSON format.
///
/// This reader wraps an underlying reader containing JSON array data
/// `[{...}, {...}, ...]` and transforms it on-the-fly to newline-delimited
/// JSON format that Arrow's JSON reader can process.
///
/// Implements both `Read` and `BufRead` traits for compatibility with Arrow's
/// `ReaderBuilder::build()` which requires `BufRead`.
///
/// # Transformation Rules
///
/// - Skip leading `[` and whitespace before it
/// - Convert top-level `,` (between objects) to `\n`
/// - Skip whitespace at top level (between objects)
/// - Stop at trailing `]`
/// - Preserve everything inside objects (including nested `[`, `]`, `,`)
/// - Properly handle strings (ignore special chars inside quotes)
///
/// The transformation only counts nesting depth; it does not check that each
/// closing `}` / `]` matches the kind of bracket that was opened. Malformed
/// elements are passed through for the downstream JSON parser to reject.
///
/// # Errors
///
/// I/O errors from the inner reader are propagated by `read` / `fill_buf`.
/// Output that was already produced before the error is kept and is returned
/// by the next call, so a caller that retries does not lose data.
/// `ErrorKind::Interrupted` is retried internally.
///
/// # Example
///
/// ```text
/// Input:  [{"a":1}, {"b":[1,2]}, {"c":"x,y"}]
/// Output: {"a":1}
///         {"b":[1,2]}
///         {"c":"x,y"}
/// ```
pub struct JsonArrayToNdjsonReader<R: Read> {
    /// Inner reader - we use R directly (not `BufReader<R>`) for SyncIoBridge compatibility
    inner: R,
    state: JsonArrayState,
    /// Tracks nesting depth of `{` and `[` to identify top-level commas
    depth: i32,
    /// Whether we're currently inside a JSON string
    in_string: bool,
    /// Whether the next character is escaped (after `\`)
    escape_next: bool,
    /// Input buffer - stores raw bytes read from inner reader
    input_buffer: Vec<u8>,
    /// Current read position in input buffer
    input_pos: usize,
    /// Number of valid bytes in input buffer
    input_filled: usize,
    /// Output buffer - stores transformed NDJSON bytes
    output_buffer: Vec<u8>,
    /// Current read position in output buffer
    output_pos: usize,
    /// Number of valid bytes in output buffer
    output_filled: usize,
    /// Whether trailing non-whitespace content was detected after ']'
    has_trailing_content: bool,
    /// Whether leading non-whitespace content was detected before '['
    has_leading_content: bool,
}

impl<R: Read> JsonArrayToNdjsonReader<R> {
    /// Create a new streaming reader that converts JSON array to NDJSON.
    ///
    /// Uses `DEFAULT_BUF_SIZE` for both the input and the output buffer.
    pub fn new(reader: R) -> Self {
        Self::with_capacity(reader, DEFAULT_BUF_SIZE)
    }

    /// Create a new streaming reader with custom buffer size.
    ///
    /// Larger buffers improve throughput but use more memory.
    /// Total memory usage is approximately 2 * capacity (input + output buffers).
    ///
    /// A `capacity` of `0` is treated as `1`: with zero-length buffers no byte
    /// could ever be read or produced, so the reader would report EOF
    /// immediately and silently drop the whole input.
    pub fn with_capacity(reader: R, capacity: usize) -> Self {
        let capacity = capacity.max(1);
        Self {
            inner: reader,
            state: JsonArrayState::Start,
            depth: 0,
            in_string: false,
            escape_next: false,
            input_buffer: vec![0; capacity],
            input_pos: 0,
            input_filled: 0,
            output_buffer: vec![0; capacity],
            output_pos: 0,
            output_filled: 0,
            has_trailing_content: false,
            has_leading_content: false,
        }
    }

    /// Check if the JSON array was properly terminated.
    ///
    /// This should be called after all data has been read.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidData` error for the first of these conditions that
    /// holds (checked in this order):
    /// - Unexpected non-whitespace content before the opening `[`
    /// - Unbalanced braces/brackets (depth != 0)
    /// - Unterminated string
    /// - Missing closing `]`
    /// - Unexpected trailing content after `]`
    pub fn validate_complete(&self) -> std::io::Result<()> {
        let problem = if self.has_leading_content {
            "Malformed JSON: unexpected leading content before '['"
        } else if self.depth != 0 {
            "Malformed JSON array: unbalanced braces or brackets"
        } else if self.in_string {
            "Malformed JSON array: unterminated string"
        } else if self.state != JsonArrayState::Done {
            "Incomplete JSON array: expected closing bracket ']'"
        } else if self.has_trailing_content {
            "Malformed JSON: unexpected trailing content after ']'"
        } else {
            return Ok(());
        };
        Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            problem,
        ))
    }

    /// Process a single byte and return the transformed byte (if any)
    ///
    /// Returns `None` when the byte must not appear in the NDJSON output
    /// (array brackets, top-level whitespace, anything before `[` or after
    /// the closing `]`).
    #[inline]
    fn process_byte(&mut self, byte: u8) -> Option<u8> {
        match self.state {
            JsonArrayState::Start => {
                // Looking for the opening '['; anything else that is not
                // whitespace is remembered for `validate_complete`.
                if byte == b'[' {
                    self.state = JsonArrayState::InArray;
                } else if !byte.is_ascii_whitespace() {
                    self.has_leading_content = true;
                }
                None
            }
            JsonArrayState::InArray => {
                // The byte following a backslash inside a string is always
                // copied verbatim, so an escaped quote does not end the string.
                if self.escape_next {
                    self.escape_next = false;
                    return Some(byte);
                }

                if self.in_string {
                    // Inside a string: only escapes and the closing quote matter
                    match byte {
                        b'\\' => self.escape_next = true,
                        b'"' => self.in_string = false,
                        _ => {}
                    }
                    return Some(byte);
                }

                // Outside strings: track depth and transform
                match byte {
                    b'"' => {
                        self.in_string = true;
                        Some(byte)
                    }
                    b'{' | b'[' => {
                        self.depth += 1;
                        Some(byte)
                    }
                    b'}' => {
                        self.depth -= 1;
                        Some(byte)
                    }
                    // Top-level ']' means end of array
                    b']' if self.depth == 0 => {
                        self.state = JsonArrayState::Done;
                        None
                    }
                    // Nested ']' inside an element
                    b']' => {
                        self.depth -= 1;
                        Some(byte)
                    }
                    // Top-level comma between elements → newline
                    b',' if self.depth == 0 => Some(b'\n'),
                    // At depth 0, skip whitespace between elements
                    _ if self.depth == 0 && byte.is_ascii_whitespace() => None,
                    _ => Some(byte),
                }
            }
            JsonArrayState::Done => {
                // After ']', check for non-whitespace trailing content
                if !byte.is_ascii_whitespace() {
                    self.has_trailing_content = true;
                }
                None
            }
        }
    }

    /// Refill input buffer from inner reader if needed.
    /// Returns true if there's data available, false on EOF.
    ///
    /// `ErrorKind::Interrupted` is retried here, mirroring what the standard
    /// library's own read loops do; every other error is returned to the caller.
    fn refill_input_if_needed(&mut self) -> std::io::Result<bool> {
        if self.input_pos < self.input_filled {
            return Ok(true);
        }

        // Input buffer exhausted, read more from inner
        let bytes_read = loop {
            match self.inner.read(&mut self.input_buffer) {
                Ok(n) => break n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            }
        };
        if bytes_read == 0 {
            return Ok(false); // EOF
        }
        self.input_pos = 0;
        self.input_filled = bytes_read;
        Ok(true)
    }

    /// Fill the output buffer with transformed data.
    ///
    /// This method manages its own input buffer, reading from the inner reader
    /// as needed. When the output buffer is full, we stop processing but preserve
    /// the current position in the input buffer for the next call.
    ///
    /// If the inner reader fails, the bytes transformed so far are still
    /// committed to the output buffer before the error is returned. Without
    /// that, a caller retrying after the error would restart at offset 0 and
    /// overwrite output that was already derived from consumed input.
    fn fill_output_buffer(&mut self) -> std::io::Result<()> {
        let mut write_pos = 0;
        let mut outcome = Ok(());

        while write_pos < self.output_buffer.len() {
            // Refill input buffer if exhausted
            match self.refill_input_if_needed() {
                Ok(true) => {}
                Ok(false) => break, // EOF
                Err(e) => {
                    outcome = Err(e);
                    break;
                }
            }

            // Process bytes from input buffer
            while self.input_pos < self.input_filled
                && write_pos < self.output_buffer.len()
            {
                let byte = self.input_buffer[self.input_pos];
                self.input_pos += 1;

                if let Some(transformed) = self.process_byte(byte) {
                    self.output_buffer[write_pos] = transformed;
                    write_pos += 1;
                }
            }
        }

        // Always publish what was produced, even on the error path.
        self.output_pos = 0;
        self.output_filled = write_pos;
        outcome
    }
}

impl<R: Read> Read for JsonArrayToNdjsonReader<R> {
    /// Copies transformed NDJSON bytes into `buf`, returning `Ok(0)` at EOF.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // A zero-length read must not pull (and possibly block on) the inner
        // reader; there is nowhere to put the data anyway.
        if buf.is_empty() {
            return Ok(0);
        }

        // Share the buffering logic with `BufRead`: an empty slice means EOF.
        let available = self.fill_buf()?;
        let to_copy = std::cmp::min(available.len(), buf.len());
        buf[..to_copy].copy_from_slice(&available[..to_copy]);
        self.consume(to_copy);
        Ok(to_copy)
    }
}

impl<R: Read> BufRead for JsonArrayToNdjsonReader<R> {
    /// Returns the unread part of the output buffer, refilling it first when
    /// it has been fully consumed. An empty slice signals EOF.
    fn fill_buf(&mut self) -> std::io::Result<&[u8]> {
        if self.output_pos >= self.output_filled {
            self.fill_output_buffer()?;
        }
        Ok(&self.output_buffer[self.output_pos..self.output_filled])
    }

    /// Marks `amt` bytes as consumed; amounts past the buffered data are clamped.
    fn consume(&mut self, amt: usize) {
        self.output_pos = std::cmp::min(self.output_pos + amt, self.output_filled);
    }
}
368
369// ============================================================================
370// ChannelReader - Sync reader that receives bytes from async channel
371// ============================================================================
372//
373// Architecture:
374//
375// ```text
376// ┌─────────────────────────────────────────────────────────────────────────┐
377// │                         S3 / MinIO (async)                              │
378// │                    (33GB JSON Array File)                               │
379// └─────────────────────────────────────────────────────────────────────────┘
380//                                 │
381//                                 ▼ async stream (Bytes chunks)
382// ┌─────────────────────────────────────────────────────────────────────────┐
383// │                      Async Task (tokio runtime)                         │
384// │              while let Some(chunk) = stream.next().await                │
385// │                     byte_tx.send(chunk)                                 │
386// └─────────────────────────────────────────────────────────────────────────┘
387//                                 │
388//                                 ▼ tokio::sync::mpsc::channel<Bytes>
389//                                 │   (bounded, ~32MB buffer)
390//                                 ▼
391// ┌─────────────────────────────────────────────────────────────────────────┐
392// │                   Blocking Task (spawn_blocking)                        │
393// │  ┌──────────────┐   ┌────────────────────────┐   ┌──────────────────┐  │
394// │  │ChannelReader │ → │JsonArrayToNdjsonReader │ → │ Arrow JsonReader │  │
395// │  │   (Read)     │   │  [{},...] → {}\n{}     │   │  (RecordBatch)   │  │
396// │  └──────────────┘   └────────────────────────┘   └──────────────────┘  │
397// └─────────────────────────────────────────────────────────────────────────┘
398//                                 │
399//                                 ▼ tokio::sync::mpsc::channel<RecordBatch>
400// ┌─────────────────────────────────────────────────────────────────────────┐
401// │                      ReceiverStream (async)                             │
402// │                   → DataFusion execution engine                         │
403// └─────────────────────────────────────────────────────────────────────────┘
404// ```
405//
406// Memory Budget (~32MB total):
// - tokio mpsc channel buffer: 128 chunks × ~128KB = ~16MB
408// - JsonArrayToNdjsonReader: 2 × 2MB = 4MB
409// - Arrow JsonReader internal: ~8MB
410// - Miscellaneous: ~4MB
411//
412
413/// A synchronous `Read` implementation that receives bytes from an async channel.
414///
415/// This enables true streaming between async and sync contexts without
416/// loading the entire file into memory. Uses `tokio::sync::mpsc::Receiver`
417/// with `blocking_recv()` so the async producer never blocks a tokio worker
418/// thread, while the sync consumer (running in `spawn_blocking`) safely blocks.
419pub struct ChannelReader {
420    rx: tokio::sync::mpsc::Receiver<Bytes>,
421    current: Option<Bytes>,
422    pos: usize,
423}
424
425impl ChannelReader {
426    /// Create a new ChannelReader from a tokio mpsc receiver.
427    pub fn new(rx: tokio::sync::mpsc::Receiver<Bytes>) -> Self {
428        Self {
429            rx,
430            current: None,
431            pos: 0,
432        }
433    }
434}
435
436impl Read for ChannelReader {
437    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
438        loop {
439            // If we have current chunk with remaining data, read from it
440            if let Some(ref chunk) = self.current {
441                let remaining = chunk.len() - self.pos;
442                if remaining > 0 {
443                    let to_copy = std::cmp::min(remaining, buf.len());
444                    buf[..to_copy].copy_from_slice(&chunk[self.pos..self.pos + to_copy]);
445                    self.pos += to_copy;
446                    return Ok(to_copy);
447                }
448            }
449
450            // Current chunk exhausted, get next from channel
451            match self.rx.blocking_recv() {
452                Some(bytes) => {
453                    self.current = Some(bytes);
454                    self.pos = 0;
455                    // Loop back to read from new chunk
456                }
457                None => return Ok(0), // Channel closed = EOF
458            }
459        }
460    }
461}
462
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads `reader` to the end and returns the produced NDJSON together with
    /// the exhausted reader, so callers can still run `validate_complete`.
    fn drain<R: Read>(
        mut reader: JsonArrayToNdjsonReader<R>,
    ) -> (String, JsonArrayToNdjsonReader<R>) {
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();
        (output, reader)
    }

    /// Converts `input` with the default buffer size and returns the output only.
    fn convert(input: &str) -> String {
        drain(JsonArrayToNdjsonReader::new(input.as_bytes())).0
    }

    /// Returns the message of the error that `validate_complete` must produce.
    fn validation_error<R: Read>(reader: &JsonArrayToNdjsonReader<R>) -> String {
        reader
            .validate_complete()
            .expect_err("validation should fail")
            .to_string()
    }

    #[test]
    fn test_json_array_to_ndjson_simple() {
        let output = convert(r#"[{"a":1}, {"a":2}, {"a":3}]"#);
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}\n{\"a\":3}");
    }

    #[test]
    fn test_json_array_to_ndjson_nested() {
        let output = convert(r#"[{"a":{"b":1}}, {"c":[1,2,3]}]"#);
        assert_eq!(output, "{\"a\":{\"b\":1}}\n{\"c\":[1,2,3]}");
    }

    #[test]
    fn test_json_array_to_ndjson_strings_with_special_chars() {
        let output = convert(r#"[{"a":"[1,2]"}, {"b":"x,y"}]"#);
        assert_eq!(output, "{\"a\":\"[1,2]\"}\n{\"b\":\"x,y\"}");
    }

    #[test]
    fn test_json_array_to_ndjson_escaped_quotes() {
        let output = convert(r#"[{"a":"say \"hello\""}, {"b":1}]"#);
        assert_eq!(output, "{\"a\":\"say \\\"hello\\\"\"}\n{\"b\":1}");
    }

    #[test]
    fn test_json_array_to_ndjson_empty() {
        assert_eq!(convert(r#"[]"#), "");
    }

    #[test]
    fn test_json_array_to_ndjson_single_element() {
        assert_eq!(convert(r#"[{"a":1}]"#), "{\"a\":1}");
    }

    /// Drives the reader purely through `BufRead` and checks the bytes it
    /// hands out. A small capacity forces several `fill_buf` cycles.
    #[test]
    fn test_json_array_to_ndjson_bufread() {
        let input = r#"[{"a":1}, {"a":2}]"#;
        let mut reader = JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 4);

        let mut collected = Vec::new();
        loop {
            let buf = reader.fill_buf().unwrap();
            if buf.is_empty() {
                break;
            }
            let len = buf.len();
            collected.extend_from_slice(buf);
            reader.consume(len);
        }
        assert_eq!(collected, b"{\"a\":1}\n{\"a\":2}");

        // Nothing is left for the `Read` side once `fill_buf` reported EOF
        let mut rest = String::new();
        reader.read_to_string(&mut rest).unwrap();
        assert_eq!(rest, "");
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_json_array_to_ndjson_whitespace() {
        let output = convert(r#"  [  {"a":1}  ,  {"a":2}  ]  "#);
        // Top-level whitespace is skipped, internal whitespace preserved
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");
    }

    #[test]
    fn test_validate_complete_valid_json() {
        let valid_json = r#"[{"a":1},{"a":2}]"#;
        let (_, reader) = drain(JsonArrayToNdjsonReader::new(valid_json.as_bytes()));
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_json_array_with_trailing_junk() {
        let input = r#" [ {"a":1} , {"a":2} ] some { junk [ here ] "#;
        let (output, reader) = drain(JsonArrayToNdjsonReader::new(input.as_bytes()));

        // Should extract the valid array content
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // But validation should catch the trailing junk
        let err_msg = validation_error(&reader);
        assert!(
            err_msg.contains("trailing content")
                || err_msg.contains("Unexpected trailing"),
            "Expected trailing content error, got: {err_msg}"
        );
    }

    #[test]
    fn test_validate_complete_incomplete_array() {
        let invalid_json = r#"[{"a":1},{"a":2}"#; // Missing closing ]
        let (_, reader) = drain(JsonArrayToNdjsonReader::new(invalid_json.as_bytes()));

        let err_msg = validation_error(&reader);
        assert!(
            err_msg.contains("expected closing bracket")
                || err_msg.contains("missing closing"),
            "Expected missing bracket error, got: {err_msg}"
        );
    }

    #[test]
    fn test_validate_complete_unbalanced_braces() {
        let invalid_json = r#"[{"a":1},{"a":2]"#; // Wrong closing bracket
        let (_, reader) = drain(JsonArrayToNdjsonReader::new(invalid_json.as_bytes()));

        let err_msg = validation_error(&reader);
        assert!(
            err_msg.contains("unbalanced")
                || err_msg.contains("expected closing bracket"),
            "Expected unbalanced or missing bracket error, got: {err_msg}"
        );
    }

    #[test]
    fn test_json_array_with_leading_junk() {
        let input = r#"junk[{"a":1}, {"a":2}]"#;
        let (output, reader) = drain(JsonArrayToNdjsonReader::new(input.as_bytes()));

        // Should still extract the valid array content
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // But validation should catch the leading junk
        let err_msg = validation_error(&reader);
        assert!(
            err_msg.contains("leading content"),
            "Expected leading content error, got: {err_msg}"
        );
    }

    #[test]
    fn test_json_array_with_leading_whitespace_ok() {
        let input = "\n  [{\"a\":1}, {\"a\":2}]";
        let (output, reader) = drain(JsonArrayToNdjsonReader::new(input.as_bytes()));
        assert_eq!(output, "{\"a\":1}\n{\"a\":2}");

        // Leading whitespace should be fine
        reader.validate_complete().unwrap();
    }

    #[test]
    fn test_validate_complete_valid_with_trailing_whitespace() {
        let input = "[{\"a\":1},{\"a\":2}]\n    "; // Trailing whitespace is OK
        let (_, reader) = drain(JsonArrayToNdjsonReader::new(input.as_bytes()));

        // Whitespace after ] should be allowed
        reader.validate_complete().unwrap();
    }

    /// Test that data is not lost at buffer boundaries.
    ///
    /// This test creates input larger than the internal buffer to verify
    /// that newline characters are not dropped when they occur at buffer boundaries.
    #[test]
    fn test_buffer_boundary_no_data_loss() {
        // Create objects ~9KB each, so 10 objects = ~90KB
        let large_value = "x".repeat(9000);
        let objects: Vec<String> = (0..10)
            .map(|i| format!(r#"{{"id":{i},"data":"{large_value}"}}"#))
            .collect();
        let input = format!("[{}]", objects.join(","));

        // Use small buffer to force multiple fill cycles
        let (output, _) =
            drain(JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192));

        // Verify correct number of newlines (9 newlines separate 10 objects)
        let newline_count = output.matches('\n').count();
        assert_eq!(
            newline_count, 9,
            "Expected 9 newlines separating 10 objects, got {newline_count}"
        );

        // Verify each line is valid JSON
        for (i, line) in output.lines().enumerate() {
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(line);
            assert!(
                parsed.is_ok(),
                "Line {} is not valid JSON: {}...",
                i,
                &line[..100.min(line.len())]
            );

            // Verify the id field matches expected value
            let value = parsed.unwrap();
            assert_eq!(
                value["id"].as_i64(),
                Some(i as i64),
                "Object {i} has wrong id"
            );
        }
    }

    /// Test with real-world-like data format (with leading whitespace and newlines)
    #[test]
    fn test_real_world_format_large() {
        let large_value = "x".repeat(8000);

        // Format similar to real files: opening bracket on its own line,
        // each object indented with 2 spaces
        let objects: Vec<String> = (0..10)
            .map(|i| format!(r#"  {{"id":{i},"data":"{large_value}"}}"#))
            .collect();
        let input = format!("[\n{}\n]", objects.join(",\n"));

        let (output, _) =
            drain(JsonArrayToNdjsonReader::with_capacity(input.as_bytes(), 8192));

        let lines: Vec<&str> = output.lines().collect();
        assert_eq!(lines.len(), 10, "Expected 10 objects");

        for (i, line) in lines.iter().enumerate() {
            assert!(
                line.starts_with("{\"id\""),
                "Line {} should start with object, got: {}...",
                i,
                &line[..50.min(line.len())]
            );
        }
    }

    /// Test ChannelReader
    #[test]
    fn test_channel_reader() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);

        // Send some chunks (try_send is non-async)
        tx.try_send(Bytes::from("Hello, ")).unwrap();
        tx.try_send(Bytes::from("World!")).unwrap();
        drop(tx); // Close channel

        let mut reader = ChannelReader::new(rx);
        let mut output = String::new();
        reader.read_to_string(&mut output).unwrap();

        assert_eq!(output, "Hello, World!");
    }

    /// Test ChannelReader with small reads
    #[test]
    fn test_channel_reader_small_reads() {
        let (tx, rx) = tokio::sync::mpsc::channel(4);

        tx.try_send(Bytes::from("ABCDEFGHIJ")).unwrap();
        drop(tx);

        let mut reader = ChannelReader::new(rx);
        let mut buf = [0u8; 3];

        // Read in small chunks
        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"ABC");

        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"DEF");

        assert_eq!(reader.read(&mut buf).unwrap(), 3);
        assert_eq!(&buf, b"GHI");

        assert_eq!(reader.read(&mut buf).unwrap(), 1);
        assert_eq!(&buf[..1], b"J");

        // EOF
        assert_eq!(reader.read(&mut buf).unwrap(), 0);
    }
}