hedl_json/streaming.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming JSON parsing for HEDL
19//!
20//! This module provides memory-efficient streaming parsers for processing
21//! large JSON files without loading the entire document into memory.
22//!
23//! # Features
24//!
25//! - **Incremental Parsing**: Process JSON objects as they arrive
26//! - **JSONL Support**: Parse newline-delimited JSON (JSON Lines)
27//! - **Memory Bounded**: Configurable memory limits for safe streaming
28//! - **Iterator-Based**: Ergonomic Rust iterator interface
29//!
30//! # Examples
31//!
32//! ## Streaming JSON Array
33//!
34//! ```rust
35//! use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
36//! use std::io::Cursor;
37//!
38//! let json = r#"[
39//! {"id": "1", "name": "Alice"},
40//! {"id": "2", "name": "Bob"}
41//! ]"#;
42//!
43//! let reader = Cursor::new(json.as_bytes());
44//! let config = StreamConfig::default();
45//! let streamer = JsonArrayStreamer::new(reader, config).unwrap();
46//!
47//! for result in streamer {
48//! let doc = result.unwrap();
49//! println!("Parsed document: {:?}", doc);
50//! }
51//! ```
52//!
53//! ## JSONL Streaming
54//!
55//! ```rust
56//! use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
57//! use std::io::Cursor;
58//!
59//! let jsonl = r#"{"id": "1", "name": "Alice"}
60//! {"id": "2", "name": "Bob"}
61//! {"id": "3", "name": "Charlie"}"#;
62//!
63//! let reader = Cursor::new(jsonl.as_bytes());
64//! let config = StreamConfig::default();
65//! let streamer = JsonLinesStreamer::new(reader, config);
66//!
67//! for result in streamer {
68//! let doc = result.unwrap();
69//! println!("Parsed document: {:?}", doc);
70//! }
71//! ```
72
73use crate::from_json::{from_json_value_owned, FromJsonConfig, JsonConversionError};
74use hedl_core::Document;
75use serde_json::Value as JsonValue;
76use std::io::{BufRead, BufReader, Read};
77use std::marker::PhantomData;
78
79// Import the Error trait for custom error creation
80use serde::de::Error as _;
81
82/// Configuration for streaming JSON parsing
83///
84/// Controls memory limits and parsing behavior for streaming operations.
85///
86/// # Memory Safety
87///
88/// Streaming parsers process data incrementally to avoid loading entire
89/// files into memory. However, individual objects can still be large.
90/// Configure `max_object_bytes` to limit memory per object.
91///
92/// # Examples
93///
94/// ```text
95/// use hedl_json::streaming::StreamConfig;
96/// use hedl_json::FromJsonConfig;
97///
98/// // Default configuration - suitable for trusted input
99/// let config = StreamConfig::default();
100///
101/// // Conservative configuration for untrusted input
102/// let strict = StreamConfig {
103/// buffer_size: 8 * 1024, // 8 KB buffer
104/// max_object_bytes: 1024 * 1024, // 1 MB per object
105/// from_json: FromJsonConfig::builder()
106/// .max_depth(100)
107/// .max_array_size(10_000)
108/// .build(),
109/// };
110///
111/// // High-throughput configuration for large ML datasets
112/// let ml_config = StreamConfig {
113/// buffer_size: 256 * 1024, // 256 KB buffer
114/// max_object_bytes: 100 * 1024 * 1024, // 100 MB per object
115/// from_json: FromJsonConfig::default(),
116/// };
117/// ```
118#[derive(Debug, Clone)]
119pub struct StreamConfig {
120 /// Size of internal read buffer in bytes (default: 64 KB)
121 ///
122 /// Larger buffers improve throughput for network I/O but use more memory.
123 /// Smaller buffers reduce memory overhead for many concurrent streams.
124 pub buffer_size: usize,
125
126 /// Maximum bytes per JSON object (default: 10 MB)
127 ///
128 /// Prevents memory exhaustion from individual oversized objects.
129 /// Set to `None` to disable (not recommended for untrusted input).
130 pub max_object_bytes: Option<usize>,
131
132 /// Configuration for JSON to HEDL conversion
133 ///
134 /// Controls limits and behavior when converting each parsed JSON
135 /// object to a HEDL document.
136 pub from_json: FromJsonConfig,
137
138 /// Enable efficient size estimation instead of serialization for size checks.
139 /// Default: true
140 pub use_size_estimation: bool,
141
142 /// Enable true streaming for JSON arrays (constant memory usage).
143 /// When true, uses incremental parsing instead of loading entire array.
144 /// Default: true
145 pub true_streaming: bool,
146}
147
148impl Default for StreamConfig {
149 fn default() -> Self {
150 Self {
151 buffer_size: 64 * 1024, // 64 KB - good balance for most use cases
152 max_object_bytes: Some(10 * 1024 * 1024), // 10 MB per object
153 from_json: FromJsonConfig::default(),
154 use_size_estimation: true,
155 true_streaming: true,
156 }
157 }
158}
159
160impl StreamConfig {
161 /// Configuration optimized for large files (GB+)
162 ///
163 /// Uses larger buffers and object limits while maintaining constant memory.
164 #[must_use]
165 pub fn large_file() -> Self {
166 Self {
167 buffer_size: 256 * 1024, // 256 KB buffer
168 max_object_bytes: Some(50 * 1024 * 1024), // 50 MB per object
169 from_json: FromJsonConfig::default(),
170 use_size_estimation: true,
171 true_streaming: true,
172 }
173 }
174
175 /// Configuration for memory-constrained environments
176 ///
177 /// Minimizes memory usage at the cost of some throughput.
178 #[must_use]
179 pub fn low_memory() -> Self {
180 Self {
181 buffer_size: 8 * 1024, // 8 KB buffer
182 max_object_bytes: Some(1024 * 1024), // 1 MB per object
183 from_json: FromJsonConfig::default(),
184 use_size_estimation: true,
185 true_streaming: true,
186 }
187 }
188}
189
190impl StreamConfig {
191 /// Create a new builder for configuring stream parsing
192 ///
193 /// # Examples
194 ///
195 /// ```text
196 /// use hedl_json::streaming::StreamConfig;
197 ///
198 /// let config = StreamConfig::builder()
199 /// .buffer_size(128 * 1024)
200 /// .max_object_bytes(50 * 1024 * 1024)
201 /// .build();
202 /// ```
203 #[must_use]
204 pub fn builder() -> StreamConfigBuilder {
205 StreamConfigBuilder::default()
206 }
207}
208
209/// Builder for `StreamConfig`
210///
211/// Provides ergonomic configuration of streaming behavior.
212#[derive(Debug, Clone)]
213pub struct StreamConfigBuilder {
214 buffer_size: usize,
215 max_object_bytes: Option<usize>,
216 from_json: FromJsonConfig,
217 use_size_estimation: bool,
218 true_streaming: bool,
219}
220
221impl Default for StreamConfigBuilder {
222 fn default() -> Self {
223 Self {
224 buffer_size: 64 * 1024,
225 max_object_bytes: Some(10 * 1024 * 1024),
226 from_json: FromJsonConfig::default(),
227 use_size_estimation: true,
228 true_streaming: true,
229 }
230 }
231}
232
233impl StreamConfigBuilder {
234 /// Set the buffer size in bytes
235 #[must_use]
236 pub fn buffer_size(mut self, size: usize) -> Self {
237 self.buffer_size = size;
238 self
239 }
240
241 /// Set the maximum object size in bytes
242 #[must_use]
243 pub fn max_object_bytes(mut self, limit: usize) -> Self {
244 self.max_object_bytes = Some(limit);
245 self
246 }
247
248 /// Disable object size limit (use with caution)
249 #[must_use]
250 pub fn unlimited_object_size(mut self) -> Self {
251 self.max_object_bytes = None;
252 self
253 }
254
255 /// Set the JSON conversion configuration
256 #[must_use]
257 pub fn from_json_config(mut self, config: FromJsonConfig) -> Self {
258 self.from_json = config;
259 self
260 }
261
262 /// Enable or disable size estimation optimization
263 #[must_use]
264 pub fn use_size_estimation(mut self, enabled: bool) -> Self {
265 self.use_size_estimation = enabled;
266 self
267 }
268
269 /// Enable or disable true streaming (constant memory)
270 #[must_use]
271 pub fn true_streaming(mut self, enabled: bool) -> Self {
272 self.true_streaming = enabled;
273 self
274 }
275
276 /// Build the configuration
277 #[must_use]
278 pub fn build(self) -> StreamConfig {
279 StreamConfig {
280 buffer_size: self.buffer_size,
281 max_object_bytes: self.max_object_bytes,
282 from_json: self.from_json,
283 use_size_estimation: self.use_size_estimation,
284 true_streaming: self.true_streaming,
285 }
286 }
287}
288
289/// Errors that can occur during streaming JSON parsing
290#[derive(Debug, thiserror::Error)]
291pub enum StreamError {
292 /// I/O error while reading input
293 #[error("I/O error: {0}")]
294 Io(#[from] std::io::Error),
295
296 /// JSON parsing error
297 #[error("JSON parse error: {0}")]
298 Json(#[from] serde_json::Error),
299
300 /// JSON to HEDL conversion error
301 #[error("HEDL conversion error: {0}")]
302 Conversion(#[from] JsonConversionError),
303
304 /// Object exceeded size limit
305 #[error("Object size ({0} bytes) exceeds limit ({1} bytes)")]
306 ObjectTooLarge(usize, usize),
307
308 /// Invalid JSONL format
309 #[error("Invalid JSONL: {0}")]
310 InvalidJsonL(String),
311}
312
313/// Streaming parser for JSON arrays
314///
315/// Parses a JSON array incrementally, yielding each element as a HEDL document.
316/// Memory-efficient for processing large arrays without loading entire array.
317///
318/// # Format
319///
320/// Expects a JSON array of objects:
321/// ```json
322/// [
323/// {"id": "1", "name": "Alice"},
324/// {"id": "2", "name": "Bob"}
325/// ]
326/// ```
327///
328/// # Memory Usage
329///
330/// With `true_streaming` enabled (default):
331/// - **Constant**: O(1) memory regardless of array size
332/// - **Buffer**: Configured buffer size (default 64 KB)
333/// - **Per-object**: Limited by `max_object_bytes` (default 10 MB)
334/// - Processes GB+ files with ~15 MB constant memory
335///
336/// # Examples
337///
338/// ```text
339/// use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
340/// use std::fs::File;
341///
342/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
343/// let file = File::open("large_dataset.json")?;
344/// let config = StreamConfig::default();
345/// let streamer = JsonArrayStreamer::new(file, config)?;
346///
347/// let mut count = 0;
348/// for result in streamer {
349/// let doc = result?;
350/// count += 1;
351/// // Process document without loading entire array
352/// }
353/// println!("Processed {} documents", count);
354/// # Ok(())
355/// # }
356/// ```
357pub struct JsonArrayStreamer<R: Read> {
358 /// Internal implementation: either buffered (legacy) or true streaming
359 inner: JsonArrayStreamerInner<R>,
360 config: StreamConfig,
361}
362
363/// Internal implementation of array streaming
364enum JsonArrayStreamerInner<R: Read> {
365 /// Legacy buffered mode (loads entire array)
366 Buffered {
367 array: Vec<JsonValue>,
368 index: usize,
369 _phantom: PhantomData<R>,
370 },
371 /// True streaming mode (constant memory)
372 Streaming(TrueStreamingArrayParser<R>),
373}
374
375impl<R: Read> JsonArrayStreamer<R> {
376 /// Create a new array streamer
377 ///
378 /// # Arguments
379 ///
380 /// * `reader` - Input source (file, network stream, etc.)
381 /// * `config` - Streaming configuration
382 ///
383 /// # Errors
384 ///
385 /// Returns error if the input doesn't start with a JSON array.
386 ///
387 /// # Streaming Modes
388 ///
389 /// When `config.true_streaming` is enabled (default), uses constant memory
390 /// regardless of array size. Disable for legacy buffered mode.
391 ///
392 /// # Examples
393 ///
394 /// ```text
395 /// use hedl_json::streaming::{JsonArrayStreamer, StreamConfig};
396 /// use std::io::Cursor;
397 ///
398 /// let json = r#"[{"id": "1"}]"#;
399 /// let reader = Cursor::new(json.as_bytes());
400 /// let config = StreamConfig::default();
401 /// let streamer = JsonArrayStreamer::new(reader, config).unwrap();
402 /// ```
403 pub fn new(reader: R, config: StreamConfig) -> Result<Self, StreamError> {
404 if config.true_streaming {
405 // Use true streaming mode (constant memory)
406 let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
407 let parser = TrueStreamingArrayParser::new(buf_reader)?;
408 Ok(Self {
409 inner: JsonArrayStreamerInner::Streaming(parser),
410 config,
411 })
412 } else {
413 // Legacy buffered mode (loads entire array into memory)
414 let mut reader = reader;
415 let mut json_str = String::new();
416 reader.read_to_string(&mut json_str)?;
417
418 let value: JsonValue = serde_json::from_str(&json_str)?;
419 let array = match value {
420 JsonValue::Array(arr) => arr,
421 _ => {
422 return Err(StreamError::Json(serde_json::Error::custom(
423 "Expected JSON array",
424 )));
425 }
426 };
427
428 Ok(Self {
429 inner: JsonArrayStreamerInner::Buffered {
430 array,
431 index: 0,
432 _phantom: PhantomData,
433 },
434 config,
435 })
436 }
437 }
438}
439
440/// True streaming JSON array parser with O(1) memory usage.
441///
442/// Uses byte-level parsing to extract array elements one at a time
443/// without loading the entire array into memory.
444struct TrueStreamingArrayParser<R: Read> {
445 /// Buffered reader for efficient I/O
446 reader: BufReader<R>,
447 /// Buffer for reading individual JSON values
448 value_buffer: String,
449 /// Current nesting depth (for tracking object/array boundaries)
450 depth: i32,
451 /// Whether we're inside a string literal
452 in_string: bool,
453 /// Whether we've reached the end of the array
454 finished: bool,
455}
456
457impl<R: Read> TrueStreamingArrayParser<R> {
458 /// Create a new true streaming parser
459 fn new(mut reader: BufReader<R>) -> Result<Self, StreamError> {
460 // Skip whitespace and find opening bracket
461 let mut buf = [0u8; 1];
462 loop {
463 if reader.read(&mut buf)? == 0 {
464 return Err(StreamError::Json(serde_json::Error::custom(
465 "Unexpected end of input, expected JSON array",
466 )));
467 } else {
468 let ch = buf[0];
469 if ch.is_ascii_whitespace() {
470 continue;
471 }
472 if ch == b'[' {
473 break;
474 }
475 return Err(StreamError::Json(serde_json::Error::custom(format!(
476 "Expected '[' at start of JSON array, found '{}'",
477 ch as char
478 ))));
479 }
480 }
481
482 Ok(Self {
483 reader,
484 value_buffer: String::with_capacity(4096),
485 depth: 0,
486 in_string: false,
487 finished: false,
488 })
489 }
490
491 /// Read the next JSON value from the array
492 fn next_value(&mut self) -> Option<Result<JsonValue, StreamError>> {
493 if self.finished {
494 return None;
495 }
496
497 self.value_buffer.clear();
498 self.depth = 0;
499 self.in_string = false;
500
501 let mut buf = [0u8; 1];
502 let mut prev_char: u8 = 0;
503 let mut value_started = false;
504
505 loop {
506 match self.reader.read(&mut buf) {
507 Ok(0) => {
508 // EOF
509 if value_started && self.depth == 0 {
510 // We have a complete value
511 break;
512 }
513 self.finished = true;
514 if value_started {
515 return Some(Err(StreamError::Json(serde_json::Error::custom(
516 "Unexpected end of input while parsing array element",
517 ))));
518 }
519 return None;
520 }
521 Ok(_) => {
522 let ch = buf[0];
523
524 // Handle string state
525 if self.in_string {
526 self.value_buffer.push(ch as char);
527 if ch == b'"' && prev_char != b'\\' {
528 self.in_string = false;
529 }
530 prev_char = ch;
531 continue;
532 }
533
534 // Skip leading whitespace before value
535 if !value_started && ch.is_ascii_whitespace() {
536 continue;
537 }
538
539 // Check for end of array before value
540 if !value_started && ch == b']' {
541 self.finished = true;
542 return None;
543 }
544
545 // Skip comma between elements
546 if !value_started && ch == b',' {
547 continue;
548 }
549
550 // Start of value
551 if !value_started {
552 value_started = true;
553 }
554
555 // Track depth for objects and arrays
556 match ch {
557 b'{' | b'[' => {
558 self.depth += 1;
559 self.value_buffer.push(ch as char);
560 }
561 b'}' | b']' => {
562 self.depth -= 1;
563 self.value_buffer.push(ch as char);
564 if self.depth == 0 {
565 // Complete object/array value
566 break;
567 }
568 }
569 b'"' => {
570 self.in_string = true;
571 self.value_buffer.push(ch as char);
572 }
573 b',' if self.depth == 0 => {
574 // End of primitive value, don't include comma
575 break;
576 }
577 _ if self.depth == 0 && ch.is_ascii_whitespace() => {
578 // End of primitive value at whitespace
579 break;
580 }
581 _ => {
582 self.value_buffer.push(ch as char);
583 }
584 }
585
586 prev_char = ch;
587 }
588 Err(e) => {
589 return Some(Err(StreamError::Io(e)));
590 }
591 }
592 }
593
594 if self.value_buffer.is_empty() {
595 return None;
596 }
597
598 // Parse the extracted JSON value
599 match serde_json::from_str(&self.value_buffer) {
600 Ok(value) => Some(Ok(value)),
601 Err(e) => Some(Err(StreamError::Json(e))),
602 }
603 }
604}
605
606impl<R: Read> Iterator for JsonArrayStreamer<R> {
607 type Item = Result<Document, StreamError>;
608
609 fn next(&mut self) -> Option<Self::Item> {
610 // Get the next JSON value based on mode
611 let value = match &mut self.inner {
612 JsonArrayStreamerInner::Buffered { array, index, .. } => {
613 if *index >= array.len() {
614 return None;
615 }
616 // O(1) access with std::mem::take to avoid O(n) Vec::remove(0)
617 let value = std::mem::take(&mut array[*index]);
618 *index += 1;
619 value
620 }
621 JsonArrayStreamerInner::Streaming(parser) => match parser.next_value() {
622 Some(Ok(value)) => value,
623 Some(Err(e)) => return Some(Err(e)),
624 None => return None,
625 },
626 };
627
628 // Check object size if limit configured using efficient estimation
629 if let Some(max_bytes) = self.config.max_object_bytes {
630 let estimated_size = estimate_json_size(&value);
631 if estimated_size > max_bytes {
632 return Some(Err(StreamError::ObjectTooLarge(estimated_size, max_bytes)));
633 }
634 }
635
636 // Convert to HEDL document using zero-copy optimization
637 match from_json_value_owned(value, &self.config.from_json) {
638 Ok(doc) => Some(Ok(doc)),
639 Err(e) => Some(Err(StreamError::Conversion(e))),
640 }
641 }
642}
643
644/// Estimate the serialized JSON size of a value without allocating.
645///
646/// This provides a conservative estimate (never under-estimates) to avoid
647/// the overhead of serializing just to check size. The estimate accounts for:
648/// - Literal sizes: null (4), true (4), false (5)
649/// - Number serialization length
650/// - String quotes and potential escaping (10% margin)
651/// - Array/object brackets and separators
652///
653/// # Performance
654///
655/// O(n) in the structure depth, but with much lower constant factor than
656/// serialization since no string allocation or copying occurs.
657fn estimate_json_size(value: &JsonValue) -> usize {
658 match value {
659 JsonValue::Null => 4, // "null"
660 JsonValue::Bool(true) => 4, // "true"
661 JsonValue::Bool(false) => 5, // "false"
662 JsonValue::Number(n) => {
663 // Conservative estimate for number serialization
664 n.to_string().len()
665 }
666 JsonValue::String(s) => {
667 // Account for quotes and common escapes
668 // Add 10% margin for potential escape sequences
669 let escape_margin = s.len() / 10;
670 s.len() + 2 + escape_margin
671 }
672 JsonValue::Array(arr) => {
673 if arr.is_empty() {
674 return 2; // "[]"
675 }
676 // "[" + elements + commas + "]"
677 2 + arr.iter().map(estimate_json_size).sum::<usize>() + (arr.len() - 1)
678 }
679 JsonValue::Object(obj) => {
680 if obj.is_empty() {
681 return 2; // "{}"
682 }
683 // "{" + "key": value pairs + commas + "}"
684 let pair_size: usize = obj
685 .iter()
686 .map(|(k, v)| {
687 // "key": value = key.len() + 2 (quotes) + 1 (colon) + 1 (space) + value
688 k.len() + 4 + estimate_json_size(v)
689 })
690 .sum();
691 2 + pair_size + (obj.len() - 1) // commas between pairs
692 }
693 }
694}
695
696/// Streaming parser for JSONL (JSON Lines) format
697///
698/// Parses newline-delimited JSON, yielding each line as a HEDL document.
699/// Memory-efficient for processing large log files and streaming data.
700///
701/// # Format
702///
703/// Each line is a complete JSON object:
704/// ```text
705/// {"id": "1", "name": "Alice"}
706/// {"id": "2", "name": "Bob"}
707/// {"id": "3", "name": "Charlie"}
708/// ```
709///
710/// # Features
711///
712/// - **Blank Lines**: Skipped automatically
713/// - **Comments**: Lines starting with `#` are skipped
714/// - **Robustness**: Invalid lines can be skipped or cause errors
715/// - **Memory Bounded**: Only one line in memory at a time
716///
717/// # Examples
718///
719/// ```text
720/// use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
721/// use std::fs::File;
722///
723/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
724/// let file = File::open("logs.jsonl")?;
725/// let config = StreamConfig::default();
726/// let streamer = JsonLinesStreamer::new(file, config);
727///
728/// for result in streamer {
729/// let doc = result?;
730/// // Process each log entry
731/// }
732/// # Ok(())
733/// # }
734/// ```
735pub struct JsonLinesStreamer<R: Read> {
736 reader: BufReader<R>,
737 config: StreamConfig,
738 line_buffer: String,
739 line_number: usize,
740}
741
742impl<R: Read> JsonLinesStreamer<R> {
743 /// Create a new JSONL streamer
744 ///
745 /// # Arguments
746 ///
747 /// * `reader` - Input source (file, network stream, etc.)
748 /// * `config` - Streaming configuration
749 ///
750 /// # Examples
751 ///
752 /// ```text
753 /// use hedl_json::streaming::{JsonLinesStreamer, StreamConfig};
754 /// use std::io::Cursor;
755 ///
756 /// let jsonl = "{\"id\": \"1\"}\n{\"id\": \"2\"}";
757 /// let reader = Cursor::new(jsonl.as_bytes());
758 /// let config = StreamConfig::default();
759 /// let streamer = JsonLinesStreamer::new(reader, config);
760 /// ```
761 pub fn new(reader: R, config: StreamConfig) -> Self {
762 let buf_reader = BufReader::with_capacity(config.buffer_size, reader);
763 Self {
764 reader: buf_reader,
765 config,
766 line_buffer: String::new(),
767 line_number: 0,
768 }
769 }
770
771 /// Get the current line number (1-indexed)
772 pub fn line_number(&self) -> usize {
773 self.line_number
774 }
775}
776
777impl<R: Read> Iterator for JsonLinesStreamer<R> {
778 type Item = Result<Document, StreamError>;
779
780 fn next(&mut self) -> Option<Self::Item> {
781 loop {
782 self.line_buffer.clear();
783 self.line_number += 1;
784
785 // Read next line
786 match self.reader.read_line(&mut self.line_buffer) {
787 Ok(0) => return None, // EOF
788 Ok(_) => {
789 // Trim whitespace
790 let line = self.line_buffer.trim();
791
792 // Skip blank lines and comments
793 if line.is_empty() || line.starts_with('#') {
794 continue;
795 }
796
797 // Check line size if limit configured
798 if let Some(max_bytes) = self.config.max_object_bytes {
799 if line.len() > max_bytes {
800 return Some(Err(StreamError::ObjectTooLarge(line.len(), max_bytes)));
801 }
802 }
803
804 // Parse JSON
805 let value: JsonValue = match serde_json::from_str(line) {
806 Ok(v) => v,
807 Err(e) => return Some(Err(StreamError::Json(e))),
808 };
809
810 // Convert to HEDL document
811 match from_json_value_owned(value, &self.config.from_json) {
812 Ok(doc) => return Some(Ok(doc)),
813 Err(e) => return Some(Err(StreamError::Conversion(e))),
814 }
815 }
816 Err(e) => return Some(Err(StreamError::Io(e))),
817 }
818 }
819 }
820}
821
822/// Streaming writer for JSONL format
823///
824/// Writes HEDL documents as newline-delimited JSON for efficient streaming.
825///
826/// # Format
827///
828/// Each document is written as a single JSON object followed by newline:
829/// ```text
830/// {"id":"1","name":"Alice"}
831/// {"id":"2","name":"Bob"}
832/// ```
833///
834/// # Examples
835///
836/// ```text
837/// use hedl_json::streaming::JsonLinesWriter;
838/// use hedl_core::Document;
839/// use std::io::Cursor;
840///
841/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
842/// let mut buffer = Vec::new();
843/// let mut writer = JsonLinesWriter::new(&mut buffer);
844///
845/// let doc1 = Document::new((2, 0));
846/// writer.write_document(&doc1)?;
847///
848/// let doc2 = Document::new((2, 0));
849/// writer.write_document(&doc2)?;
850///
851/// writer.flush()?;
852/// # Ok(())
853/// # }
854/// ```
855pub struct JsonLinesWriter<W: std::io::Write> {
856 writer: W,
857}
858
859impl<W: std::io::Write> JsonLinesWriter<W> {
860 /// Create a new JSONL writer
861 ///
862 /// # Arguments
863 ///
864 /// * `writer` - Output destination
865 ///
866 /// # Examples
867 ///
868 /// ```text
869 /// use hedl_json::streaming::JsonLinesWriter;
870 /// use std::fs::File;
871 ///
872 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
873 /// let file = File::create("output.jsonl")?;
874 /// let mut writer = JsonLinesWriter::new(file);
875 /// # Ok(())
876 /// # }
877 /// ```
878 pub fn new(writer: W) -> Self {
879 Self { writer }
880 }
881
882 /// Write a HEDL document as a JSONL entry
883 ///
884 /// Converts the document to JSON and writes it followed by a newline.
885 ///
886 /// # Errors
887 ///
888 /// Returns error if JSON conversion or I/O write fails.
889 ///
890 /// # Examples
891 ///
892 /// ```text
893 /// use hedl_json::streaming::JsonLinesWriter;
894 /// use hedl_core::Document;
895 ///
896 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
897 /// let mut buffer = Vec::new();
898 /// let mut writer = JsonLinesWriter::new(&mut buffer);
899 ///
900 /// let doc = Document::new((2, 0));
901 /// writer.write_document(&doc)?;
902 /// # Ok(())
903 /// # }
904 /// ```
905 pub fn write_document(&mut self, doc: &Document) -> Result<(), StreamError> {
906 // Convert to JSON value
907 let value = crate::to_json_value(doc, &crate::ToJsonConfig::default())
908 .map_err(StreamError::InvalidJsonL)?;
909
910 // Write compact JSON (no pretty printing for JSONL)
911 serde_json::to_writer(&mut self.writer, &value)?;
912
913 // Write newline
914 self.writer.write_all(b"\n")?;
915
916 Ok(())
917 }
918
919 /// Flush the output buffer
920 ///
921 /// Ensures all data is written to the underlying writer.
922 ///
923 /// # Examples
924 ///
925 /// ```text
926 /// use hedl_json::streaming::JsonLinesWriter;
927 /// use hedl_core::Document;
928 ///
929 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
930 /// let mut buffer = Vec::new();
931 /// let mut writer = JsonLinesWriter::new(&mut buffer);
932 ///
933 /// let doc = Document::new((2, 0));
934 /// writer.write_document(&doc)?;
935 /// writer.flush()?;
936 /// # Ok(())
937 /// # }
938 /// ```
939 pub fn flush(&mut self) -> Result<(), StreamError> {
940 std::io::Write::flush(&mut self.writer)?;
941 Ok(())
942 }
943}
944
#[cfg(test)]
mod tests {
    use super::*;
    use hedl_core::{Item, Value};
    use std::io::Cursor;

    type DocResult = Result<Document, StreamError>;

    /// Run the array streamer over `json` and collect every item.
    fn stream_array(json: &str, config: StreamConfig) -> Vec<DocResult> {
        JsonArrayStreamer::new(Cursor::new(json.as_bytes()), config)
            .unwrap()
            .collect()
    }

    /// Run the JSONL streamer over `jsonl` and collect every item.
    fn stream_lines(jsonl: &str, config: StreamConfig) -> Vec<DocResult> {
        JsonLinesStreamer::new(Cursor::new(jsonl.as_bytes()), config).collect()
    }

    /// Scalar string item holding `text`.
    fn string_item(text: &str) -> Item {
        Item::Scalar(Value::String(text.to_string().into()))
    }

    /// Configuration whose object limit is far too small for any test input.
    fn tiny_limit_config() -> StreamConfig {
        StreamConfig::builder().max_object_bytes(5).build()
    }

    // ==================== StreamConfig tests ====================

    #[test]
    fn test_stream_config_default() {
        let config = StreamConfig::default();
        assert_eq!(config.buffer_size, 64 * 1024);
        assert_eq!(config.max_object_bytes, Some(10 * 1024 * 1024));
    }

    #[test]
    fn test_stream_config_builder() {
        let config = StreamConfig::builder()
            .buffer_size(128 * 1024)
            .max_object_bytes(50 * 1024 * 1024)
            .build();

        assert_eq!(config.buffer_size, 128 * 1024);
        assert_eq!(config.max_object_bytes, Some(50 * 1024 * 1024));
    }

    #[test]
    fn test_stream_config_unlimited() {
        let config = StreamConfig::builder().unlimited_object_size().build();

        assert_eq!(config.max_object_bytes, None);
    }

    // ==================== JsonArrayStreamer tests ====================

    #[test]
    fn test_array_streamer_simple() {
        let json = r#"[
            {"name": "Alice", "age": 30},
            {"name": "Bob", "age": 25}
        ]"#;

        let docs = stream_array(json, StreamConfig::default());
        assert_eq!(docs.len(), 2);

        // Both fields of the first element must have survived conversion
        let first = docs[0].as_ref().unwrap();
        assert!(first.root.contains_key("name"));
        assert!(first.root.contains_key("age"));
    }

    #[test]
    fn test_array_streamer_empty() {
        let docs = stream_array(r"[]", StreamConfig::default());
        assert_eq!(docs.len(), 0);
    }

    #[test]
    fn test_array_streamer_single() {
        let docs = stream_array(r#"[{"id": "1"}]"#, StreamConfig::default());
        assert_eq!(docs.len(), 1);
    }

    #[test]
    fn test_array_streamer_large_count() {
        // 1000 small objects, comma-separated, no extra whitespace
        let elements: Vec<String> = (0..1000)
            .map(|i| format!(r#"{{"id": "{i}"}}"#))
            .collect();
        let json = format!("[{}]", elements.join(","));

        let docs = stream_array(&json, StreamConfig::default());
        assert_eq!(docs.len(), 1000);
    }

    #[test]
    fn test_array_streamer_size_limit() {
        let result = stream_array(r#"[{"data": "x"}]"#, tiny_limit_config());

        // Should error due to size limit
        assert!(result[0].is_err());
    }

    // ==================== JsonLinesStreamer tests ====================

    #[test]
    fn test_jsonl_streamer_simple() {
        let jsonl = r#"{"name": "Alice"}
{"name": "Bob"}
{"name": "Charlie"}"#;

        let docs = stream_lines(jsonl, StreamConfig::default());
        assert_eq!(docs.len(), 3);

        // The first line must come back with its name intact
        let first = docs[0].as_ref().unwrap();
        match first.root.get("name") {
            Some(Item::Scalar(Value::String(name))) => assert_eq!(name.as_ref(), "Alice"),
            _ => panic!("Expected name field"),
        }
    }

    #[test]
    fn test_jsonl_streamer_blank_lines() {
        let jsonl = r#"{"id": "1"}

{"id": "2"}

{"id": "3"}"#;

        let docs = stream_lines(jsonl, StreamConfig::default());
        assert_eq!(docs.len(), 3);
    }

    #[test]
    fn test_jsonl_streamer_comments() {
        let jsonl = r#"# This is a comment
{"id": "1"}
# Another comment
{"id": "2"}"#;

        let docs = stream_lines(jsonl, StreamConfig::default());
        assert_eq!(docs.len(), 2);
    }

    #[test]
    fn test_jsonl_streamer_empty() {
        let docs = stream_lines("", StreamConfig::default());
        assert_eq!(docs.len(), 0);
    }

    #[test]
    fn test_jsonl_streamer_invalid_json() {
        let jsonl = r#"{"valid": "json"}
{invalid json}
{"also": "valid"}"#;

        let docs = stream_lines(jsonl, StreamConfig::default());
        assert_eq!(docs.len(), 3);
        assert!(docs[0].is_ok());
        assert!(docs[1].is_err()); // Invalid JSON line
        assert!(docs[2].is_ok());
    }

    #[test]
    fn test_jsonl_streamer_line_number() {
        let jsonl = r#"{"id": "1"}
{"id": "2"}"#;

        let mut streamer =
            JsonLinesStreamer::new(Cursor::new(jsonl.as_bytes()), StreamConfig::default());

        assert_eq!(streamer.line_number(), 0);
        let _ = streamer.next();
        assert_eq!(streamer.line_number(), 1);
        let _ = streamer.next();
        assert_eq!(streamer.line_number(), 2);
    }

    #[test]
    fn test_jsonl_streamer_size_limit() {
        let result = stream_lines(r#"{"data": "x"}"#, tiny_limit_config());

        // Should error due to size limit
        assert!(result[0].is_err());
    }

    // ==================== JsonLinesWriter tests ====================

    #[test]
    fn test_jsonl_writer_simple() {
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        for id in ["1", "2"] {
            let mut doc = Document::new((2, 0));
            doc.root.insert("id".to_string(), string_item(id));
            writer.write_document(&doc).unwrap();
        }

        writer.flush().unwrap();

        let output = String::from_utf8(buffer).unwrap();
        let lines: Vec<_> = output.lines().collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("\"id\""));
        assert!(lines[1].contains("\"id\""));
    }

    #[test]
    fn test_jsonl_writer_empty_document() {
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        writer.write_document(&Document::new((2, 0))).unwrap();
        writer.flush().unwrap();

        let output = String::from_utf8(buffer).unwrap();
        assert_eq!(output.trim(), "{}");
    }

    #[test]
    fn test_jsonl_roundtrip() {
        // Write three documents
        let mut buffer = Vec::new();
        let mut writer = JsonLinesWriter::new(&mut buffer);

        for i in 1..=3 {
            let mut doc = Document::new((2, 0));
            doc.root.insert(
                "id".to_string(),
                Item::Scalar(Value::String(i.to_string().into())),
            );
            doc.root
                .insert("value".to_string(), Item::Scalar(Value::Int(i * 10)));
            writer.write_document(&doc).unwrap();
        }
        writer.flush().unwrap();

        // Read them back from the same bytes
        let streamer = JsonLinesStreamer::new(Cursor::new(buffer), StreamConfig::default());
        let docs: Vec<_> = streamer.collect();
        assert_eq!(docs.len(), 3);

        // The first document must round-trip both fields
        let first = docs[0].as_ref().unwrap();
        assert_eq!(
            first.root.get("id").unwrap().as_scalar().unwrap(),
            &Value::String("1".to_string().into())
        );
        assert_eq!(
            first.root.get("value").unwrap().as_scalar().unwrap(),
            &Value::Int(10)
        );
    }
}