Skip to main content

hedl_json/
streaming.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming JSON parsing for HEDL
19//!
20//! This module provides memory-efficient streaming parsers for processing
21//! large JSON files without loading the entire document into memory.
22//!
23//! # Features
24//!
25//! - **Incremental Parsing**: Process JSON objects as they arrive
26//! - **JSONL Support**: Parse newline-delimited JSON (JSON Lines)
27//! - **Memory Bounded**: Configurable memory limits for safe streaming
28//! - **Iterator-Based**: Ergonomic Rust iterator interface
29//!
30//! # Examples
31//!
32//! ## Streaming JSON Array
33//!
34//! ```rust
35//! use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
36//! use std::io::Cursor;
37//!
38//! let json = r#"[
39//!     {"id": "1", "name": "Alice"},
40//!     {"id": "2", "name": "Bob"}
41//! ]"#;
42//!
43//! let reader = Cursor::new(json.as_bytes());
44//! let config = StreamConfig::default();
45//! let streamer = JsonArrayStreamer::new(reader, config).unwrap();
46//!
47//! for result in streamer {
48//!     let doc = result.unwrap();
49//!     println!("Parsed document: {:?}", doc);
50//! }
51//! ```
52//!
53//! ## JSONL Streaming
54//!
55//! ```rust
56//! use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
57//! use std::io::Cursor;
58//!
59//! let jsonl = r#"{"id": "1", "name": "Alice"}
60//! {"id": "2", "name": "Bob"}
61//! {"id": "3", "name": "Charlie"}"#;
62//!
63//! let reader = Cursor::new(jsonl.as_bytes());
64//! let config = StreamConfig::default();
65//! let streamer = JsonLinesStreamer::new(reader, config);
66//!
67//! for result in streamer {
68//!     let doc = result.unwrap();
69//!     println!("Parsed document: {:?}", doc);
70//! }
71//! ```
72
73use crate::from_json::{from_json_value_owned, FromJsonConfig, JsonConversionError};
74use hedl_core::Document;
75use serde_json::Value as JsonValue;
76use std::io::{BufRead, BufReader, Read};
77use std::marker::PhantomData;
78
79// Import the Error trait for custom error creation
80use serde::de::Error as _;
81
82/// Configuration for streaming JSON parsing
83///
84/// Controls memory limits and parsing behavior for streaming operations.
85///
86/// # Memory Safety
87///
88/// Streaming parsers process data incrementally to avoid loading entire
89/// files into memory. However, individual objects can still be large.
90/// Configure `max_object_bytes` to limit memory per object.
91///
92/// # Examples
93///
94/// ```text
95/// use hedl_json::streaming::StreamConfig;
96/// use hedl_json::FromJsonConfig;
97///
98/// // Default configuration - suitable for trusted input
99/// let config = StreamConfig::default();
100///
101/// // Conservative configuration for untrusted input
102/// let strict = StreamConfig {
103///     buffer_size: 8 * 1024,           // 8 KB buffer
104///     max_object_bytes: 1024 * 1024,   // 1 MB per object
105///     from_json: FromJsonConfig::builder()
106///         .max_depth(100)
107///         .max_array_size(10_000)
108///         .build(),
109/// };
110///
111/// // High-throughput configuration for large ML datasets
112/// let ml_config = StreamConfig {
113///     buffer_size: 256 * 1024,         // 256 KB buffer
114///     max_object_bytes: 100 * 1024 * 1024, // 100 MB per object
115///     from_json: FromJsonConfig::default(),
116/// };
117/// ```
118#[derive(Debug, Clone)]
119pub struct StreamConfig {
120    /// Size of internal read buffer in bytes (default: 64 KB)
121    ///
122    /// Larger buffers improve throughput for network I/O but use more memory.
123    /// Smaller buffers reduce memory overhead for many concurrent streams.
124    pub buffer_size: usize,
125
126    /// Maximum bytes per JSON object (default: 10 MB)
127    ///
128    /// Prevents memory exhaustion from individual oversized objects.
129    /// Set to `None` to disable (not recommended for untrusted input).
130    pub max_object_bytes: Option<usize>,
131
132    /// Configuration for JSON to HEDL conversion
133    ///
134    /// Controls limits and behavior when converting each parsed JSON
135    /// object to a HEDL document.
136    pub from_json: FromJsonConfig,
137
138    /// Enable efficient size estimation instead of serialization for size checks.
139    /// Default: true
140    pub use_size_estimation: bool,
141
142    /// Enable true streaming for JSON arrays (constant memory usage).
143    /// When true, uses incremental parsing instead of loading entire array.
144    /// Default: true
145    pub true_streaming: bool,
146}
147
148impl Default for StreamConfig {
149    fn default() -> Self {
150        Self {
151            buffer_size: 64 * 1024, // 64 KB - good balance for most use cases
152            max_object_bytes: Some(10 * 1024 * 1024), // 10 MB per object
153            from_json: FromJsonConfig::default(),
154            use_size_estimation: true,
155            true_streaming: true,
156        }
157    }
158}
159
160impl StreamConfig {
161    /// Configuration optimized for large files (GB+)
162    ///
163    /// Uses larger buffers and object limits while maintaining constant memory.
164    #[must_use]
165    pub fn large_file() -> Self {
166        Self {
167            buffer_size: 256 * 1024,                  // 256 KB buffer
168            max_object_bytes: Some(50 * 1024 * 1024), // 50 MB per object
169            from_json: FromJsonConfig::default(),
170            use_size_estimation: true,
171            true_streaming: true,
172        }
173    }
174
175    /// Configuration for memory-constrained environments
176    ///
177    /// Minimizes memory usage at the cost of some throughput.
178    #[must_use]
179    pub fn low_memory() -> Self {
180        Self {
181            buffer_size: 8 * 1024,               // 8 KB buffer
182            max_object_bytes: Some(1024 * 1024), // 1 MB per object
183            from_json: FromJsonConfig::default(),
184            use_size_estimation: true,
185            true_streaming: true,
186        }
187    }
188}
189
190impl StreamConfig {
191    /// Create a new builder for configuring stream parsing
192    ///
193    /// # Examples
194    ///
195    /// ```text
196    /// use hedl_json::streaming::StreamConfig;
197    ///
198    /// let config = StreamConfig::builder()
199    ///     .buffer_size(128 * 1024)
200    ///     .max_object_bytes(50 * 1024 * 1024)
201    ///     .build();
202    /// ```
203    #[must_use]
204    pub fn builder() -> StreamConfigBuilder {
205        StreamConfigBuilder::default()
206    }
207}
208
209/// Builder for `StreamConfig`
210///
211/// Provides ergonomic configuration of streaming behavior.
212#[derive(Debug, Clone)]
213pub struct StreamConfigBuilder {
214    buffer_size: usize,
215    max_object_bytes: Option<usize>,
216    from_json: FromJsonConfig,
217    use_size_estimation: bool,
218    true_streaming: bool,
219}
220
221impl Default for StreamConfigBuilder {
222    fn default() -> Self {
223        Self {
224            buffer_size: 64 * 1024,
225            max_object_bytes: Some(10 * 1024 * 1024),
226            from_json: FromJsonConfig::default(),
227            use_size_estimation: true,
228            true_streaming: true,
229        }
230    }
231}
232
233impl StreamConfigBuilder {
234    /// Set the buffer size in bytes
235    #[must_use]
236    pub fn buffer_size(mut self, size: usize) -> Self {
237        self.buffer_size = size;
238        self
239    }
240
241    /// Set the maximum object size in bytes
242    #[must_use]
243    pub fn max_object_bytes(mut self, limit: usize) -> Self {
244        self.max_object_bytes = Some(limit);
245        self
246    }
247
248    /// Disable object size limit (use with caution)
249    #[must_use]
250    pub fn unlimited_object_size(mut self) -> Self {
251        self.max_object_bytes = None;
252        self
253    }
254
255    /// Set the JSON conversion configuration
256    #[must_use]
257    pub fn from_json_config(mut self, config: FromJsonConfig) -> Self {
258        self.from_json = config;
259        self
260    }
261
262    /// Enable or disable size estimation optimization
263    #[must_use]
264    pub fn use_size_estimation(mut self, enabled: bool) -> Self {
265        self.use_size_estimation = enabled;
266        self
267    }
268
269    /// Enable or disable true streaming (constant memory)
270    #[must_use]
271    pub fn true_streaming(mut self, enabled: bool) -> Self {
272        self.true_streaming = enabled;
273        self
274    }
275
276    /// Build the configuration
277    #[must_use]
278    pub fn build(self) -> StreamConfig {
279        StreamConfig {
280            buffer_size: self.buffer_size,
281            max_object_bytes: self.max_object_bytes,
282            from_json: self.from_json,
283            use_size_estimation: self.use_size_estimation,
284            true_streaming: self.true_streaming,
285        }
286    }
287}
288
289/// Errors that can occur during streaming JSON parsing
290#[derive(Debug, thiserror::Error)]
291pub enum StreamError {
292    /// I/O error while reading input
293    #[error("I/O error: {0}")]
294    Io(#[from] std::io::Error),
295
296    /// JSON parsing error
297    #[error("JSON parse error: {0}")]
298    Json(#[from] serde_json::Error),
299
300    /// JSON to HEDL conversion error
301    #[error("HEDL conversion error: {0}")]
302    Conversion(#[from] JsonConversionError),
303
304    /// Object exceeded size limit
305    #[error("Object size ({0} bytes) exceeds limit ({1} bytes)")]
306    ObjectTooLarge(usize, usize),
307
308    /// Invalid JSONL format
309    #[error("Invalid JSONL: {0}")]
310    InvalidJsonL(String),
311}
312
313/// Streaming parser for JSON arrays
314///
315/// Parses a JSON array incrementally, yielding each element as a HEDL document.
316/// Memory-efficient for processing large arrays without loading entire array.
317///
318/// # Format
319///
320/// Expects a JSON array of objects:
321/// ```json
322/// [
323///   {"id": "1", "name": "Alice"},
324///   {"id": "2", "name": "Bob"}
325/// ]
326/// ```
327///
328/// # Memory Usage
329///
330/// With `true_streaming` enabled (default):
331/// - **Constant**: O(1) memory regardless of array size
332/// - **Buffer**: Configured buffer size (default 64 KB)
333/// - **Per-object**: Limited by `max_object_bytes` (default 10 MB)
334/// - Processes GB+ files with ~15 MB constant memory
335///
336/// # Examples
337///
338/// ```text
339/// use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
340/// use std::fs::File;
341///
342/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
343/// let file = File::open("large_dataset.json")?;
344/// let config = StreamConfig::default();
345/// let streamer = JsonArrayStreamer::new(file, config)?;
346///
347/// let mut count = 0;
348/// for result in streamer {
349///     let doc = result?;
350///     count += 1;
351///     // Process document without loading entire array
352/// }
353/// println!("Processed {} documents", count);
354/// # Ok(())
355/// # }
356/// ```
357pub struct JsonArrayStreamer<R: Read> {
358    /// Internal implementation: either buffered (legacy) or true streaming
359    inner: JsonArrayStreamerInner<R>,
360    config: StreamConfig,
361}
362
363/// Internal implementation of array streaming
364enum JsonArrayStreamerInner<R: Read> {
365    /// Legacy buffered mode (loads entire array)
366    Buffered {
367        array: Vec<JsonValue>,
368        index: usize,
369        _phantom: PhantomData<R>,
370    },
371    /// True streaming mode (constant memory)
372    Streaming(TrueStreamingArrayParser<R>),
373}
374
375impl<R: Read> JsonArrayStreamer<R> {
376    /// Create a new array streamer
377    ///
378    /// # Arguments
379    ///
380    /// * `reader` - Input source (file, network stream, etc.)
381    /// * `config` - Streaming configuration
382    ///
383    /// # Errors
384    ///
385    /// Returns error if the input doesn't start with a JSON array.
386    ///
387    /// # Streaming Modes
388    ///
389    /// When `config.true_streaming` is enabled (default), uses constant memory
390    /// regardless of array size. Disable for legacy buffered mode.
391    ///
392    /// # Examples
393    ///
394    /// ```text
395    /// use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
396    /// use std::io::Cursor;
397    ///
398    /// let json = r#"[{"id": "1"}]"#;
399    /// let reader = Cursor::new(json.as_bytes());
400    /// let config = StreamConfig::default();
401    /// let streamer = JsonArrayStreamer::new(reader, config).unwrap();
402    /// ```
403    pub fn new(reader: R, config: StreamConfig) -> Result<Self, StreamError> {
404        if config.true_streaming {
405            // Use true streaming mode (constant memory)
406            let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
407            let parser = TrueStreamingArrayParser::new(buf_reader)?;
408            Ok(Self {
409                inner: JsonArrayStreamerInner::Streaming(parser),
410                config,
411            })
412        } else {
413            // Legacy buffered mode (loads entire array into memory)
414            let mut reader = reader;
415            let mut json_str = String::new();
416            reader.read_to_string(&mut json_str)?;
417
418            let value: JsonValue = serde_json::from_str(&json_str)?;
419            let array = match value {
420                JsonValue::Array(arr) => arr,
421                _ => {
422                    return Err(StreamError::Json(serde_json::Error::custom(
423                        "Expected JSON array",
424                    )));
425                }
426            };
427
428            Ok(Self {
429                inner: JsonArrayStreamerInner::Buffered {
430                    array,
431                    index: 0,
432                    _phantom: PhantomData,
433                },
434                config,
435            })
436        }
437    }
438}
439
440/// True streaming JSON array parser with O(1) memory usage.
441///
442/// Uses byte-level parsing to extract array elements one at a time
443/// without loading the entire array into memory.
444struct TrueStreamingArrayParser<R: Read> {
445    /// Buffered reader for efficient I/O
446    reader: BufReader<R>,
447    /// Buffer for reading individual JSON values
448    value_buffer: String,
449    /// Current nesting depth (for tracking object/array boundaries)
450    depth: i32,
451    /// Whether we're inside a string literal
452    in_string: bool,
453    /// Whether we've reached the end of the array
454    finished: bool,
455}
456
457impl<R: Read> TrueStreamingArrayParser<R> {
458    /// Create a new true streaming parser
459    fn new(mut reader: BufReader<R>) -> Result<Self, StreamError> {
460        // Skip whitespace and find opening bracket
461        let mut buf = [0u8; 1];
462        loop {
463            if reader.read(&mut buf)? == 0 {
464                return Err(StreamError::Json(serde_json::Error::custom(
465                    "Unexpected end of input, expected JSON array",
466                )));
467            } else {
468                let ch = buf[0];
469                if ch.is_ascii_whitespace() {
470                    continue;
471                }
472                if ch == b'[' {
473                    break;
474                }
475                return Err(StreamError::Json(serde_json::Error::custom(format!(
476                    "Expected '[' at start of JSON array, found '{}'",
477                    ch as char
478                ))));
479            }
480        }
481
482        Ok(Self {
483            reader,
484            value_buffer: String::with_capacity(4096),
485            depth: 0,
486            in_string: false,
487            finished: false,
488        })
489    }
490
491    /// Read the next JSON value from the array
492    fn next_value(&mut self) -> Option<Result<JsonValue, StreamError>> {
493        if self.finished {
494            return None;
495        }
496
497        self.value_buffer.clear();
498        self.depth = 0;
499        self.in_string = false;
500
501        let mut buf = [0u8; 1];
502        let mut prev_char: u8 = 0;
503        let mut value_started = false;
504
505        loop {
506            match self.reader.read(&mut buf) {
507                Ok(0) => {
508                    // EOF
509                    if value_started && self.depth == 0 {
510                        // We have a complete value
511                        break;
512                    }
513                    self.finished = true;
514                    if value_started {
515                        return Some(Err(StreamError::Json(serde_json::Error::custom(
516                            "Unexpected end of input while parsing array element",
517                        ))));
518                    }
519                    return None;
520                }
521                Ok(_) => {
522                    let ch = buf[0];
523
524                    // Handle string state
525                    if self.in_string {
526                        self.value_buffer.push(ch as char);
527                        if ch == b'"' && prev_char != b'\\' {
528                            self.in_string = false;
529                        }
530                        prev_char = ch;
531                        continue;
532                    }
533
534                    // Skip leading whitespace before value
535                    if !value_started && ch.is_ascii_whitespace() {
536                        continue;
537                    }
538
539                    // Check for end of array before value
540                    if !value_started && ch == b']' {
541                        self.finished = true;
542                        return None;
543                    }
544
545                    // Skip comma between elements
546                    if !value_started && ch == b',' {
547                        continue;
548                    }
549
550                    // Start of value
551                    if !value_started {
552                        value_started = true;
553                    }
554
555                    // Track depth for objects and arrays
556                    match ch {
557                        b'{' | b'[' => {
558                            self.depth += 1;
559                            self.value_buffer.push(ch as char);
560                        }
561                        b'}' | b']' => {
562                            self.depth -= 1;
563                            self.value_buffer.push(ch as char);
564                            if self.depth == 0 {
565                                // Complete object/array value
566                                break;
567                            }
568                        }
569                        b'"' => {
570                            self.in_string = true;
571                            self.value_buffer.push(ch as char);
572                        }
573                        b',' if self.depth == 0 => {
574                            // End of primitive value, don't include comma
575                            break;
576                        }
577                        _ if self.depth == 0 && ch.is_ascii_whitespace() => {
578                            // End of primitive value at whitespace
579                            break;
580                        }
581                        _ => {
582                            self.value_buffer.push(ch as char);
583                        }
584                    }
585
586                    prev_char = ch;
587                }
588                Err(e) => {
589                    return Some(Err(StreamError::Io(e)));
590                }
591            }
592        }
593
594        if self.value_buffer.is_empty() {
595            return None;
596        }
597
598        // Parse the extracted JSON value
599        match serde_json::from_str(&self.value_buffer) {
600            Ok(value) => Some(Ok(value)),
601            Err(e) => Some(Err(StreamError::Json(e))),
602        }
603    }
604}
605
606impl<R: Read> Iterator for JsonArrayStreamer<R> {
607    type Item = Result<Document, StreamError>;
608
609    fn next(&mut self) -> Option<Self::Item> {
610        // Get the next JSON value based on mode
611        let value = match &mut self.inner {
612            JsonArrayStreamerInner::Buffered { array, index, .. } => {
613                if *index >= array.len() {
614                    return None;
615                }
616                // O(1) access with std::mem::take to avoid O(n) Vec::remove(0)
617                let value = std::mem::take(&mut array[*index]);
618                *index += 1;
619                value
620            }
621            JsonArrayStreamerInner::Streaming(parser) => match parser.next_value() {
622                Some(Ok(value)) => value,
623                Some(Err(e)) => return Some(Err(e)),
624                None => return None,
625            },
626        };
627
628        // Check object size if limit configured using efficient estimation
629        if let Some(max_bytes) = self.config.max_object_bytes {
630            let estimated_size = estimate_json_size(&value);
631            if estimated_size > max_bytes {
632                return Some(Err(StreamError::ObjectTooLarge(estimated_size, max_bytes)));
633            }
634        }
635
636        // Convert to HEDL document using zero-copy optimization
637        match from_json_value_owned(value, &self.config.from_json) {
638            Ok(doc) => Some(Ok(doc)),
639            Err(e) => Some(Err(StreamError::Conversion(e))),
640        }
641    }
642}
643
644/// Estimate the serialized JSON size of a value without allocating.
645///
646/// This provides a conservative estimate (never under-estimates) to avoid
647/// the overhead of serializing just to check size. The estimate accounts for:
648/// - Literal sizes: null (4), true (4), false (5)
649/// - Number serialization length
650/// - String quotes and potential escaping (10% margin)
651/// - Array/object brackets and separators
652///
653/// # Performance
654///
655/// O(n) in the structure depth, but with much lower constant factor than
656/// serialization since no string allocation or copying occurs.
657fn estimate_json_size(value: &JsonValue) -> usize {
658    match value {
659        JsonValue::Null => 4,        // "null"
660        JsonValue::Bool(true) => 4,  // "true"
661        JsonValue::Bool(false) => 5, // "false"
662        JsonValue::Number(n) => {
663            // Conservative estimate for number serialization
664            n.to_string().len()
665        }
666        JsonValue::String(s) => {
667            // Account for quotes and common escapes
668            // Add 10% margin for potential escape sequences
669            let escape_margin = s.len() / 10;
670            s.len() + 2 + escape_margin
671        }
672        JsonValue::Array(arr) => {
673            if arr.is_empty() {
674                return 2; // "[]"
675            }
676            // "[" + elements + commas + "]"
677            2 + arr.iter().map(estimate_json_size).sum::<usize>() + (arr.len() - 1)
678        }
679        JsonValue::Object(obj) => {
680            if obj.is_empty() {
681                return 2; // "{}"
682            }
683            // "{" + "key": value pairs + commas + "}"
684            let pair_size: usize = obj
685                .iter()
686                .map(|(k, v)| {
687                    // "key": value = key.len() + 2 (quotes) + 1 (colon) + 1 (space) + value
688                    k.len() + 4 + estimate_json_size(v)
689                })
690                .sum();
691            2 + pair_size + (obj.len() - 1) // commas between pairs
692        }
693    }
694}
695
696/// Streaming parser for JSONL (JSON Lines) format
697///
698/// Parses newline-delimited JSON, yielding each line as a HEDL document.
699/// Memory-efficient for processing large log files and streaming data.
700///
701/// # Format
702///
703/// Each line is a complete JSON object:
704/// ```text
705/// {"id": "1", "name": "Alice"}
706/// {"id": "2", "name": "Bob"}
707/// {"id": "3", "name": "Charlie"}
708/// ```
709///
710/// # Features
711///
712/// - **Blank Lines**: Skipped automatically
713/// - **Comments**: Lines starting with `#` are skipped
714/// - **Robustness**: Invalid lines can be skipped or cause errors
715/// - **Memory Bounded**: Only one line in memory at a time
716///
717/// # Examples
718///
719/// ```text
720/// use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
721/// use std::fs::File;
722///
723/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
724/// let file = File::open("logs.jsonl")?;
725/// let config = StreamConfig::default();
726/// let streamer = JsonLinesStreamer::new(file, config);
727///
728/// for result in streamer {
729///     let doc = result?;
730///     // Process each log entry
731/// }
732/// # Ok(())
733/// # }
734/// ```
735pub struct JsonLinesStreamer<R: Read> {
736    reader: BufReader<R>,
737    config: StreamConfig,
738    line_buffer: String,
739    line_number: usize,
740}
741
742impl<R: Read> JsonLinesStreamer<R> {
743    /// Create a new JSONL streamer
744    ///
745    /// # Arguments
746    ///
747    /// * `reader` - Input source (file, network stream, etc.)
748    /// * `config` - Streaming configuration
749    ///
750    /// # Examples
751    ///
752    /// ```text
753    /// use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
754    /// use std::io::Cursor;
755    ///
756    /// let jsonl = "{\"id\": \"1\"}\n{\"id\": \"2\"}";
757    /// let reader = Cursor::new(jsonl.as_bytes());
758    /// let config = StreamConfig::default();
759    /// let streamer = JsonLinesStreamer::new(reader, config);
760    /// ```
761    pub fn new(reader: R, config: StreamConfig) -> Self {
762        let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
763        Self {
764            reader: buf_reader,
765            config,
766            line_buffer: String::new(),
767            line_number: 0,
768        }
769    }
770
771    /// Get the current line number (1-indexed)
772    pub fn line_number(&self) -> usize {
773        self.line_number
774    }
775}
776
777impl<R: Read> Iterator for JsonLinesStreamer<R> {
778    type Item = Result<Document, StreamError>;
779
780    fn next(&mut self) -> Option<Self::Item> {
781        loop {
782            self.line_buffer.clear();
783            self.line_number += 1;
784
785            // Read next line
786            match self.reader.read_line(&mut self.line_buffer) {
787                Ok(0) => return None, // EOF
788                Ok(_) => {
789                    // Trim whitespace
790                    let line = self.line_buffer.trim();
791
792                    // Skip blank lines and comments
793                    if line.is_empty() || line.starts_with('#') {
794                        continue;
795                    }
796
797                    // Check line size if limit configured
798                    if let Some(max_bytes) = self.config.max_object_bytes {
799                        if line.len() > max_bytes {
800                            return Some(Err(StreamError::ObjectTooLarge(line.len(), max_bytes)));
801                        }
802                    }
803
804                    // Parse JSON
805                    let value: JsonValue = match serde_json::from_str(line) {
806                        Ok(v) => v,
807                        Err(e) => return Some(Err(StreamError::Json(e))),
808                    };
809
810                    // Convert to HEDL document
811                    match from_json_value_owned(value, &self.config.from_json) {
812                        Ok(doc) => return Some(Ok(doc)),
813                        Err(e) => return Some(Err(StreamError::Conversion(e))),
814                    }
815                }
816                Err(e) => return Some(Err(StreamError::Io(e))),
817            }
818        }
819    }
820}
821
822/// Streaming writer for JSONL format
823///
824/// Writes HEDL documents as newline-delimited JSON for efficient streaming.
825///
826/// # Format
827///
828/// Each document is written as a single JSON object followed by newline:
829/// ```text
830/// {"id":"1","name":"Alice"}
831/// {"id":"2","name":"Bob"}
832/// ```
833///
834/// # Examples
835///
836/// ```text
837/// use hedl_json::streaming::JsonLinesWriter;
838/// use hedl_core::Document;
839/// use std::io::Cursor;
840///
841/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
842/// let mut buffer = Vec::new();
843/// let mut writer = JsonLinesWriter::new(&mut buffer);
844///
845/// let doc1 = Document::new((2, 0));
846/// writer.write_document(&doc1)?;
847///
848/// let doc2 = Document::new((2, 0));
849/// writer.write_document(&doc2)?;
850///
851/// writer.flush()?;
852/// # Ok(())
853/// # }
854/// ```
855pub struct JsonLinesWriter<W: std::io::Write> {
856    writer: W,
857}
858
859impl<W: std::io::Write> JsonLinesWriter<W> {
860    /// Create a new JSONL writer
861    ///
862    /// # Arguments
863    ///
864    /// * `writer` - Output destination
865    ///
866    /// # Examples
867    ///
868    /// ```text
869    /// use hedl_json::streaming::JsonLinesWriter;
870    /// use std::fs::File;
871    ///
872    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
873    /// let file = File::create("output.jsonl")?;
874    /// let mut writer = JsonLinesWriter::new(file);
875    /// # Ok(())
876    /// # }
877    /// ```
878    pub fn new(writer: W) -> Self {
879        Self { writer }
880    }
881
882    /// Write a HEDL document as a JSONL entry
883    ///
884    /// Converts the document to JSON and writes it followed by a newline.
885    ///
886    /// # Errors
887    ///
888    /// Returns error if JSON conversion or I/O write fails.
889    ///
890    /// # Examples
891    ///
892    /// ```text
893    /// use hedl_json::streaming::JsonLinesWriter;
894    /// use hedl_core::Document;
895    ///
896    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
897    /// let mut buffer = Vec::new();
898    /// let mut writer = JsonLinesWriter::new(&mut buffer);
899    ///
900    /// let doc = Document::new((2, 0));
901    /// writer.write_document(&doc)?;
902    /// # Ok(())
903    /// # }
904    /// ```
905    pub fn write_document(&mut self, doc: &Document) -> Result<(), StreamError> {
906        // Convert to JSON value
907        let value = crate::to_json_value(doc, &crate::ToJsonConfig::default())
908            .map_err(StreamError::InvalidJsonL)?;
909
910        // Write compact JSON (no pretty printing for JSONL)
911        serde_json::to_writer(&mut self.writer, &value)?;
912
913        // Write newline
914        self.writer.write_all(b"\n")?;
915
916        Ok(())
917    }
918
919    /// Flush the output buffer
920    ///
921    /// Ensures all data is written to the underlying writer.
922    ///
923    /// # Examples
924    ///
925    /// ```text
926    /// use hedl_json::streaming::JsonLinesWriter;
927    /// use hedl_core::Document;
928    ///
929    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
930    /// let mut buffer = Vec::new();
931    /// let mut writer = JsonLinesWriter::new(&mut buffer);
932    ///
933    /// let doc = Document::new((2, 0));
934    /// writer.write_document(&doc)?;
935    /// writer.flush()?;
936    /// # Ok(())
937    /// # }
938    /// ```
939    pub fn flush(&mut self) -> Result<(), StreamError> {
940        std::io::Write::flush(&mut self.writer)?;
941        Ok(())
942    }
943}
944
#[cfg(test)]
mod tests {
    use super::*;
    use hedl_core::{Item, Value};
    use std::io::Cursor;

    /// Assert that every streamed result is `Ok`.
    ///
    /// A count-only assertion would still pass if the streamer yielded the
    /// right number of *errors*. Tests that feed clean input use this helper
    /// to require success as well. On failure it panics with the index and
    /// error of the first failing result.
    fn assert_all_ok<T, E: std::fmt::Debug>(results: &[Result<T, E>]) {
        for (index, result) in results.iter().enumerate() {
            if let Err(err) = result {
                panic!("result {index} was an error: {err:?}");
            }
        }
    }

    // ==================== StreamConfig tests ====================

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 64 * 1024);
        assert_eq!(config.max_object_bytes, Some(10 * 1024 * 1024));
    }

    #[test]
    fn test_stream_config_builder() {
        let config = StreamConfig::builder()
            .buffer_size(128 * 1024)
            .max_object_bytes(50 * 1024 * 1024)
            .build();

        assert_eq!(config.buffer_size, 128 * 1024);
        assert_eq!(config.max_object_bytes, Some(50 * 1024 * 1024));
    }

    #[test]
    fn test_stream_config_unlimited() {
        let config = StreamConfig::builder().unlimited_object_size().build();

        assert_eq!(config.max_object_bytes, None);
    }

    // ==================== JsonArrayStreamer tests ====================

    #[test]
    fn test_array_streamer_simple() {
        let json = r#"[
            {"name": "Alice", "age": 30},
            {"name": "Bob", "age": 25}
        ]"#;

        let reader = Cursor::new(json.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonArrayStreamer::new(reader, config).unwrap();

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 2);
        assert_all_ok(&docs);

        // Verify first document
        let doc1 = docs[0].as_ref().unwrap();
        assert!(doc1.root.contains_key("name"));
        assert!(doc1.root.contains_key("age"));
    }

    #[test]
    fn test_array_streamer_empty() {
        let json = r"[]";

        let reader = Cursor::new(json.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonArrayStreamer::new(reader, config).unwrap();

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 0);
    }

    #[test]
    fn test_array_streamer_single() {
        let json = r#"[{"id": "1"}]"#;

        let reader = Cursor::new(json.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonArrayStreamer::new(reader, config).unwrap();

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 1);
        assert_all_ok(&docs);
    }

    #[test]
    fn test_array_streamer_large_count() {
        // Generate a 1000-element array of small objects.
        let items: Vec<String> = (0..1000)
            .map(|i| format!(r#"{{"id": "{i}"}}"#))
            .collect();
        let json = format!("[{}]", items.join(","));

        let reader = Cursor::new(json.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonArrayStreamer::new(reader, config).unwrap();

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 1000);
        assert_all_ok(&docs);
    }

    #[test]
    fn test_array_streamer_size_limit() {
        let json = r#"[{"data": "x"}]"#;

        let reader = Cursor::new(json.as_bytes());
        let config = StreamConfig::builder()
            .max_object_bytes(5) // Very small limit
            .build();

        let streamer = JsonArrayStreamer::new(reader, config).unwrap();
        let result: Vec<_> = streamer.collect();

        // Should error due to size limit. `first()` avoids an opaque
        // index-out-of-bounds panic if the streamer yields nothing.
        assert!(
            matches!(result.first(), Some(Err(_))),
            "expected the oversized object to be rejected"
        );
    }

    // ==================== JsonLinesStreamer tests ====================

    #[test]
    fn test_jsonl_streamer_simple() {
        let jsonl = r#"{"name": "Alice"}
{"name": "Bob"}
{"name": "Charlie"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 3);
        assert_all_ok(&docs);

        // Verify first document
        let doc1 = docs[0].as_ref().unwrap();
        if let Some(Item::Scalar(Value::String(name))) = doc1.root.get("name") {
            assert_eq!(name.as_ref(), "Alice");
        } else {
            panic!("Expected name field");
        }
    }

    #[test]
    fn test_jsonl_streamer_blank_lines() {
        let jsonl = r#"{"id": "1"}

{"id": "2"}

{"id": "3"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 3);
        assert_all_ok(&docs);
    }

    #[test]
    fn test_jsonl_streamer_comments() {
        let jsonl = r#"# This is a comment
{"id": "1"}
# Another comment
{"id": "2"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 2);
        assert_all_ok(&docs);
    }

    #[test]
    fn test_jsonl_streamer_empty() {
        let jsonl = "";

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 0);
    }

    #[test]
    fn test_jsonl_streamer_invalid_json() {
        let jsonl = r#"{"valid": "json"}
{invalid json}
{"also": "valid"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 3);
        assert!(docs[0].is_ok());
        assert!(docs[1].is_err()); // Invalid JSON line
        assert!(docs[2].is_ok());
    }

    #[test]
    fn test_jsonl_streamer_line_number() {
        let jsonl = r#"{"id": "1"}
{"id": "2"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::default();
        let mut streamer = JsonLinesStreamer::new(reader, config);

        assert_eq!(streamer.line_number(), 0);
        let _ = streamer.next();
        assert_eq!(streamer.line_number(), 1);
        let _ = streamer.next();
        assert_eq!(streamer.line_number(), 2);
    }

    #[test]
    fn test_jsonl_streamer_size_limit() {
        let jsonl = r#"{"data": "x"}"#;

        let reader = Cursor::new(jsonl.as_bytes());
        let config = StreamConfig::builder()
            .max_object_bytes(5) // Very small limit
            .build();

        let streamer = JsonLinesStreamer::new(reader, config);
        let result: Vec<_> = streamer.collect();

        // Should error due to size limit. `first()` avoids an opaque
        // index-out-of-bounds panic if the streamer yields nothing.
        assert!(
            matches!(result.first(), Some(Err(_))),
            "expected the oversized line to be rejected"
        );
    }

    // ==================== JsonLinesWriter tests ====================

    #[test]
    fn test_jsonl_writer_simple() {
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        let mut doc1 = Document::new((2, 0));
        doc1.root.insert(
            "id".to_string(),
            Item::Scalar(Value::String("1".to_string().into())),
        );
        writer.write_document(&doc1).unwrap();

        let mut doc2 = Document::new((2, 0));
        doc2.root.insert(
            "id".to_string(),
            Item::Scalar(Value::String("2".to_string().into())),
        );
        writer.write_document(&doc2).unwrap();

        writer.flush().unwrap();

        let output = String::from_utf8(buffer).unwrap();
        let lines: Vec<_> = output.lines().collect();
        assert_eq!(lines.len(), 2);
        for line in &lines {
            assert!(line.contains("\"id\""));
            // Each JSONL record must be a complete, standalone JSON value.
            assert!(
                serde_json::from_str::<serde_json::Value>(line).is_ok(),
                "line is not valid JSON: {line}"
            );
        }
    }

    #[test]
    fn test_jsonl_writer_empty_document() {
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        let doc = Document::new((2, 0));
        writer.write_document(&doc).unwrap();
        writer.flush().unwrap();

        let output = String::from_utf8(buffer).unwrap();
        assert_eq!(output.trim(), "{}");
    }

    #[test]
    fn test_jsonl_roundtrip() {
        // Write documents
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        for i in 1..=3 {
            let mut doc = Document::new((2, 0));
            doc.root.insert(
                "id".to_string(),
                Item::Scalar(Value::String(i.to_string().into())),
            );
            doc.root
                .insert("value".to_string(), Item::Scalar(Value::Int(i * 10)));
            writer.write_document(&doc).unwrap();
        }
        writer.flush().unwrap();

        // Read documents back
        let reader = Cursor::new(buffer);
        let config = StreamConfig::default();
        let streamer = JsonLinesStreamer::new(reader, config);

        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 3);

        // Verify every document survived the round trip, not just the first.
        for (i, doc) in (1..=3).zip(&docs) {
            let doc = doc.as_ref().unwrap();
            assert_eq!(
                doc.root.get("id").unwrap().as_scalar().unwrap(),
                &Value::String(i.to_string().into())
            );
            assert_eq!(
                doc.root.get("value").unwrap().as_scalar().unwrap(),
                &Value::Int(i * 10)
            );
        }
    }
}