hedl_stream/
parser.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming parser implementation.
19//!
20//! This module provides the core streaming parser for HEDL documents. The parser
21//! processes HEDL files incrementally, yielding events as they are encountered,
22//! making it suitable for processing files that are too large to fit in memory.
23//!
24//! # Design Philosophy
25//!
26//! - **Memory Efficiency**: Only the current line and parsing state are kept in memory
27//! - **Iterator-Based**: Standard Rust iterator interface for easy composition
28//! - **Error Recovery**: Clear error messages with line numbers for debugging
29//! - **Safety**: Built-in timeout protection against malicious/untrusted input
30//!
31//! # Basic Usage
32//!
33//! ```rust
34//! use hedl_stream::{StreamingParser, NodeEvent};
35//! use std::io::Cursor;
36//!
37//! let input = r#"
38//! %VERSION: 1.0
39//! %STRUCT: User: [id, name, email]
40//! ---
41//! users: @User
42//!   | alice, Alice Smith, alice@example.com
43//!   | bob, Bob Jones, bob@example.com
44//! "#;
45//!
46//! let parser = StreamingParser::new(Cursor::new(input)).unwrap();
47//!
48//! for event in parser {
49//!     match event {
50//!         Ok(NodeEvent::Node(node)) => {
51//!             println!("Found {}: {}", node.type_name, node.id);
52//!         }
53//!         Err(e) => eprintln!("Parse error: {}", e),
54//!         _ => {}
55//!     }
56//! }
57//! ```
58
59use crate::buffer_config::BufferSizeHint;
60use crate::buffer_pool::{BufferPool, MemoryLimits};
61use crate::error::{StreamError, StreamResult};
62use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
63use crate::reader::LineReader;
64use hedl_core::lex::{calculate_indent, is_valid_key_token, is_valid_type_name};
65use hedl_core::Value;
66use std::io::Read;
67use std::time::{Duration, Instant};
68
69/// Type alias for list context lookup result: (`type_name`, schema, optional `last_node` info)
70type ListContextResult = (String, Vec<String>, Option<(String, String)>);
71
72/// Configuration options for the streaming parser.
73///
74/// Controls memory limits, buffer sizes, timeout behavior, and buffer pooling.
75///
76/// # Examples
77///
78/// ## Default Configuration
79///
80/// ```rust
81/// use hedl_stream::StreamingParserConfig;
82///
83/// let config = StreamingParserConfig::default();
84/// assert_eq!(config.max_line_length, 1_000_000);
85/// assert_eq!(config.max_indent_depth, 100);
86/// assert_eq!(config.buffer_size, 64 * 1024);
87/// assert_eq!(config.timeout, None);
88/// ```
89///
90/// ## Custom Configuration for Large Files
91///
92/// ```rust
93/// use hedl_stream::{StreamingParserConfig, BufferSizeHint};
94///
95/// let config = StreamingParserConfig::default()
96///     .with_buffer_hint(BufferSizeHint::Large)
97///     .with_buffer_pooling(true);
98/// ```
99///
100/// ## Configuration for Untrusted Input
101///
102/// ```rust
103/// use hedl_stream::{StreamingParserConfig, MemoryLimits};
104/// use std::time::Duration;
105///
106/// let config = StreamingParserConfig {
107///     max_line_length: 100_000,
108///     max_indent_depth: 50,
109///     buffer_size: 32 * 1024,
110///     timeout: Some(Duration::from_secs(10)),
111///     memory_limits: MemoryLimits::untrusted(),
112///     enable_pooling: false,
113/// };
114/// ```
115#[derive(Debug, Clone)]
116pub struct StreamingParserConfig {
117    /// Maximum line length in bytes.
118    ///
119    /// Lines exceeding this length will cause a parsing error. This protects against
120    /// malformed input with extremely long lines that could exhaust memory.
121    ///
122    /// Default: 1,000,000 bytes (1MB)
123    pub max_line_length: usize,
124
125    /// Maximum indentation depth.
126    ///
127    /// Indentation levels exceeding this depth will cause a parsing error. This
128    /// protects against deeply nested structures that could cause stack overflow
129    /// or performance issues.
130    ///
131    /// Default: 100 levels
132    pub max_indent_depth: usize,
133
134    /// Buffer size for reading input.
135    ///
136    /// Larger buffers can improve performance for large files by reducing the
137    /// number of system calls, but use more memory.
138    ///
139    /// Default: 64KB
140    pub buffer_size: usize,
141
142    /// Timeout for parsing operations.
143    ///
144    /// If set, the parser will return a `StreamError::Timeout` if parsing takes
145    /// longer than the specified duration. This protects against infinite loops
146    /// from malicious or malformed input.
147    ///
148    /// Set to `None` to disable timeout checking (default for trusted input).
149    ///
150    /// Default: None (no timeout)
151    ///
152    /// # Performance Note
153    ///
154    /// Timeout checking is performed periodically (every 100 operations) to minimize
155    /// overhead. For very fast parsing, the actual timeout may slightly exceed the
156    /// configured limit.
157    pub timeout: Option<Duration>,
158
159    /// Memory limits for buffer management.
160    ///
161    /// Controls maximum buffer sizes, line lengths, and pool configuration.
162    /// See [`MemoryLimits`] for preset configurations.
163    ///
164    /// Default: `MemoryLimits::default()`
165    pub memory_limits: MemoryLimits,
166
167    /// Enable buffer pooling for high-throughput scenarios.
168    ///
169    /// When enabled, the parser reuses string and value buffers across operations,
170    /// reducing allocation overhead. Beneficial for processing many files in sequence
171    /// or high-throughput server workloads.
172    ///
173    /// Default: false (for backward compatibility)
174    pub enable_pooling: bool,
175}
176
177impl Default for StreamingParserConfig {
178    fn default() -> Self {
179        Self {
180            max_line_length: 1_000_000,
181            max_indent_depth: 100,
182            buffer_size: 64 * 1024,
183            timeout: None,
184            memory_limits: MemoryLimits::default(),
185            enable_pooling: false,
186        }
187    }
188}
189
190impl StreamingParserConfig {
191    /// Config with no limits (use for trusted input only).
192    ///
193    /// # Security Warning
194    ///
195    /// This configuration removes the line length limit, which can expose
196    /// your application to denial-of-service attacks if processing untrusted input.
197    /// Only use this for trusted, controlled environments.
198    ///
199    /// # Examples
200    ///
201    /// ```rust
202    /// use hedl_stream::StreamingParserConfig;
203    ///
204    /// // For trusted input where you want to allow arbitrarily long lines
205    /// let config = StreamingParserConfig::unlimited();
206    /// ```
207    #[must_use]
208    pub fn unlimited() -> Self {
209        Self {
210            max_line_length: usize::MAX,
211            ..Default::default()
212        }
213    }
214
215    /// Configure buffer size using a size hint.
216    ///
217    /// # Examples
218    ///
219    /// ```rust
220    /// use hedl_stream::{StreamingParserConfig, BufferSizeHint};
221    ///
222    /// let config = StreamingParserConfig::default()
223    ///     .with_buffer_hint(BufferSizeHint::Large);
224    /// assert_eq!(config.buffer_size, 256 * 1024);
225    /// ```
226    #[must_use]
227    pub fn with_buffer_hint(mut self, hint: BufferSizeHint) -> Self {
228        self.buffer_size = hint.size();
229        self
230    }
231
232    /// Enable or disable buffer pooling.
233    ///
234    /// # Examples
235    ///
236    /// ```rust
237    /// use hedl_stream::StreamingParserConfig;
238    ///
239    /// let config = StreamingParserConfig::default()
240    ///     .with_buffer_pooling(true);
241    /// assert_eq!(config.enable_pooling, true);
242    /// ```
243    #[must_use]
244    pub fn with_buffer_pooling(mut self, enabled: bool) -> Self {
245        self.enable_pooling = enabled;
246        self
247    }
248
249    /// Configure memory limits.
250    ///
251    /// # Examples
252    ///
253    /// ```rust
254    /// use hedl_stream::{StreamingParserConfig, MemoryLimits};
255    ///
256    /// let config = StreamingParserConfig::default()
257    ///     .with_memory_limits(MemoryLimits::high_throughput());
258    /// ```
259    #[must_use]
260    pub fn with_memory_limits(mut self, limits: MemoryLimits) -> Self {
261        self.memory_limits = limits;
262        // Sync max_line_length with memory limits
263        self.max_line_length = limits.max_line_length;
264        self
265    }
266
267    /// Configure buffer pool size (when pooling is enabled).
268    ///
269    /// # Examples
270    ///
271    /// ```rust
272    /// use hedl_stream::StreamingParserConfig;
273    ///
274    /// let config = StreamingParserConfig::default()
275    ///     .with_buffer_pooling(true)
276    ///     .with_pool_size(50);
277    /// assert_eq!(config.memory_limits.max_pool_size, 50);
278    /// ```
279    #[must_use]
280    pub fn with_pool_size(mut self, size: usize) -> Self {
281        self.memory_limits.max_pool_size = size;
282        self
283    }
284}
285
286/// Streaming HEDL parser.
287///
288/// Processes HEDL documents incrementally, yielding `NodeEvent` items as they
289/// are parsed without loading the entire document into memory. This makes it
290/// suitable for processing multi-gigabyte files on systems with limited RAM.
291///
292/// # Memory Characteristics
293///
294/// - **Header**: Parsed once at initialization and kept in memory
295/// - **Per-Line**: Only current line and parsing context (stack depth proportional to nesting)
296/// - **No Buffering**: Nodes are yielded immediately after parsing
297///
298/// # When to Use
299///
300/// - **Large Files**: Files too large to fit comfortably in memory
301/// - **Streaming Workflows**: Processing data as it arrives (pipes, network streams)
302/// - **Memory-Constrained**: Embedded systems or containers with memory limits
303/// - **ETL Pipelines**: Extract-transform-load workflows with HEDL data
304///
305/// # Iterator Interface
306///
307/// `StreamingParser` implements `Iterator<Item = StreamResult<NodeEvent>>`, allowing
308/// use with standard iterator methods like `filter`, `map`, `collect`, etc.
309///
310/// # Examples
311///
312/// ## Basic Streaming Parse
313///
314/// ```rust
315/// use hedl_stream::{StreamingParser, NodeEvent};
316/// use std::fs::File;
317/// use std::io::BufReader;
318///
319/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
320/// # use std::io::Cursor;
321/// # let file = Cursor::new(r#"
322/// # %VERSION: 1.0
323/// # %STRUCT: User: [id, name]
324/// # ---
325/// # users: @User
326/// #   | alice, Alice
327/// # "#);
328/// let reader = BufReader::new(file);
329/// let parser = StreamingParser::new(reader)?;
330///
331/// for event in parser {
332///     match event? {
333///         NodeEvent::Node(node) => {
334///             println!("Processing {}: {}", node.type_name, node.id);
335///             // Process node immediately, no buffering
336///         }
337///         NodeEvent::ListStart { type_name, .. } => {
338///             println!("Starting list of {}", type_name);
339///         }
340///         NodeEvent::ListEnd { count, .. } => {
341///             println!("Finished list with {} items", count);
342///         }
343///         _ => {}
344///     }
345/// }
346/// # Ok(())
347/// # }
348/// ```
349///
350/// ## Filtering During Parse
351///
352/// ```rust
353/// use hedl_stream::{StreamingParser, NodeEvent};
354/// use std::io::Cursor;
355///
356/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
357/// let input = r#"
358/// %VERSION: 1.0
359/// %STRUCT: User: [id, name, active]
360/// ---
361/// users: @User
362///   | alice, Alice, true
363///   | bob, Bob, false
364///   | carol, Carol, true
365/// "#;
366///
367/// let parser = StreamingParser::new(Cursor::new(input))?;
368///
369/// // Only collect active users
370/// let active_users: Vec<_> = parser
371///     .filter_map(|event| event.ok())
372///     .filter_map(|event| {
373///         if let NodeEvent::Node(node) = event {
374///             Some(node)
375///         } else {
376///             None
377///         }
378///     })
379///     .filter(|node| {
380///         // Check if 'active' field (index 2) is true
381///         matches!(node.get_field(2), Some(hedl_core::Value::Bool(true)))
382///     })
383///     .collect();
384///
385/// assert_eq!(active_users.len(), 2); // alice and carol
386/// # Ok(())
387/// # }
388/// ```
389///
390/// ## With Timeout for Untrusted Input
391///
392/// ```rust
393/// use hedl_stream::{StreamingParser, StreamingParserConfig, StreamError};
394/// use std::time::Duration;
395/// use std::io::Cursor;
396///
397/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
398/// let untrusted_input = "..."; // Input from external source
399///
400/// let config = StreamingParserConfig {
401///     timeout: Some(Duration::from_secs(5)),
402///     ..Default::default()
403/// };
404///
405/// let parser = StreamingParser::with_config(
406///     Cursor::new(untrusted_input),
407///     config
408/// )?;
409///
410/// for event in parser {
411///     match event {
412///         Ok(event) => {
413///             // Process event
414///         }
415///         Err(StreamError::Timeout { elapsed, limit }) => {
416///             eprintln!("Parsing timed out after {:?}", elapsed);
417///             break;
418///         }
419///         Err(e) => {
420///             eprintln!("Parse error: {}", e);
421///             break;
422///         }
423///     }
424/// }
425/// # Ok(())
426/// # }
427/// ```
428///
429/// ## Processing Nested Structures
430///
431/// ```rust
432/// use hedl_stream::{StreamingParser, NodeEvent};
433/// use std::io::Cursor;
434///
435/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
436/// let input = r#"
437/// %VERSION: 1.0
438/// %STRUCT: User: [id, name]
439/// %STRUCT: Order: [id, amount]
440/// %NEST: User > Order
441/// ---
442/// users: @User
443///   | alice, Alice
444///     | order1, 100.00
445///     | order2, 50.00
446///   | bob, Bob
447///     | order3, 75.00
448/// "#;
449///
450/// let parser = StreamingParser::new(Cursor::new(input))?;
451///
452/// for event in parser.filter_map(|e| e.ok()) {
453///     if let NodeEvent::Node(node) = event {
454///         if node.is_nested() {
455///             println!("  Child: {} belongs to {:?}",
456///                 node.id, node.parent_id);
457///         } else {
458///             println!("Parent: {}", node.id);
459///         }
460///     }
461/// }
462/// # Ok(())
463/// # }
464/// ```
465///
466/// # Error Handling
467///
468/// Parsing errors include line numbers for easy debugging:
469///
470/// ```rust
471/// use hedl_stream::{StreamingParser, StreamError};
472/// use std::io::Cursor;
473///
474/// let bad_input = r#"
475/// %VERSION: 1.0
476/// ---
477/// invalid line without colon
478/// "#;
479///
480/// let parser = StreamingParser::new(Cursor::new(bad_input)).unwrap();
481///
482/// for event in parser {
483///     if let Err(e) = event {
484///         if let Some(line) = e.line() {
485///             eprintln!("Error at line {}: {}", line, e);
486///         }
487///     }
488/// }
489/// ```
490pub struct StreamingParser<R: Read> {
491    reader: LineReader<R>,
492    config: StreamingParserConfig,
493    header: Option<HeaderInfo>,
494    state: ParserState,
495    finished: bool,
496    errored: bool,              // Track if an error occurred to skip finalize
497    sent_end_of_document: bool, // Track if EndOfDocument has been returned
498    start_time: Instant,
499    operations_count: usize, // Track operations for periodic timeout checks
500    #[allow(dead_code)] // TODO: Integrate buffer pooling in parse_data_row
501    buffer_pool: Option<BufferPool>, // Optional buffer pool for high-throughput scenarios
502}
503
504#[derive(Debug)]
505struct ParserState {
506    /// Stack of active contexts.
507    stack: Vec<Context>,
508    /// Previous row values for ditto handling.
509    prev_row: Option<Vec<Value>>,
510}
511
512#[derive(Debug, Clone)]
513enum Context {
514    Root,
515    Object {
516        #[allow(dead_code)]
517        key: String,
518        indent: usize,
519    },
520    List {
521        key: String,
522        type_name: String,
523        schema: Vec<String>,
524        row_indent: usize,
525        count: usize,
526        last_node: Option<(String, String)>, // (type, id)
527    },
528}
529
530impl<R: Read> StreamingParser<R> {
531    /// Create a new streaming parser with default configuration.
532    ///
533    /// The parser immediately reads and validates the HEDL header (version and
534    /// schema directives). If the header is invalid, this function returns an error.
535    ///
536    /// # Parameters
537    ///
538    /// - `reader`: Any type implementing `Read` (files, network streams, buffers, etc.)
539    ///
540    /// # Returns
541    ///
542    /// - `Ok(parser)`: Parser ready to yield events
543    /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
544    ///
545    /// # Examples
546    ///
547    /// ## From a File
548    ///
549    /// ```rust,no_run
550    /// use hedl_stream::StreamingParser;
551    /// use std::fs::File;
552    /// use std::io::BufReader;
553    ///
554    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
555    /// let file = File::open("data.hedl")?;
556    /// let reader = BufReader::new(file);
557    /// let parser = StreamingParser::new(reader)?;
558    /// # Ok(())
559    /// # }
560    /// ```
561    ///
562    /// ## From a String
563    ///
564    /// ```rust
565    /// use hedl_stream::StreamingParser;
566    /// use std::io::Cursor;
567    ///
568    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
569    /// let data = r#"
570    /// %VERSION: 1.0
571    /// %STRUCT: User: [id, name]
572    /// ---
573    /// users: @User
574    ///   | alice, Alice
575    /// "#;
576    ///
577    /// let parser = StreamingParser::new(Cursor::new(data))?;
578    /// # Ok(())
579    /// # }
580    /// ```
581    ///
582    /// ## From Stdin
583    ///
584    /// ```rust,no_run
585    /// use hedl_stream::StreamingParser;
586    /// use std::io::stdin;
587    ///
588    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
589    /// let parser = StreamingParser::new(stdin().lock())?;
590    /// # Ok(())
591    /// # }
592    /// ```
593    ///
594    /// # Errors
595    ///
596    /// - `StreamError::MissingVersion`: No `%VERSION` directive found
597    /// - `StreamError::InvalidVersion`: Invalid version format
598    /// - `StreamError::Syntax`: Malformed header directive
599    /// - `StreamError::Io`: I/O error reading input
600    pub fn new(reader: R) -> StreamResult<Self> {
601        Self::with_config(reader, StreamingParserConfig::default())
602    }
603
604    /// Create a streaming parser with custom configuration.
605    ///
606    /// Use this when you need to control memory limits, buffer sizes, or enable
607    /// timeout protection for untrusted input.
608    ///
609    /// # Parameters
610    ///
611    /// - `reader`: Any type implementing `Read`
612    /// - `config`: Parser configuration options
613    ///
614    /// # Returns
615    ///
616    /// - `Ok(parser)`: Parser ready to yield events
617    /// - `Err(e)`: Configuration invalid or header parsing failed
618    ///
619    /// # Examples
620    ///
621    /// ## With Timeout Protection
622    ///
623    /// ```rust
624    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
625    /// use std::time::Duration;
626    /// use std::io::Cursor;
627    ///
628    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
629    /// let config = StreamingParserConfig {
630    ///     timeout: Some(Duration::from_secs(30)),
631    ///     ..Default::default()
632    /// };
633    ///
634    /// let untrusted_input = "...";
635    /// let parser = StreamingParser::with_config(
636    ///     Cursor::new(untrusted_input),
637    ///     config
638    /// )?;
639    /// # Ok(())
640    /// # }
641    /// ```
642    ///
643    /// ## For Large Files
644    ///
645    /// ```rust
646    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
647    /// use std::io::Cursor;
648    ///
649    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
650    /// let config = StreamingParserConfig {
651    ///     buffer_size: 256 * 1024,      // 256KB read buffer
652    ///     max_line_length: 10_000_000,  // 10MB max line
653    ///     max_indent_depth: 1000,       // Deep nesting allowed
654    ///     timeout: None,
655    ///     ..Default::default()
656    /// };
657    ///
658    /// let parser = StreamingParser::with_config(
659    ///     Cursor::new("..."),
660    ///     config
661    /// )?;
662    /// # Ok(())
663    /// # }
664    /// ```
665    ///
666    /// ## For Constrained Environments
667    ///
668    /// ```rust
669    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
670    /// use std::time::Duration;
671    /// use std::io::Cursor;
672    ///
673    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
674    /// let config = StreamingParserConfig {
675    ///     buffer_size: 8 * 1024,        // Small 8KB buffer
676    ///     max_line_length: 100_000,     // 100KB max line
677    ///     max_indent_depth: 50,         // Limited nesting
678    ///     timeout: Some(Duration::from_secs(10)),
679    ///     ..Default::default()
680    /// };
681    ///
682    /// let parser = StreamingParser::with_config(
683    ///     Cursor::new("..."),
684    ///     config
685    /// )?;
686    /// # Ok(())
687    /// # }
688    /// ```
689    ///
690    /// # Errors
691    ///
692    /// Same as [`new()`](Self::new), plus:
693    ///
694    /// - `StreamError::Timeout`: Header parsing exceeded configured timeout
695    pub fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
696        // Initialize buffer pool if enabled
697        let buffer_pool = if config.enable_pooling && config.memory_limits.enable_buffer_pooling {
698            Some(BufferPool::new(config.memory_limits.max_pool_size))
699        } else {
700            None
701        };
702
703        let mut parser = Self {
704            reader: LineReader::with_capacity_and_max_length(
705                reader,
706                config.buffer_size,
707                config.max_line_length,
708            ),
709            config,
710            header: None,
711            state: ParserState {
712                stack: vec![Context::Root],
713                prev_row: None,
714            },
715            finished: false,
716            errored: false,
717            sent_end_of_document: false,
718            start_time: Instant::now(),
719            operations_count: 0,
720            buffer_pool,
721        };
722
723        // Parse header immediately
724        parser.parse_header()?;
725
726        Ok(parser)
727    }
728
729    /// Check if timeout has been exceeded.
730    /// This is called periodically during parsing to prevent infinite loops.
731    #[inline]
732    fn check_timeout(&self) -> StreamResult<()> {
733        if let Some(timeout) = self.config.timeout {
734            let elapsed = self.start_time.elapsed();
735            if elapsed > timeout {
736                return Err(StreamError::Timeout {
737                    elapsed,
738                    limit: timeout,
739                });
740            }
741        }
742        Ok(())
743    }
744
745    /// Get the parsed header information.
746    ///
747    /// Returns header metadata including version, schema definitions, aliases,
748    /// and nesting rules. This is available immediately after parser creation.
749    ///
750    /// # Returns
751    ///
752    /// - `Some(&HeaderInfo)`: Header was successfully parsed
753    /// - `None`: Should never happen after successful parser creation
754    ///
755    /// # Examples
756    ///
757    /// ## Inspecting Schema Definitions
758    ///
759    /// ```rust
760    /// use hedl_stream::StreamingParser;
761    /// use std::io::Cursor;
762    ///
763    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
764    /// let input = r#"
765    /// %VERSION: 1.0
766    /// %STRUCT: User: [id, name, email]
767    /// %STRUCT: Order: [id, user_id, amount]
768    /// %ALIAS: active = "Active"
769    /// %NEST: User > Order
770    /// ---
771    /// "#;
772    ///
773    /// let parser = StreamingParser::new(Cursor::new(input))?;
774    /// let header = parser.header().unwrap();
775    ///
776    /// // Check version
777    /// assert_eq!(header.version, (1, 0));
778    ///
779    /// // Get schema
780    /// let user_schema = header.get_schema("User").unwrap();
781    /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
782    ///
783    /// // Check aliases
784    /// assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
785    ///
786    /// // Check nesting rules
787    /// assert_eq!(header.get_child_type("User"), Some(&"Order".to_string()));
788    /// # Ok(())
789    /// # }
790    /// ```
791    ///
792    /// ## Validating Before Processing
793    ///
794    /// ```rust
795    /// use hedl_stream::StreamingParser;
796    /// use std::io::Cursor;
797    ///
798    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
799    /// let input = r#"
800    /// %VERSION: 1.0
801    /// %STRUCT: User: [id, name]
802    /// ---
803    /// users: @User
804    ///   | alice, Alice
805    /// "#;
806    ///
807    /// let parser = StreamingParser::new(Cursor::new(input))?;
808    ///
809    /// // Validate we have the expected schema before processing
810    /// if let Some(header) = parser.header() {
811    ///     if header.version.0 != 1 {
812    ///         eprintln!("Warning: Unexpected major version");
813    ///     }
814    ///
815    ///     if !header.structs.contains_key("User") {
816    ///         return Err("Missing User schema".into());
817    ///     }
818    /// }
819    ///
820    /// // Proceed with parsing...
821    /// # Ok(())
822    /// # }
823    /// ```
824    pub fn header(&self) -> Option<&HeaderInfo> {
825        self.header.as_ref()
826    }
827
828    /// Parse the header section.
829    fn parse_header(&mut self) -> StreamResult<()> {
830        let mut header = HeaderInfo::new();
831        let mut found_version = false;
832        let mut _found_separator = false;
833
834        while let Some((line_num, line)) = self.reader.next_line()? {
835            // Check timeout every iteration in header parsing
836            self.check_timeout()?;
837
838            let trimmed = line.trim();
839
840            // Skip blank lines and comments
841            if trimmed.is_empty() || trimmed.starts_with('#') {
842                continue;
843            }
844
845            // Check for separator
846            if trimmed == "---" {
847                _found_separator = true;
848                break;
849            }
850
851            // Parse directives
852            if trimmed.starts_with('%') {
853                self.parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
854            } else {
855                // Not a directive - might be body content without separator
856                self.reader.push_back(line_num, line);
857                break;
858            }
859        }
860
861        if !found_version {
862            return Err(StreamError::MissingVersion);
863        }
864
865        self.header = Some(header);
866        Ok(())
867    }
868
869    fn parse_directive(
870        &self,
871        line: &str,
872        line_num: usize,
873        header: &mut HeaderInfo,
874        found_version: &mut bool,
875    ) -> StreamResult<()> {
876        if line.starts_with("%VERSION") {
877            self.parse_version_directive(line, header, found_version)
878        } else if line.starts_with("%STRUCT") {
879            self.parse_struct_directive(line, line_num, header)
880        } else if line.starts_with("%ALIAS") {
881            self.parse_alias_directive(line, line_num, header)
882        } else if line.starts_with("%NEST") {
883            self.parse_nest_directive(line, line_num, header)
884        } else {
885            Ok(())
886        }
887    }
888
889    /// Strip inline comments from a directive line.
890    ///
891    /// Handles `#` characters outside of quoted strings and brackets.
892    /// Returns the content before the first unquoted/unbracketed `#`.
893    fn strip_inline_comment(text: &str) -> &str {
894        let mut in_quotes = false;
895        let mut in_brackets = 0;
896        let mut quote_char = '"';
897
898        for (i, c) in text.char_indices() {
899            match c {
900                '"' | '\'' if !in_quotes => {
901                    in_quotes = true;
902                    quote_char = c;
903                }
904                c if in_quotes && c == quote_char => {
905                    in_quotes = false;
906                }
907                '[' if !in_quotes => in_brackets += 1,
908                ']' if !in_quotes && in_brackets > 0 => in_brackets -= 1,
909                '#' if !in_quotes && in_brackets == 0 => {
910                    return text[..i].trim_end();
911                }
912                _ => {}
913            }
914        }
915        text
916    }
917
918    fn parse_version_directive(
919        &self,
920        line: &str,
921        header: &mut HeaderInfo,
922        found_version: &mut bool,
923    ) -> StreamResult<()> {
924        // Strip inline comments first
925        let line = Self::strip_inline_comment(line);
926        // Safe: starts_with check guarantees prefix exists
927        let rest = line.strip_prefix("%VERSION").expect("prefix exists").trim();
928        // Handle both "%VERSION: 1.0" and "%VERSION: 1.0" formats
929        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
930        let parts: Vec<&str> = rest.split('.').collect();
931
932        if parts.len() != 2 {
933            return Err(StreamError::InvalidVersion(rest.to_string()));
934        }
935
936        let major: u32 = parts[0]
937            .parse()
938            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
939        let minor: u32 = parts[1]
940            .parse()
941            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
942
943        header.version = (major, minor);
944        *found_version = true;
945        Ok(())
946    }
947
948    fn parse_struct_directive(
949        &self,
950        line: &str,
951        line_num: usize,
952        header: &mut HeaderInfo,
953    ) -> StreamResult<()> {
954        // Strip inline comments first
955        let line = Self::strip_inline_comment(line);
956        // Safe: starts_with check guarantees prefix exists
957        let rest = line.strip_prefix("%STRUCT").expect("prefix exists").trim();
958        // Handle both "%STRUCT TypeName: [cols]" and "%STRUCT: TypeName: [cols]" formats
959        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
960
961        let bracket_start = rest
962            .find('[')
963            .ok_or_else(|| StreamError::syntax(line_num, "missing '[' in %STRUCT"))?;
964        let bracket_end = rest
965            .find(']')
966            .ok_or_else(|| StreamError::syntax(line_num, "missing ']' in %STRUCT"))?;
967
968        // Type name may have trailing colon and optional count, strip them
969        // Format: TypeName: or TypeName (N):
970        let type_part = rest[..bracket_start].trim().trim_end_matches(':').trim();
971        // Handle optional count: "TypeName (N)" -> extract just "TypeName"
972        let type_name = if let Some(paren_pos) = type_part.find('(') {
973            type_part[..paren_pos].trim()
974        } else {
975            type_part
976        };
977        if !is_valid_type_name(type_name) {
978            return Err(StreamError::syntax(
979                line_num,
980                format!("invalid type name: {type_name}"),
981            ));
982        }
983
984        let cols_str = &rest[bracket_start + 1..bracket_end];
985        let columns: Vec<String> = cols_str
986            .split(',')
987            .map(|s| s.trim().to_string())
988            .filter(|s| !s.is_empty())
989            .collect();
990
991        if columns.is_empty() {
992            return Err(StreamError::syntax(line_num, "empty schema"));
993        }
994
995        header.structs.insert(type_name.to_string(), columns);
996        Ok(())
997    }
998
999    fn parse_alias_directive(
1000        &self,
1001        line: &str,
1002        line_num: usize,
1003        header: &mut HeaderInfo,
1004    ) -> StreamResult<()> {
1005        // Strip inline comments first
1006        let line = Self::strip_inline_comment(line);
1007        // Safe: starts_with check guarantees prefix exists
1008        let rest = line.strip_prefix("%ALIAS").expect("prefix exists").trim();
1009        // Handle both "%ALIAS: %short: = ..." and "%ALIAS: %short: ..." formats
1010        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
1011
1012        // Support both '=' and ':' as separators
1013        let sep_pos = rest
1014            .find('=')
1015            .or_else(|| rest.find(':'))
1016            .ok_or_else(|| StreamError::syntax(line_num, "missing '=' or ':' in %ALIAS"))?;
1017
1018        let alias = rest[..sep_pos].trim();
1019        let value = rest[sep_pos + 1..].trim().trim_matches('"');
1020
1021        header.aliases.insert(alias.to_string(), value.to_string());
1022        Ok(())
1023    }
1024
1025    fn parse_nest_directive(
1026        &self,
1027        line: &str,
1028        line_num: usize,
1029        header: &mut HeaderInfo,
1030    ) -> StreamResult<()> {
1031        // Strip inline comments first
1032        let line = Self::strip_inline_comment(line);
1033        // Safe: starts_with check guarantees prefix exists
1034        let rest = line.strip_prefix("%NEST").expect("prefix exists").trim();
1035        // Handle both "%NEST: Parent > Child" and "%NEST: Parent > Child" formats
1036        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
1037
1038        let arrow_pos = rest
1039            .find('>')
1040            .ok_or_else(|| StreamError::syntax(line_num, "missing '>' in %NEST"))?;
1041
1042        let parent = rest[..arrow_pos].trim();
1043        let child = rest[arrow_pos + 1..].trim();
1044
1045        if !is_valid_type_name(parent) || !is_valid_type_name(child) {
1046            return Err(StreamError::syntax(line_num, "invalid type name in %NEST"));
1047        }
1048
1049        header.nests.insert(parent.to_string(), child.to_string());
1050        Ok(())
1051    }
1052
1053    /// Parse the next event from the stream.
1054    fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
1055        // If errored, stop immediately without finalize
1056        if self.errored {
1057            return Ok(None);
1058        }
1059        // If finished, continue emitting remaining context ends until stack is empty
1060        if self.finished {
1061            return self.finalize();
1062        }
1063
1064        loop {
1065            // Check timeout periodically (every 100 operations to minimize overhead)
1066            self.operations_count += 1;
1067            if self.operations_count % 100 == 0 {
1068                self.check_timeout()?;
1069            }
1070
1071            let (line_num, line) = if let Some(l) = self.reader.next_line()? {
1072                l
1073            } else {
1074                self.finished = true;
1075                // Emit any remaining list ends
1076                return self.finalize();
1077            };
1078
1079            let trimmed = line.trim();
1080
1081            // Skip blank lines and comments
1082            if trimmed.is_empty() || trimmed.starts_with('#') {
1083                continue;
1084            }
1085
1086            // Calculate indentation
1087            let indent_info = calculate_indent(&line, line_num as u32)
1088                .map_err(|e| StreamError::syntax(line_num, e.to_string()))?;
1089
1090            let (indent, content) = match indent_info {
1091                Some(info) => (info.level, &line[info.spaces..]),
1092                None => continue,
1093            };
1094
1095            if indent > self.config.max_indent_depth {
1096                return Err(StreamError::syntax(
1097                    line_num,
1098                    format!("indent depth {indent} exceeds limit"),
1099                ));
1100            }
1101
1102            // Pop contexts as needed based on indentation
1103            let events = self.pop_contexts(indent)?;
1104            if let Some(event) = events {
1105                // Push back the current line to process after emitting list end
1106                self.reader.push_back(line_num, line);
1107                return Ok(Some(event));
1108            }
1109
1110            // Parse line content
1111            return self.parse_line(content, indent, line_num);
1112        }
1113    }
1114
1115    fn pop_contexts(&mut self, current_indent: usize) -> StreamResult<Option<NodeEvent>> {
1116        while self.state.stack.len() > 1 {
1117            // Safe: loop condition guarantees stack has elements
1118            let should_pop = match self.state.stack.last().expect("stack has elements") {
1119                Context::Root => false,
1120                Context::Object { indent, .. } => current_indent <= *indent,
1121                Context::List { row_indent, .. } => current_indent < *row_indent,
1122            };
1123
1124            if should_pop {
1125                // Safe: loop condition guarantees stack has elements
1126                let ctx = self.state.stack.pop().expect("stack has elements");
1127                match ctx {
1128                    Context::List {
1129                        key,
1130                        type_name,
1131                        count,
1132                        ..
1133                    } => {
1134                        return Ok(Some(NodeEvent::ListEnd {
1135                            key,
1136                            type_name,
1137                            count,
1138                        }));
1139                    }
1140                    Context::Object { key, .. } => {
1141                        return Ok(Some(NodeEvent::ObjectEnd { key }));
1142                    }
1143                    Context::Root => {
1144                        // Root context should never be popped
1145                    }
1146                }
1147            } else {
1148                break;
1149            }
1150        }
1151
1152        Ok(None)
1153    }
1154
1155    fn parse_line(
1156        &mut self,
1157        content: &str,
1158        indent: usize,
1159        line_num: usize,
1160    ) -> StreamResult<Option<NodeEvent>> {
1161        // Strip inline comment
1162        let content = strip_comment(content);
1163
1164        if let Some(row_content) = content.strip_prefix('|') {
1165            // Matrix row
1166            self.parse_matrix_row(row_content, indent, line_num)
1167        } else if let Some(colon_pos) = content.find(':') {
1168            let key = content[..colon_pos].trim();
1169            let after_colon = &content[colon_pos + 1..];
1170
1171            if !is_valid_key_token(key) {
1172                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
1173            }
1174
1175            let after_colon_trimmed = after_colon.trim();
1176
1177            if after_colon_trimmed.is_empty() {
1178                // Object start: validate indent and context
1179                self.validate_indent_for_key_value(indent, line_num)?;
1180
1181                self.state.stack.push(Context::Object {
1182                    key: key.to_string(),
1183                    indent,
1184                });
1185                Ok(Some(NodeEvent::ObjectStart {
1186                    key: key.to_string(),
1187                    line: line_num,
1188                }))
1189            } else if after_colon_trimmed.starts_with('@')
1190                && self.is_list_start(after_colon_trimmed)
1191            {
1192                // List start: require space after colon
1193                if !after_colon.starts_with(' ') {
1194                    return Err(StreamError::syntax(
1195                        line_num,
1196                        "space required after ':' before '@'",
1197                    ));
1198                }
1199
1200                // List declarations are allowed in list context (for nested lists)
1201                // so we don't call validate_indent_for_key_value here
1202
1203                let (type_name, schema) = self.parse_list_start(after_colon_trimmed, line_num)?;
1204
1205                self.state.stack.push(Context::List {
1206                    key: key.to_string(),
1207                    type_name: type_name.clone(),
1208                    schema: schema.clone(),
1209                    row_indent: indent + 1,
1210                    count: 0,
1211                    last_node: None,
1212                });
1213
1214                self.state.prev_row = None;
1215
1216                Ok(Some(NodeEvent::ListStart {
1217                    key: key.to_string(),
1218                    type_name,
1219                    schema,
1220                    line: line_num,
1221                }))
1222            } else {
1223                // Key-value pair: require space after colon and validate indent
1224                if !after_colon.starts_with(' ') {
1225                    return Err(StreamError::syntax(
1226                        line_num,
1227                        "space required after ':' in key-value",
1228                    ));
1229                }
1230                self.validate_indent_for_key_value(indent, line_num)?;
1231
1232                let value = self.infer_value(after_colon.trim(), line_num)?;
1233                Ok(Some(NodeEvent::Scalar {
1234                    key: key.to_string(),
1235                    value,
1236                    line: line_num,
1237                }))
1238            }
1239        } else {
1240            Err(StreamError::syntax(line_num, "expected ':' in line"))
1241        }
1242    }
1243
1244    /// Validate that the indent is correct for a key-value or object start.
1245    ///
1246    /// Mirrors `validate_indent_for_child` from hedl-core:
1247    /// - Root context: expects indent 0
1248    /// - Object context: expects `parent_indent` + 1
1249    /// - List context: key-value not allowed (only list declarations)
1250    fn validate_indent_for_key_value(&self, indent: usize, line_num: usize) -> StreamResult<()> {
1251        let expected = match self.state.stack.last() {
1252            Some(Context::Root) | None => 0,
1253            Some(Context::Object {
1254                indent: parent_indent,
1255                ..
1256            }) => parent_indent + 1,
1257            Some(Context::List { .. }) => {
1258                return Err(StreamError::syntax(
1259                    line_num,
1260                    "cannot add key-value inside list context",
1261                ));
1262            }
1263        };
1264
1265        if indent != expected {
1266            return Err(StreamError::syntax(
1267                line_num,
1268                format!("expected indent level {expected}, got {indent}"),
1269            ));
1270        }
1271
1272        Ok(())
1273    }
1274
1275    #[inline]
1276    fn is_list_start(&self, s: &str) -> bool {
1277        let s = s.trim();
1278        if !s.starts_with('@') {
1279            return false;
1280        }
1281        let rest = &s[1..];
1282        let type_end = rest
1283            .find(|c: char| c == '[' || c.is_whitespace())
1284            .unwrap_or(rest.len());
1285        let type_name = &rest[..type_end];
1286        is_valid_type_name(type_name)
1287    }
1288
1289    fn parse_list_start(&self, s: &str, line_num: usize) -> StreamResult<(String, Vec<String>)> {
1290        let s = s.trim();
1291        let rest = &s[1..]; // Skip @
1292
1293        if let Some(bracket_pos) = rest.find('[') {
1294            // Inline schema: @TypeName[col1, col2]
1295            let type_name = &rest[..bracket_pos];
1296            if !is_valid_type_name(type_name) {
1297                return Err(StreamError::syntax(
1298                    line_num,
1299                    format!("invalid type name: {type_name}"),
1300                ));
1301            }
1302
1303            let bracket_end = rest
1304                .find(']')
1305                .ok_or_else(|| StreamError::syntax(line_num, "missing ']'"))?;
1306
1307            let cols_str = &rest[bracket_pos + 1..bracket_end];
1308            let mut columns = Vec::new();
1309
1310            for part in cols_str.split(',') {
1311                let col = part.trim();
1312                if col.is_empty() {
1313                    continue;
1314                }
1315                // Validate column name
1316                if !is_valid_key_token(col) {
1317                    return Err(StreamError::syntax(
1318                        line_num,
1319                        format!("invalid column name: {col}"),
1320                    ));
1321                }
1322                columns.push(col.to_string());
1323            }
1324
1325            // Check for empty schema
1326            if columns.is_empty() {
1327                return Err(StreamError::syntax(line_num, "empty inline schema"));
1328            }
1329
1330            // Check against declared schema if type exists in header
1331            if let Some(header) = &self.header {
1332                if let Some(declared) = header.structs.get(type_name) {
1333                    if declared != &columns {
1334                        return Err(StreamError::schema(
1335                            line_num,
1336                            format!(
1337                                "inline schema for '{type_name}' doesn't match declared schema"
1338                            ),
1339                        ));
1340                    }
1341                }
1342            }
1343
1344            Ok((type_name.to_string(), columns))
1345        } else {
1346            // Reference to declared schema: @TypeName
1347            let type_name = rest.trim();
1348            if !is_valid_type_name(type_name) {
1349                return Err(StreamError::syntax(
1350                    line_num,
1351                    format!("invalid type name: {type_name}"),
1352                ));
1353            }
1354
1355            let header = self
1356                .header
1357                .as_ref()
1358                .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
1359
1360            let schema = header.structs.get(type_name).ok_or_else(|| {
1361                StreamError::schema(line_num, format!("undefined type: {type_name}"))
1362            })?;
1363
1364            Ok((type_name.to_string(), schema.clone()))
1365        }
1366    }
1367
1368    fn parse_matrix_row(
1369        &mut self,
1370        content: &str,
1371        indent: usize,
1372        line_num: usize,
1373    ) -> StreamResult<Option<NodeEvent>> {
1374        // Parse row prefix to extract optional child count and CSV content
1375        let (child_count, csv_content) = self.parse_row_prefix(content, line_num)?;
1376        let content = strip_comment(csv_content).trim();
1377
1378        // Find active list context
1379        let (type_name, schema, parent_info) = self.find_list_context(indent, line_num)?;
1380
1381        // Parse HEDL matrix row (comma-separated values after the |)
1382        // Use hedl_row parser for proper CSV-like parsing
1383        let fields = hedl_core::lex::parse_csv_row(content)
1384            .map_err(|e| StreamError::syntax(line_num, format!("row parse error: {e}")))?;
1385
1386        // Validate shape
1387        if fields.len() != schema.len() {
1388            return Err(StreamError::ShapeMismatch {
1389                line: line_num,
1390                expected: schema.len(),
1391                got: fields.len(),
1392            });
1393        }
1394
1395        // Infer values with ditto handling
1396        let mut values = Vec::with_capacity(fields.len());
1397        for (col_idx, field) in fields.iter().enumerate() {
1398            let value = if field.value == "^" {
1399                // Ditto - use previous row's value
1400                self.state
1401                    .prev_row
1402                    .as_ref()
1403                    .and_then(|prev| prev.get(col_idx).cloned())
1404                    .unwrap_or(Value::Null)
1405            } else if field.is_quoted {
1406                Value::String(field.value.clone().into())
1407            } else {
1408                self.infer_value(&field.value, line_num)?
1409            };
1410            values.push(value);
1411        }
1412
1413        // Get ID from first column
1414        let id = match &values[0] {
1415            Value::String(s) => s.to_string(),
1416            _ => return Err(StreamError::syntax(line_num, "ID column must be a string")),
1417        };
1418
1419        // Update context
1420        self.update_list_context(&type_name, &id);
1421        self.state.prev_row = Some(values.clone());
1422
1423        // Calculate depth as number of list contexts minus 1 (0-indexed nesting level)
1424        let depth = self
1425            .state
1426            .stack
1427            .iter()
1428            .filter(|ctx| matches!(ctx, Context::List { .. }))
1429            .count()
1430            .saturating_sub(1);
1431
1432        // Build node info
1433        let mut node = NodeInfo::new(type_name.clone(), id, values, depth, line_num);
1434
1435        if let Some((parent_type, parent_id)) = parent_info {
1436            node = node.with_parent(parent_type, parent_id);
1437        }
1438
1439        if let Some(count) = child_count {
1440            node = node.with_child_count(count);
1441        }
1442
1443        Ok(Some(NodeEvent::Node(node)))
1444    }
1445
1446    /// Parse row prefix to extract optional child count and CSV content.
1447    ///
1448    /// Handles `|[N]` syntax where N is the expected child count.
1449    /// Returns (Option<`child_count`>, `csv_content`).
1450    fn parse_row_prefix<'a>(
1451        &self,
1452        content: &'a str,
1453        _line_num: usize,
1454    ) -> StreamResult<(Option<usize>, &'a str)> {
1455        // Content is already after the leading |
1456        // Check for [N] pattern at start
1457        if content.starts_with('[') {
1458            if let Some(bracket_end) = content.find(']') {
1459                let count_str = &content[1..bracket_end];
1460                if let Ok(count) = count_str.parse::<usize>() {
1461                    // Count 0 is valid - means row has no children (empty parent)
1462                    // Skip [N] and any following space
1463                    let data = content[bracket_end + 1..].trim_start();
1464                    return Ok((Some(count), data));
1465                }
1466                // Invalid count format - fall through and treat as regular content
1467            }
1468        }
1469
1470        // No child count prefix
1471        Ok((None, content))
1472    }
1473
1474    fn find_list_context(
1475        &mut self,
1476        indent: usize,
1477        line_num: usize,
1478    ) -> StreamResult<ListContextResult> {
1479        let header = self
1480            .header
1481            .as_ref()
1482            .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
1483
1484        for ctx in self.state.stack.iter().rev() {
1485            if let Context::List {
1486                type_name,
1487                schema,
1488                row_indent,
1489                last_node,
1490                ..
1491            } = ctx
1492            {
1493                if indent == *row_indent {
1494                    // Peer row
1495                    return Ok((type_name.clone(), schema.clone(), None));
1496                } else if indent == *row_indent + 1 {
1497                    // Child row
1498                    let parent_info = last_node.clone().ok_or_else(|| {
1499                        StreamError::orphan_row(line_num, "child row has no parent")
1500                    })?;
1501
1502                    let child_type = header.nests.get(type_name).ok_or_else(|| {
1503                        StreamError::orphan_row(
1504                            line_num,
1505                            format!("no NEST rule for parent type '{type_name}'"),
1506                        )
1507                    })?;
1508
1509                    let child_schema = header.structs.get(child_type).ok_or_else(|| {
1510                        StreamError::schema(
1511                            line_num,
1512                            format!("child type '{child_type}' not defined"),
1513                        )
1514                    })?;
1515
1516                    // Push child list context
1517                    self.state.stack.push(Context::List {
1518                        key: child_type.clone(),
1519                        type_name: child_type.clone(),
1520                        schema: child_schema.clone(),
1521                        row_indent: indent,
1522                        count: 0,
1523                        last_node: None,
1524                    });
1525
1526                    return Ok((child_type.clone(), child_schema.clone(), Some(parent_info)));
1527                }
1528            }
1529        }
1530
1531        Err(StreamError::syntax(
1532            line_num,
1533            "matrix row outside of list context",
1534        ))
1535    }
1536
1537    fn update_list_context(&mut self, type_name: &str, id: &str) {
1538        for ctx in self.state.stack.iter_mut().rev() {
1539            if let Context::List {
1540                type_name: ctx_type,
1541                last_node,
1542                count,
1543                ..
1544            } = ctx
1545            {
1546                if ctx_type == type_name {
1547                    *last_node = Some((type_name.to_string(), id.to_string()));
1548                    *count += 1;
1549                    break;
1550                }
1551            }
1552        }
1553    }
1554
1555    #[inline]
1556    fn infer_value(&self, s: &str, _line_num: usize) -> StreamResult<Value> {
1557        let s = s.trim();
1558
1559        // Handle null values: empty, ~, or the keyword "null"
1560        if s.is_empty() || s == "~" || s == "null" {
1561            return Ok(Value::Null);
1562        }
1563
1564        if s == "true" {
1565            return Ok(Value::Bool(true));
1566        }
1567        if s == "false" {
1568            return Ok(Value::Bool(false));
1569        }
1570
1571        // Reference
1572        if let Some(ref_part) = s.strip_prefix('@') {
1573            if let Some(colon_pos) = ref_part.find(':') {
1574                let type_name = &ref_part[..colon_pos];
1575                let id = &ref_part[colon_pos + 1..];
1576                return Ok(Value::Reference(hedl_core::Reference {
1577                    type_name: Some(type_name.to_string().into()),
1578                    id: id.to_string().into(),
1579                }));
1580            }
1581            return Ok(Value::Reference(hedl_core::Reference {
1582                type_name: None,
1583                id: ref_part.to_string().into(),
1584            }));
1585        }
1586
1587        // Alias
1588        if let Some(alias) = s.strip_prefix('$') {
1589            if let Some(header) = &self.header {
1590                if let Some(value) = header.aliases.get(alias) {
1591                    return Ok(Value::String(value.clone().into()));
1592                }
1593            }
1594            return Ok(Value::String(s.to_string().into()));
1595        }
1596
1597        // Number
1598        if let Ok(i) = s.parse::<i64>() {
1599            return Ok(Value::Int(i));
1600        }
1601        if let Ok(f) = s.parse::<f64>() {
1602            return Ok(Value::Float(f));
1603        }
1604
1605        // Default to string
1606        Ok(Value::String(s.to_string().into()))
1607    }
1608
1609    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
1610        // If we already sent EndOfDocument, return None to signal true end of stream
1611        if self.sent_end_of_document {
1612            return Ok(None);
1613        }
1614
1615        // Pop remaining contexts
1616        while self.state.stack.len() > 1 {
1617            // Safe: loop condition guarantees stack has elements
1618            let ctx = self.state.stack.pop().expect("stack has elements");
1619            match ctx {
1620                Context::List {
1621                    key,
1622                    type_name,
1623                    count,
1624                    ..
1625                } => {
1626                    return Ok(Some(NodeEvent::ListEnd {
1627                        key,
1628                        type_name,
1629                        count,
1630                    }));
1631                }
1632                Context::Object { key, .. } => {
1633                    return Ok(Some(NodeEvent::ObjectEnd { key }));
1634                }
1635                Context::Root => {
1636                    // Root context should never be popped
1637                }
1638            }
1639        }
1640
1641        // Mark that we've sent EndOfDocument, so subsequent calls return None
1642        self.sent_end_of_document = true;
1643        Ok(Some(NodeEvent::EndOfDocument))
1644    }
1645}
1646
1647impl<R: Read> Iterator for StreamingParser<R> {
1648    type Item = StreamResult<NodeEvent>;
1649
1650    fn next(&mut self) -> Option<Self::Item> {
1651        match self.next_event() {
1652            Ok(Some(NodeEvent::EndOfDocument)) => None,
1653            Ok(Some(event)) => Some(Ok(event)),
1654            Ok(None) => None,
1655            Err(e) => {
1656                // Stop iteration after an error to prevent inconsistent state
1657                self.finished = true;
1658                self.errored = true;
1659                Some(Err(e))
1660            }
1661        }
1662    }
1663}
1664
1665// File opening with compression support
1666#[cfg(feature = "compression")]
1667impl StreamingParser<crate::compression::CompressionReader<std::fs::File>> {
1668    /// Open a file with automatic compression detection.
1669    ///
1670    /// Detects compression format from the file extension (`.gz`, `.zst`, `.lz4`)
1671    /// and automatically decompresses the content.
1672    ///
1673    /// # Examples
1674    ///
1675    /// ```rust,no_run
1676    /// use hedl_stream::StreamingParser;
1677    ///
1678    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
1679    /// // Open a GZIP-compressed HEDL file
1680    /// let parser = StreamingParser::open("data.hedl.gz")?;
1681    ///
1682    /// for event in parser {
1683    ///     println!("{:?}", event?);
1684    /// }
1685    /// # Ok(())
1686    /// # }
1687    /// ```
1688    ///
1689    /// # Errors
1690    ///
1691    /// - `StreamError::Io`: File not found or cannot be opened
1692    /// - `StreamError::Compression`: Decompression initialization failed
1693    /// - `StreamError::MissingVersion`: Invalid HEDL header
1694    pub fn open<P: AsRef<std::path::Path>>(path: P) -> StreamResult<Self> {
1695        Self::open_with_config(path, StreamingParserConfig::default())
1696    }
1697
1698    /// Open a file with automatic compression detection and custom configuration.
1699    ///
1700    /// Combines automatic compression detection with custom parser settings.
1701    ///
1702    /// # Examples
1703    ///
1704    /// ```rust,no_run
1705    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
1706    /// use std::time::Duration;
1707    ///
1708    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
1709    /// let config = StreamingParserConfig {
1710    ///     timeout: Some(Duration::from_secs(30)),
1711    ///     ..Default::default()
1712    /// };
1713    ///
1714    /// let parser = StreamingParser::open_with_config("data.hedl.zst", config)?;
1715    /// # Ok(())
1716    /// # }
1717    /// ```
1718    pub fn open_with_config<P: AsRef<std::path::Path>>(
1719        path: P,
1720        config: StreamingParserConfig,
1721    ) -> StreamResult<Self> {
1722        use crate::compression::{CompressionFormat, CompressionReader};
1723
1724        let path = path.as_ref();
1725        let format = CompressionFormat::from_path(path);
1726
1727        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
1728        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;
1729
1730        Self::with_config(reader, config)
1731    }
1732
1733    /// Open a file with explicit compression format.
1734    ///
1735    /// Use this when the file extension doesn't match the actual compression
1736    /// format, or when you want to force a specific decompression algorithm.
1737    ///
1738    /// # Examples
1739    ///
1740    /// ```rust,no_run
1741    /// use hedl_stream::StreamingParser;
1742    /// use hedl_stream::compression::CompressionFormat;
1743    ///
1744    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
1745    /// // File has no extension but is GZIP compressed
1746    /// let parser = StreamingParser::open_with_compression(
1747    ///     "data.hedl",
1748    ///     CompressionFormat::Gzip,
1749    /// )?;
1750    /// # Ok(())
1751    /// # }
1752    /// ```
1753    pub fn open_with_compression<P: AsRef<std::path::Path>>(
1754        path: P,
1755        format: crate::compression::CompressionFormat,
1756    ) -> StreamResult<Self> {
1757        use crate::compression::CompressionReader;
1758
1759        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
1760        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;
1761
1762        Self::new(reader)
1763    }
1764}
1765
1766/// SIMD-optimized comment scanning module.
1767///
1768/// This module provides AVX2-accelerated scanning for the '#' character
1769/// when detecting comment boundaries. Falls back to scalar implementation
1770/// on non-AVX2 platforms or when the feature is disabled.
1771mod simd_comment {
1772    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
1773    use std::arch::x86_64::{
1774        __m256i, _mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8, _mm256_set1_epi8,
1775    };
1776
1777    /// Find the first occurrence of '#' in the input string using SIMD.
1778    ///
1779    /// This function scans 32 bytes at a time using AVX2 instructions
1780    /// on `x86_64` platforms when the `avx2` feature is enabled.
1781    ///
1782    /// # Safety
1783    ///
1784    /// On AVX2-enabled platforms, this uses `unsafe` SIMD intrinsics
1785    /// but ensures all memory accesses are valid.
1786    #[inline]
1787    #[allow(unsafe_code)] // SIMD intrinsic requires unsafe for AVX2 operations
1788    pub fn find_hash_simd(s: &[u8]) -> Option<usize> {
1789        #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
1790        {
1791            // Check if AVX2 is available at runtime
1792            if is_x86_feature_detected!("avx2") {
1793                // SAFETY: AVX2 feature is confirmed by is_x86_feature_detected!
1794                return unsafe { find_hash_avx2(s) };
1795            }
1796        }
1797
1798        // Fallback to scalar search
1799        find_hash_scalar(s)
1800    }
1801
1802    /// AVX2 implementation for finding '#' character.
1803    ///
1804    /// Scans 32 bytes at a time using SIMD instructions.
1805    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
1806    #[target_feature(enable = "avx2")]
1807    unsafe fn find_hash_avx2(s: &[u8]) -> Option<usize> {
1808        const CHUNK_SIZE: usize = 32;
1809        let len = s.len();
1810
1811        if len == 0 {
1812            return None;
1813        }
1814
1815        let hash_vec = _mm256_set1_epi8(b'#' as i8);
1816        let mut offset = 0;
1817
1818        // Process 32-byte chunks
1819        while offset + CHUNK_SIZE <= len {
1820            // Load 32 bytes from input
1821            let chunk = _mm256_loadu_si256(s.as_ptr().add(offset).cast::<__m256i>());
1822
1823            // Compare with '#' character
1824            let matches = _mm256_cmpeq_epi8(chunk, hash_vec);
1825
1826            // Convert to bitmask
1827            let mask = _mm256_movemask_epi8(matches);
1828
1829            if mask != 0 {
1830                // Found at least one match, find the first one
1831                let bit_pos = mask.trailing_zeros() as usize;
1832                return Some(offset + bit_pos);
1833            }
1834
1835            offset += CHUNK_SIZE;
1836        }
1837
1838        // Handle remaining bytes with scalar search
1839        find_hash_scalar(&s[offset..]).map(|pos| offset + pos)
1840    }
1841
1842    /// Scalar fallback implementation for finding '#' character.
1843    #[inline]
1844    fn find_hash_scalar(s: &[u8]) -> Option<usize> {
1845        s.iter().position(|&b| b == b'#')
1846    }
1847
1848    #[cfg(test)]
1849    mod tests {
1850        use super::*;
1851
1852        #[test]
1853        fn test_find_hash_basic() {
1854            assert_eq!(find_hash_simd(b"hello # world"), Some(6));
1855            assert_eq!(find_hash_simd(b"no comment"), None);
1856            assert_eq!(find_hash_simd(b"#start"), Some(0));
1857            assert_eq!(find_hash_simd(b"end#"), Some(3));
1858        }
1859
1860        #[test]
1861        fn test_find_hash_long() {
1862            // Test with string longer than SIMD chunk size
1863            let long = b"a".repeat(100);
1864            assert_eq!(find_hash_simd(&long), None);
1865
1866            let mut with_hash = b"a".repeat(50);
1867            with_hash.push(b'#');
1868            with_hash.extend_from_slice(&b"a".repeat(50));
1869            assert_eq!(find_hash_simd(&with_hash), Some(50));
1870        }
1871
1872        #[test]
1873        fn test_find_hash_edge_cases() {
1874            assert_eq!(find_hash_simd(b""), None);
1875            assert_eq!(find_hash_simd(b"#"), Some(0));
1876            assert_eq!(find_hash_simd(b"##"), Some(0));
1877        }
1878
1879        #[test]
1880        fn test_find_hash_alignment() {
1881            // Test various alignment scenarios
1882            for offset in 0..32 {
1883                let mut data = vec![b'a'; offset];
1884                data.push(b'#');
1885                data.extend_from_slice(&[b'b'; 32]);
1886                assert_eq!(find_hash_simd(&data), Some(offset));
1887            }
1888        }
1889
1890        #[test]
1891        fn test_find_hash_multiple() {
1892            assert_eq!(find_hash_simd(b"# # #"), Some(0));
1893            assert_eq!(find_hash_simd(b"a # # #"), Some(2));
1894        }
1895    }
1896}
1897
1898/// Strip inline comments from a line, respecting quoted strings and escapes.
1899///
1900/// Finds the first unquoted, unescaped '#' character and returns the string
1901/// up to that point. Uses SIMD acceleration when available.
1902///
1903/// # Examples
1904///
1905/// ```text
1906/// use hedl_stream::parser::strip_comment;
1907/// assert_eq!(strip_comment("hello # comment"), "hello");
1908/// assert_eq!(strip_comment(r#""hello # not comment""#), r#""hello # not comment""#);
1909/// assert_eq!(strip_comment("no comment"), "no comment");
1910/// ```
1911#[inline]
1912pub(crate) fn strip_comment(s: &str) -> &str {
1913    // Find # not inside quotes using SIMD-accelerated scanning
1914    let bytes = s.as_bytes();
1915    let mut in_quotes = false;
1916    let mut escape = false;
1917    let mut search_start = 0;
1918
1919    loop {
1920        // Use SIMD to find the next potential comment start
1921        let hash_pos = match simd_comment::find_hash_simd(&bytes[search_start..]) {
1922            Some(pos) => search_start + pos,
1923            None => return s, // No '#' found
1924        };
1925
1926        // Verify this '#' is not inside quotes or escaped
1927        // Scan from last search position to hash position
1928        for &c in &bytes[search_start..hash_pos] {
1929            if escape {
1930                escape = false;
1931                continue;
1932            }
1933
1934            match c {
1935                b'\\' => escape = true,
1936                b'"' => in_quotes = !in_quotes,
1937                _ => {}
1938            }
1939        }
1940
1941        // Check the '#' itself
1942        if escape {
1943            // '#' is escaped, continue searching
1944            escape = false;
1945            search_start = hash_pos + 1;
1946            continue;
1947        }
1948
1949        if !in_quotes {
1950            // Found unquoted, unescaped '#' - this is the comment start
1951            return s[..hash_pos].trim_end();
1952        }
1953
1954        // '#' is inside quotes, continue searching
1955        search_start = hash_pos + 1;
1956    }
1957}
1958
1959#[cfg(test)]
1960mod tests {
1961    use super::*;
1962    use std::io::Cursor;
1963
1964    // ============ HEADER PARSING TESTS ============
1965
1966    #[test]
1967    fn test_parse_header() {
1968        let input = r#"
1969%VERSION: 1.0
1970%STRUCT: User: [id, name, email]
1971%ALIAS active = "Active"
1972%NEST: User > Order
1973---
1974"#;
1975        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1976        let header = parser.header().unwrap();
1977
1978        assert_eq!(header.version, (1, 0));
1979        assert_eq!(
1980            header.structs.get("User"),
1981            Some(&vec![
1982                "id".to_string(),
1983                "name".to_string(),
1984                "email".to_string()
1985            ])
1986        );
1987        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
1988        assert_eq!(header.nests.get("User"), Some(&"Order".to_string()));
1989    }
1990
1991    #[test]
1992    fn test_header_missing_version() {
1993        let input = r"
1994%STRUCT: User: [id, name]
1995---
1996";
1997        let result = StreamingParser::new(Cursor::new(input));
1998        assert!(matches!(result, Err(StreamError::MissingVersion)));
1999    }
2000
2001    #[test]
2002    fn test_header_invalid_version_format() {
2003        let input = r"
2004%VERSION abc
2005---
2006";
2007        let result = StreamingParser::new(Cursor::new(input));
2008        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
2009    }
2010
2011    #[test]
2012    fn test_header_version_single_number() {
2013        let input = r"
2014%VERSION 1
2015---
2016";
2017        let result = StreamingParser::new(Cursor::new(input));
2018        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
2019    }
2020
2021    #[test]
2022    fn test_header_multiple_schemas() {
2023        let input = r"
2024%VERSION: 1.0
2025%STRUCT: User: [id, name]
2026%STRUCT: Product: [id, title, price]
2027%STRUCT: Order: [id, user_id, product_id]
2028---
2029";
2030        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2031        let header = parser.header().unwrap();
2032
2033        assert_eq!(header.structs.len(), 3);
2034        assert!(header.structs.contains_key("User"));
2035        assert!(header.structs.contains_key("Product"));
2036        assert!(header.structs.contains_key("Order"));
2037    }
2038
2039    #[test]
2040    fn test_header_struct_missing_bracket() {
2041        let input = r"
2042%VERSION: 1.0
2043%STRUCT User id, name
2044---
2045";
2046        let result = StreamingParser::new(Cursor::new(input));
2047        assert!(matches!(result, Err(StreamError::Syntax { .. })));
2048    }
2049
2050    #[test]
2051    fn test_header_empty_struct() {
2052        let input = r"
2053%VERSION: 1.0
2054%STRUCT: User: []
2055---
2056";
2057        let result = StreamingParser::new(Cursor::new(input));
2058        assert!(matches!(result, Err(StreamError::Syntax { .. })));
2059    }
2060
2061    #[test]
2062    fn test_header_alias_missing_equals() {
2063        let input = r#"
2064%VERSION: 1.0
2065%ALIAS foo "bar"
2066---
2067"#;
2068        let result = StreamingParser::new(Cursor::new(input));
2069        assert!(matches!(result, Err(StreamError::Syntax { .. })));
2070    }
2071
2072    #[test]
2073    fn test_header_nest_missing_arrow() {
2074        let input = r"
2075%VERSION: 1.0
2076%NEST Parent Child
2077---
2078";
2079        let result = StreamingParser::new(Cursor::new(input));
2080        assert!(matches!(result, Err(StreamError::Syntax { .. })));
2081    }
2082
2083    #[test]
2084    fn test_header_with_comments() {
2085        let input = r"
2086%VERSION: 1.0
2087# This is a comment
2088%STRUCT: User: [id, name] # inline comment
2089---
2090";
2091        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2092        let header = parser.header().unwrap();
2093        assert!(header.structs.contains_key("User"));
2094    }
2095
2096    #[test]
2097    fn test_header_blank_lines() {
2098        let input = r"
2099%VERSION: 1.0
2100
2101%STRUCT: User: [id, name]
2102
2103---
2104";
2105        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2106        let header = parser.header().unwrap();
2107        assert!(header.structs.contains_key("User"));
2108    }
2109
2110    // ============ BASIC STREAMING TESTS ============
2111
2112    #[test]
2113    fn test_streaming_nodes() {
2114        let input = r"
2115%VERSION: 1.0
2116%STRUCT: User: [id, name]
2117---
2118users: @User
2119  | alice, Alice Smith
2120  | bob, Bob Jones
2121";
2122        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2123
2124        let events: Vec<_> = parser.collect();
2125        for event in &events {
2126            if let Err(e) = event {
2127                eprintln!("Error: {e:?}");
2128            }
2129        }
2130        assert!(events.iter().all(std::result::Result::is_ok));
2131
2132        let nodes: Vec<_> = events
2133            .iter()
2134            .filter_map(|e| e.as_ref().ok())
2135            .filter_map(|e| e.as_node())
2136            .collect();
2137
2138        assert_eq!(nodes.len(), 2);
2139        assert_eq!(nodes[0].id, "alice");
2140        assert_eq!(nodes[1].id, "bob");
2141    }
2142
2143    #[test]
2144    fn test_streaming_empty_body() {
2145        let input = r"
2146%VERSION: 1.0
2147%STRUCT: User: [id, name]
2148---
2149";
2150        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2151        let events: Vec<_> = parser.collect();
2152        assert!(events.is_empty());
2153    }
2154
2155    #[test]
2156    fn test_streaming_list_start_end_events() {
2157        let input = r"
2158%VERSION: 1.0
2159%STRUCT: User: [id, name]
2160---
2161users: @User
2162  | alice, Alice
2163";
2164        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2165        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2166
2167        let list_starts: Vec<_> = events
2168            .iter()
2169            .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
2170            .collect();
2171        let list_ends: Vec<_> = events
2172            .iter()
2173            .filter(|e| matches!(e, NodeEvent::ListEnd { .. }))
2174            .collect();
2175
2176        assert_eq!(list_starts.len(), 1);
2177        assert_eq!(list_ends.len(), 1);
2178
2179        if let NodeEvent::ListStart { key, type_name, .. } = &list_starts[0] {
2180            assert_eq!(key, "users");
2181            assert_eq!(type_name, "User");
2182        }
2183
2184        if let NodeEvent::ListEnd {
2185            type_name, count, ..
2186        } = &list_ends[0]
2187        {
2188            assert_eq!(type_name, "User");
2189            assert_eq!(*count, 1);
2190        }
2191    }
2192
2193    // ============ MATRIX ROW EDGE CASES ============
2194
2195    #[test]
2196    fn test_matrix_row_empty_fields() {
2197        // Note: Empty fields are NOT preserved in the current pipe-splitting logic
2198        // When the field is truly empty (just whitespace between pipes), it's filtered
2199        // Use ~ (tilde) for explicit null values
2200        let input = r"
2201%VERSION: 1.0
2202%STRUCT: Data: [id, optional, required]
2203---
2204data: @Data
2205  | row1, ~, value
2206";
2207        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2208        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2209        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2210
2211        assert_eq!(nodes.len(), 1);
2212        assert_eq!(nodes[0].id, "row1");
2213        assert_eq!(nodes[0].fields[1], Value::Null);
2214    }
2215
2216    #[test]
2217    fn test_matrix_row_quoted_fields() {
2218        let input = r#"
2219%VERSION: 1.0
2220%STRUCT: Data: [id, description]
2221---
2222data: @Data
2223  | row1, "Hello, World"
2224"#;
2225        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2226        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2227        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2228
2229        assert_eq!(nodes.len(), 1);
2230        assert_eq!(
2231            nodes[0].fields[1],
2232            Value::String("Hello, World".to_string().into())
2233        );
2234    }
2235
2236    #[test]
2237    fn test_matrix_row_shape_mismatch() {
2238        let input = r"
2239%VERSION: 1.0
2240%STRUCT: User: [id, name, email]
2241---
2242users: @User
2243  | alice, Alice
2244";
2245        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2246        let events: Vec<_> = parser.collect();
2247        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2248
2249        assert!(!errors.is_empty());
2250        if let Err(StreamError::ShapeMismatch { expected, got, .. }) = &errors[0] {
2251            assert_eq!(*expected, 3);
2252            assert_eq!(*got, 2);
2253        }
2254    }
2255
2256    #[test]
2257    fn test_matrix_row_references() {
2258        let input = r"
2259%VERSION: 1.0
2260%STRUCT: Order: [id, user]
2261---
2262orders: @Order
2263  | order1, @User:alice
2264  | order2, @bob
2265";
2266        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2267        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2268        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2269
2270        assert_eq!(nodes.len(), 2);
2271
2272        if let Value::Reference(r) = &nodes[0].fields[1] {
2273            assert_eq!(r.type_name.as_deref(), Some("User"));
2274            assert_eq!(&*r.id, "alice");
2275        } else {
2276            panic!("Expected reference");
2277        }
2278
2279        if let Value::Reference(r) = &nodes[1].fields[1] {
2280            assert_eq!(r.type_name, None);
2281            assert_eq!(&*r.id, "bob");
2282        } else {
2283            panic!("Expected reference");
2284        }
2285    }
2286
2287    #[test]
2288    fn test_matrix_row_booleans() {
2289        let input = r"
2290%VERSION: 1.0
2291%STRUCT: Flag: [id, active, verified]
2292---
2293flags: @Flag
2294  | flag1, true, false
2295";
2296        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2297        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2298        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2299
2300        assert_eq!(nodes.len(), 1);
2301        assert_eq!(nodes[0].fields[1], Value::Bool(true));
2302        assert_eq!(nodes[0].fields[2], Value::Bool(false));
2303    }
2304
2305    #[test]
2306    fn test_matrix_row_numbers() {
2307        let input = r"
2308%VERSION: 1.0
2309%STRUCT: Data: [id, int_val, float_val]
2310---
2311data: @Data
2312  | row1, 42, 3.5
2313  | row2, -100, -2.5
2314";
2315        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2316        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2317        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2318
2319        assert_eq!(nodes.len(), 2);
2320        assert_eq!(nodes[0].fields[1], Value::Int(42));
2321        assert_eq!(nodes[0].fields[2], Value::Float(3.5));
2322        assert_eq!(nodes[1].fields[1], Value::Int(-100));
2323        assert_eq!(nodes[1].fields[2], Value::Float(-2.5));
2324    }
2325
2326    #[test]
2327    fn test_matrix_row_null() {
2328        let input = r"
2329%VERSION: 1.0
2330%STRUCT: Data: [id, nullable]
2331---
2332data: @Data
2333  | row1, ~
2334";
2335        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2336        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2337        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2338
2339        assert_eq!(nodes.len(), 1);
2340        assert_eq!(nodes[0].fields[1], Value::Null);
2341    }
2342
2343    #[test]
2344    fn test_matrix_row_ditto() {
2345        let input = r"
2346%VERSION: 1.0
2347%STRUCT: Data: [id, category]
2348---
2349data: @Data
2350  | row1, CategoryA
2351  | row2, ^
2352  | row3, ^
2353";
2354        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2355        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2356        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2357
2358        assert_eq!(nodes.len(), 3);
2359        assert_eq!(
2360            nodes[0].fields[1],
2361            Value::String("CategoryA".to_string().into())
2362        );
2363        assert_eq!(
2364            nodes[1].fields[1],
2365            Value::String("CategoryA".to_string().into())
2366        );
2367        assert_eq!(
2368            nodes[2].fields[1],
2369            Value::String("CategoryA".to_string().into())
2370        );
2371    }
2372
2373    #[test]
2374    fn test_matrix_row_alias_substitution() {
2375        let input = r#"
2376%VERSION: 1.0
2377%ALIAS status = "Active"
2378%STRUCT: User: [id, status]
2379---
2380users: @User
2381  | alice, $status
2382"#;
2383        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2384        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2385        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2386
2387        assert_eq!(nodes.len(), 1);
2388        assert_eq!(
2389            nodes[0].fields[1],
2390            Value::String("Active".to_string().into())
2391        );
2392    }
2393
2394    // ============ INLINE SCHEMA TESTS ============
2395
2396    #[test]
2397    fn test_inline_schema() {
2398        let input = r"
2399%VERSION: 1.0
2400---
2401items: @Item[id, name]
2402  | item1, First
2403  | item2, Second
2404";
2405        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2406        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2407        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2408
2409        assert_eq!(nodes.len(), 2);
2410        assert_eq!(nodes[0].type_name, "Item");
2411    }
2412
2413    #[test]
2414    fn test_inline_schema_mismatch_declared_produces_error() {
2415        let input = r"
2416%VERSION: 1.0
2417%STRUCT: Item: [id, name, extra]
2418---
2419items: @Item[id, name]
2420  | item1, First
2421";
2422        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2423        let events: Vec<_> = parser.collect::<Vec<_>>();
2424
2425        // Should produce error when inline schema doesn't match declared schema
2426        let has_schema_error = events.iter().any(|e| {
2427            if let Err(e) = e {
2428                matches!(e, StreamError::Schema { .. })
2429            } else {
2430                false
2431            }
2432        });
2433        assert!(has_schema_error, "expected schema mismatch error");
2434    }
2435
2436    #[test]
2437    fn test_inline_schema_matches_declared_works() {
2438        let input = r"
2439%VERSION: 1.0
2440%STRUCT: Item: [id, name]
2441---
2442items: @Item[id, name]
2443  | item1, First
2444";
2445        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2446        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2447        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2448
2449        // Should work when inline schema matches declared schema
2450        assert_eq!(nodes.len(), 1);
2451        assert_eq!(nodes[0].fields.len(), 2);
2452    }
2453
2454    #[test]
2455    fn test_inline_schema_invalid_column_name() {
2456        let input = r"
2457%VERSION: 1.0
2458---
2459items: @Item[id, Invalid-Name, value]
2460  | item1, x, y
2461";
2462        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2463        let events: Vec<_> = parser.collect::<Vec<_>>();
2464
2465        // Should produce syntax error for invalid column name (hyphen not allowed)
2466        let has_syntax_error = events.iter().any(|e| {
2467            if let Err(e) = e {
2468                matches!(e, StreamError::Syntax { .. })
2469                    && format!("{e}").contains("invalid column name")
2470            } else {
2471                false
2472            }
2473        });
2474        assert!(has_syntax_error, "expected invalid column name error");
2475    }
2476
2477    #[test]
2478    fn test_inline_schema_empty() {
2479        let input = r"
2480%VERSION: 1.0
2481---
2482items: @Item[]
2483  | item1
2484";
2485        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2486        let events: Vec<_> = parser.collect::<Vec<_>>();
2487
2488        // Should produce syntax error for empty schema
2489        let has_syntax_error = events.iter().any(|e| {
2490            if let Err(e) = e {
2491                matches!(e, StreamError::Syntax { .. })
2492                    && format!("{e}").contains("empty inline schema")
2493            } else {
2494                false
2495            }
2496        });
2497        assert!(has_syntax_error, "expected empty inline schema error");
2498    }
2499
2500    #[test]
2501    fn test_inline_schema_column_with_leading_digit() {
2502        let input = r"
2503%VERSION: 1.0
2504---
2505items: @Item[id, 123col]
2506  | item1, x
2507";
2508        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2509        let events: Vec<_> = parser.collect::<Vec<_>>();
2510
2511        // Should produce error for column name starting with digit
2512        let has_syntax_error = events.iter().any(|e| {
2513            if let Err(e) = e {
2514                matches!(e, StreamError::Syntax { .. })
2515                    && format!("{e}").contains("invalid column name")
2516            } else {
2517                false
2518            }
2519        });
2520        assert!(
2521            has_syntax_error,
2522            "expected invalid column name error for leading digit"
2523        );
2524    }
2525
2526    // ============ OBJECT CONTEXT TESTS ============
2527
2528    #[test]
2529    fn test_object_context() {
2530        let input = r"
2531%VERSION: 1.0
2532%STRUCT: User: [id, name]
2533---
2534db:
2535  users: @User
2536    | alice, Alice
2537";
2538        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2539        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2540
2541        let obj_starts: Vec<_> = events
2542            .iter()
2543            .filter(|e| matches!(e, NodeEvent::ObjectStart { .. }))
2544            .collect();
2545        assert_eq!(obj_starts.len(), 1);
2546
2547        if let NodeEvent::ObjectStart { key, .. } = obj_starts[0] {
2548            assert_eq!(key, "db");
2549        }
2550    }
2551
2552    #[test]
2553    fn test_scalar_value() {
2554        let input = r#"
2555%VERSION: 1.0
2556---
2557config:
2558  timeout: 30
2559  enabled: true
2560  name: "Test Config"
2561"#;
2562        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2563        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2564
2565        let scalars: Vec<_> = events
2566            .iter()
2567            .filter(|e| matches!(e, NodeEvent::Scalar { .. }))
2568            .collect();
2569        assert_eq!(scalars.len(), 3);
2570    }
2571
2572    // ============ UNICODE TESTS ============
2573
2574    #[test]
2575    fn test_unicode_ids() {
2576        let input = r"
2577%VERSION: 1.0
2578%STRUCT: User: [id, name]
2579---
2580users: @User
2581  | 用户1, 张三
2582  | пользователь, Иван
2583";
2584        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2585        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2586        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2587
2588        assert_eq!(nodes.len(), 2);
2589        assert_eq!(nodes[0].id, "用户1");
2590        assert_eq!(nodes[1].id, "пользователь");
2591    }
2592
2593    #[test]
2594    fn test_unicode_in_values() {
2595        let input = r"
2596%VERSION: 1.0
2597%STRUCT: Data: [id, emoji]
2598---
2599data: @Data
2600  | row1, 🎉✨🚀
2601";
2602        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2603        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2604        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2605
2606        assert_eq!(nodes.len(), 1);
2607        assert_eq!(
2608            nodes[0].fields[1],
2609            Value::String("🎉✨🚀".to_string().into())
2610        );
2611    }
2612
2613    // ============ COMMENT HANDLING TESTS ============
2614
2615    #[test]
2616    fn test_inline_comments() {
2617        let input = r"
2618%VERSION: 1.0
2619%STRUCT: User: [id, name]
2620---
2621users: @User  # list of users
2622  | alice, Alice Smith  # first user
2623  | bob, Bob Jones
2624";
2625        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2626        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2627        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2628
2629        assert_eq!(nodes.len(), 2);
2630        // Comments should be stripped
2631        assert_eq!(nodes[0].id, "alice");
2632    }
2633
2634    #[test]
2635    fn test_full_line_comments() {
2636        let input = r"
2637%VERSION: 1.0
2638%STRUCT: User: [id, name]
2639---
2640# This is a comment
2641users: @User
2642  # Comment between rows
2643  | alice, Alice
2644  | bob, Bob
2645";
2646        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2647        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2648        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2649
2650        assert_eq!(nodes.len(), 2);
2651    }
2652
2653    #[test]
2654    fn test_hash_in_quoted_string() {
2655        let input =
2656            "%VERSION: 1.0\n%STRUCT: Data: [id, tag]\n---\ndata: @Data\n  | row1, \"#hashtag\"\n";
2657        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2658        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2659        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2660
2661        assert_eq!(nodes.len(), 1);
2662        assert_eq!(
2663            nodes[0].fields[1],
2664            Value::String("#hashtag".to_string().into())
2665        );
2666    }
2667
2668    // ============ INDENT AND CONTEXT TESTS ============
2669
2670    #[test]
2671    fn test_multiple_lists() {
2672        let input = r"
2673%VERSION: 1.0
2674%STRUCT: User: [id, name]
2675%STRUCT: Product: [id, title]
2676---
2677users: @User
2678  | alice, Alice
2679products: @Product
2680  | prod1, Widget
2681";
2682        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2683        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2684        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2685
2686        assert_eq!(nodes.len(), 2);
2687        assert_eq!(nodes[0].type_name, "User");
2688        assert_eq!(nodes[1].type_name, "Product");
2689    }
2690
2691    #[test]
2692    fn test_excessive_indent_error() {
2693        let config = StreamingParserConfig {
2694            max_indent_depth: 2,
2695            ..Default::default()
2696        };
2697        let input = r"
2698%VERSION: 1.0
2699%STRUCT: Data: [id]
2700---
2701level1:
2702  level2:
2703    level3:
2704      data: @Data
2705        | row1
2706";
2707        let parser = StreamingParser::with_config(Cursor::new(input), config).unwrap();
2708
2709        // Should get an error for excessive indent
2710        let mut found_indent_error = false;
2711        for result in parser {
2712            if let Err(StreamError::Syntax { message, .. }) = result {
2713                if message.contains("indent depth") {
2714                    found_indent_error = true;
2715                    break;
2716                }
2717            }
2718        }
2719        assert!(found_indent_error);
2720    }
2721
2722    // ============ ERROR HANDLING TESTS ============
2723
2724    #[test]
2725    fn test_undefined_schema() {
2726        let input = r"
2727%VERSION: 1.0
2728---
2729users: @User
2730  | alice, Alice
2731";
2732        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2733        let events: Vec<_> = parser.collect();
2734        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2735
2736        // Should get an error because User schema is not defined
2737        assert!(!errors.is_empty());
2738    }
2739
2740    #[test]
2741    fn test_orphan_row_without_context() {
2742        let input = r"
2743%VERSION: 1.0
2744%STRUCT: Data: [id]
2745---
2746| orphan_row
2747";
2748        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2749        let events: Vec<_> = parser.collect();
2750        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2751
2752        assert!(!errors.is_empty());
2753    }
2754
2755    #[test]
2756    fn test_missing_colon_error() {
2757        let input = r"
2758%VERSION: 1.0
2759---
2760invalid line without colon
2761";
2762        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2763        let events: Vec<_> = parser.collect();
2764        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2765
2766        assert!(!errors.is_empty());
2767        if let Err(StreamError::Syntax { message, .. }) = &errors[0] {
2768            assert!(message.contains(':'));
2769        }
2770    }
2771
2772    // ============ LARGE FILE SIMULATION ============
2773
2774    #[test]
2775    fn test_many_rows() {
2776        let mut input = String::from(
2777            r"
2778%VERSION: 1.0
2779%STRUCT: Data: [id, value]
2780---
2781data: @Data
2782",
2783        );
2784        for i in 0..1000 {
2785            input.push_str(&format!("  | row{i}, value{i}\n"));
2786        }
2787
2788        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2789        let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2790        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2791
2792        assert_eq!(nodes.len(), 1000);
2793    }
2794
2795    // ============ STRIP COMMENT HELPER TESTS ============
2796
2797    #[test]
2798    fn test_strip_comment_basic() {
2799        assert_eq!(strip_comment("hello # comment"), "hello");
2800    }
2801
2802    #[test]
2803    fn test_strip_comment_quoted() {
2804        assert_eq!(
2805            strip_comment(r#""hello # not comment""#),
2806            r#""hello # not comment""#
2807        );
2808    }
2809
2810    #[test]
2811    fn test_strip_comment_escaped() {
2812        // Backslash escapes the hash, so \# is not treated as comment start
2813        assert_eq!(
2814            strip_comment(r"hello\# not a comment"),
2815            r"hello\# not a comment"
2816        );
2817        // But a later unescaped hash still starts a comment
2818        assert_eq!(
2819            strip_comment(r"hello\# still here # comment"),
2820            r"hello\# still here"
2821        );
2822    }
2823
2824    #[test]
2825    fn test_strip_comment_escaped_in_quotes() {
2826        // Inside quotes, backslash-hash is preserved
2827        assert_eq!(
2828            strip_comment(r#""hello\#world" more"#),
2829            r#""hello\#world" more"#
2830        );
2831    }
2832
2833    #[test]
2834    fn test_strip_comment_no_comment() {
2835        assert_eq!(strip_comment("hello world"), "hello world");
2836    }
2837}