hedl_stream/
parser.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Streaming parser implementation.
19//!
20//! This module provides the core streaming parser for HEDL documents. The parser
21//! processes HEDL files incrementally, yielding events as they are encountered,
22//! making it suitable for processing files that are too large to fit in memory.
23//!
24//! # Design Philosophy
25//!
26//! - **Memory Efficiency**: Only the current line and parsing state are kept in memory
27//! - **Iterator-Based**: Standard Rust iterator interface for easy composition
28//! - **Error Recovery**: Clear error messages with line numbers for debugging
29//! - **Safety**: Built-in timeout protection against malicious/untrusted input
30//!
31//! # Basic Usage
32//!
33//! ```rust
34//! use hedl_stream::{StreamingParser, NodeEvent};
35//! use std::io::Cursor;
36//!
37//! let input = r#"
38//! %VERSION: 1.0
39//! %STRUCT: User: [id, name, email]
40//! ---
41//! users: @User
42//!   | alice, Alice Smith, alice@example.com
43//!   | bob, Bob Jones, bob@example.com
44//! "#;
45//!
46//! let parser = StreamingParser::new(Cursor::new(input)).unwrap();
47//!
48//! for event in parser {
49//!     match event {
50//!         Ok(NodeEvent::Node(node)) => {
51//!             println!("Found {}: {}", node.type_name, node.id);
52//!         }
53//!         Err(e) => eprintln!("Parse error: {}", e),
54//!         _ => {}
55//!     }
56//! }
57//! ```
58
59use crate::error::{StreamError, StreamResult};
60use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
61use crate::reader::LineReader;
62use hedl_core::Value;
63use hedl_core::lex::{calculate_indent, is_valid_key_token, is_valid_type_name};
64use std::io::Read;
65use std::time::{Duration, Instant};
66
/// Type alias for a list-context lookup result.
///
/// Tuple layout: `(type_name, schema, last_node)`, where `last_node` is an
/// optional `(type, id)` pair mirroring `Context::List::last_node`.
/// NOTE(review): not referenced in the visible portion of this file —
/// presumably consumed further down; confirm before removing.
type ListContextResult = (String, Vec<String>, Option<(String, String)>);
69
70/// Configuration options for the streaming parser.
71///
72/// Controls memory limits, buffer sizes, and timeout behavior.
73///
74/// # Examples
75///
76/// ## Default Configuration
77///
78/// ```rust
79/// use hedl_stream::StreamingParserConfig;
80///
81/// let config = StreamingParserConfig::default();
82/// assert_eq!(config.max_line_length, 1_000_000);
83/// assert_eq!(config.max_indent_depth, 100);
84/// assert_eq!(config.buffer_size, 64 * 1024);
85/// assert_eq!(config.timeout, None);
86/// ```
87///
88/// ## Custom Configuration for Large Files
89///
90/// ```rust
91/// use hedl_stream::StreamingParserConfig;
92///
93/// let config = StreamingParserConfig {
94///     max_line_length: 10_000_000,  // 10MB lines
95///     max_indent_depth: 1000,        // Deep nesting
96///     buffer_size: 256 * 1024,       // 256KB buffer
97///     timeout: None,                 // No timeout
98/// };
99/// ```
100///
101/// ## Configuration for Untrusted Input
102///
103/// ```rust
104/// use hedl_stream::StreamingParserConfig;
105/// use std::time::Duration;
106///
107/// let config = StreamingParserConfig {
108///     max_line_length: 100_000,           // Limit line length
109///     max_indent_depth: 50,                // Limit nesting
110///     buffer_size: 32 * 1024,              // Smaller buffer
111///     timeout: Some(Duration::from_secs(10)), // 10 second timeout
112/// };
113/// ```
#[derive(Debug, Clone)]
pub struct StreamingParserConfig {
    /// Maximum line length in bytes.
    ///
    /// Lines exceeding this length will cause a parsing error. This protects against
    /// malformed input with extremely long lines that could exhaust memory.
    ///
    /// Default: 1,000,000 bytes (1MB)
    pub max_line_length: usize,

    /// Maximum indentation depth.
    ///
    /// Indentation levels exceeding this depth will cause a parsing error. This
    /// protects against deeply nested structures that could cause stack overflow
    /// or performance issues.
    ///
    /// The limit is compared against the computed indent *level* (as returned
    /// by `calculate_indent`), not the raw number of leading spaces.
    ///
    /// Default: 100 levels
    pub max_indent_depth: usize,

    /// Buffer size for reading input.
    ///
    /// Larger buffers can improve performance for large files by reducing the
    /// number of system calls, but use more memory.
    ///
    /// Default: 64KB
    pub buffer_size: usize,

    /// Timeout for parsing operations.
    ///
    /// If set, the parser will return a `StreamError::Timeout` if parsing takes
    /// longer than the specified duration. This protects against infinite loops
    /// from malicious or malformed input.
    ///
    /// Set to `None` to disable timeout checking (default for trusted input).
    ///
    /// Default: None (no timeout)
    ///
    /// # Performance Note
    ///
    /// Timeout checking is performed periodically (every 100 operations) to minimize
    /// overhead. For very fast parsing, the actual timeout may slightly exceed the
    /// configured limit.
    pub timeout: Option<Duration>,
}
158
159impl Default for StreamingParserConfig {
160    fn default() -> Self {
161        Self {
162            max_line_length: 1_000_000,
163            max_indent_depth: 100,
164            buffer_size: 64 * 1024,
165            timeout: None, // No timeout by default for backward compatibility
166        }
167    }
168}
169
170/// Streaming HEDL parser.
171///
172/// Processes HEDL documents incrementally, yielding `NodeEvent` items as they
173/// are parsed without loading the entire document into memory. This makes it
174/// suitable for processing multi-gigabyte files on systems with limited RAM.
175///
176/// # Memory Characteristics
177///
178/// - **Header**: Parsed once at initialization and kept in memory
179/// - **Per-Line**: Only current line and parsing context (stack depth proportional to nesting)
180/// - **No Buffering**: Nodes are yielded immediately after parsing
181///
182/// # When to Use
183///
184/// - **Large Files**: Files too large to fit comfortably in memory
185/// - **Streaming Workflows**: Processing data as it arrives (pipes, network streams)
186/// - **Memory-Constrained**: Embedded systems or containers with memory limits
187/// - **ETL Pipelines**: Extract-transform-load workflows with HEDL data
188///
189/// # Iterator Interface
190///
191/// `StreamingParser` implements `Iterator<Item = StreamResult<NodeEvent>>`, allowing
192/// use with standard iterator methods like `filter`, `map`, `collect`, etc.
193///
194/// # Examples
195///
196/// ## Basic Streaming Parse
197///
198/// ```rust
199/// use hedl_stream::{StreamingParser, NodeEvent};
200/// use std::fs::File;
201/// use std::io::BufReader;
202///
203/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
204/// # use std::io::Cursor;
205/// # let file = Cursor::new(r#"
206/// # %VERSION: 1.0
207/// # %STRUCT: User: [id, name]
208/// # ---
209/// # users: @User
210/// #   | alice, Alice
211/// # "#);
212/// let reader = BufReader::new(file);
213/// let parser = StreamingParser::new(reader)?;
214///
215/// for event in parser {
216///     match event? {
217///         NodeEvent::Node(node) => {
218///             println!("Processing {}: {}", node.type_name, node.id);
219///             // Process node immediately, no buffering
220///         }
221///         NodeEvent::ListStart { type_name, .. } => {
222///             println!("Starting list of {}", type_name);
223///         }
224///         NodeEvent::ListEnd { count, .. } => {
225///             println!("Finished list with {} items", count);
226///         }
227///         _ => {}
228///     }
229/// }
230/// # Ok(())
231/// # }
232/// ```
233///
234/// ## Filtering During Parse
235///
236/// ```rust
237/// use hedl_stream::{StreamingParser, NodeEvent};
238/// use std::io::Cursor;
239///
240/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
241/// let input = r#"
242/// %VERSION: 1.0
243/// %STRUCT: User: [id, name, active]
244/// ---
245/// users: @User
246///   | alice, Alice, true
247///   | bob, Bob, false
248///   | carol, Carol, true
249/// "#;
250///
251/// let parser = StreamingParser::new(Cursor::new(input))?;
252///
253/// // Only collect active users
254/// let active_users: Vec<_> = parser
255///     .filter_map(|event| event.ok())
256///     .filter_map(|event| {
257///         if let NodeEvent::Node(node) = event {
258///             Some(node)
259///         } else {
260///             None
261///         }
262///     })
263///     .filter(|node| {
264///         // Check if 'active' field (index 2) is true
265///         matches!(node.get_field(2), Some(hedl_core::Value::Bool(true)))
266///     })
267///     .collect();
268///
269/// assert_eq!(active_users.len(), 2); // alice and carol
270/// # Ok(())
271/// # }
272/// ```
273///
274/// ## With Timeout for Untrusted Input
275///
276/// ```rust
277/// use hedl_stream::{StreamingParser, StreamingParserConfig, StreamError};
278/// use std::time::Duration;
279/// use std::io::Cursor;
280///
281/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
282/// let untrusted_input = "..."; // Input from external source
283///
284/// let config = StreamingParserConfig {
285///     timeout: Some(Duration::from_secs(5)),
286///     ..Default::default()
287/// };
288///
289/// let parser = StreamingParser::with_config(
290///     Cursor::new(untrusted_input),
291///     config
292/// )?;
293///
294/// for event in parser {
295///     match event {
296///         Ok(event) => {
297///             // Process event
298///         }
299///         Err(StreamError::Timeout { elapsed, limit }) => {
300///             eprintln!("Parsing timed out after {:?}", elapsed);
301///             break;
302///         }
303///         Err(e) => {
304///             eprintln!("Parse error: {}", e);
305///             break;
306///         }
307///     }
308/// }
309/// # Ok(())
310/// # }
311/// ```
312///
313/// ## Processing Nested Structures
314///
315/// ```rust
316/// use hedl_stream::{StreamingParser, NodeEvent};
317/// use std::io::Cursor;
318///
319/// # fn example() -> Result<(), Box<dyn std::error::Error>> {
320/// let input = r#"
321/// %VERSION: 1.0
322/// %STRUCT: User: [id, name]
323/// %STRUCT: Order: [id, amount]
324/// %NEST: User > Order
325/// ---
326/// users: @User
327///   | alice, Alice
328///     | order1, 100.00
329///     | order2, 50.00
330///   | bob, Bob
331///     | order3, 75.00
332/// "#;
333///
334/// let parser = StreamingParser::new(Cursor::new(input))?;
335///
336/// for event in parser.filter_map(|e| e.ok()) {
337///     if let NodeEvent::Node(node) = event {
338///         if node.is_nested() {
339///             println!("  Child: {} belongs to {:?}",
340///                 node.id, node.parent_id);
341///         } else {
342///             println!("Parent: {}", node.id);
343///         }
344///     }
345/// }
346/// # Ok(())
347/// # }
348/// ```
349///
350/// # Error Handling
351///
352/// Parsing errors include line numbers for easy debugging:
353///
354/// ```rust
355/// use hedl_stream::{StreamingParser, StreamError};
356/// use std::io::Cursor;
357///
358/// let bad_input = r#"
359/// %VERSION: 1.0
360/// ---
361/// invalid line without colon
362/// "#;
363///
364/// let parser = StreamingParser::new(Cursor::new(bad_input)).unwrap();
365///
366/// for event in parser {
367///     if let Err(e) = event {
368///         if let Some(line) = e.line() {
369///             eprintln!("Error at line {}: {}", line, e);
370///         }
371///     }
372/// }
373/// ```
pub struct StreamingParser<R: Read> {
    /// Buffered line reader with push-back support.
    reader: LineReader<R>,
    /// Limits and timeout settings supplied at construction.
    config: StreamingParserConfig,
    /// Header parsed eagerly by `with_config`; `Some` after successful construction.
    header: Option<HeaderInfo>,
    /// Mutable body-parsing state (context stack, previous row).
    state: ParserState,
    /// Set once end-of-input is reached; `next_event` then yields `None`.
    finished: bool,
    /// Construction time, used as the baseline for timeout checks.
    start_time: Instant,
    operations_count: usize, // Track operations for periodic timeout checks
}
383
/// Mutable state threaded through body parsing.
#[derive(Debug)]
struct ParserState {
    /// Stack of active contexts.
    /// Always holds `Context::Root` at the bottom (seeded in `with_config`).
    stack: Vec<Context>,
    /// Previous row values for ditto handling.
    prev_row: Option<Vec<Value>>,
}
391
/// A single level of the body-parsing context stack.
#[derive(Debug, Clone)]
enum Context {
    /// Bottom-of-stack sentinel; never popped (`pop_contexts` keeps it).
    Root,
    /// An object opened by a `key:` line with nothing after the colon.
    Object {
        #[allow(dead_code)]
        key: String,
        /// Indent level at which the object's key appeared; lines at or
        /// below this level close the object.
        indent: usize,
    },
    /// A typed list whose rows arrive as `|`-prefixed matrix lines.
    List {
        key: String,
        type_name: String,
        /// Column names for rows of this list — presumably the `%STRUCT`
        /// schema for `type_name`; confirm where the list is opened.
        schema: Vec<String>,
        /// Indent level of the list's rows; lines indented less close the list.
        row_indent: usize,
        /// Rows seen so far; reported in the `ListEnd` event.
        count: usize,
        last_node: Option<(String, String)>, // (type, id)
    },
}
409
410impl<R: Read> StreamingParser<R> {
411    /// Create a new streaming parser with default configuration.
412    ///
413    /// The parser immediately reads and validates the HEDL header (version and
414    /// schema directives). If the header is invalid, this function returns an error.
415    ///
416    /// # Parameters
417    ///
418    /// - `reader`: Any type implementing `Read` (files, network streams, buffers, etc.)
419    ///
420    /// # Returns
421    ///
422    /// - `Ok(parser)`: Parser ready to yield events
423    /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
424    ///
425    /// # Examples
426    ///
427    /// ## From a File
428    ///
429    /// ```rust,no_run
430    /// use hedl_stream::StreamingParser;
431    /// use std::fs::File;
432    /// use std::io::BufReader;
433    ///
434    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
435    /// let file = File::open("data.hedl")?;
436    /// let reader = BufReader::new(file);
437    /// let parser = StreamingParser::new(reader)?;
438    /// # Ok(())
439    /// # }
440    /// ```
441    ///
442    /// ## From a String
443    ///
444    /// ```rust
445    /// use hedl_stream::StreamingParser;
446    /// use std::io::Cursor;
447    ///
448    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
449    /// let data = r#"
450    /// %VERSION: 1.0
451    /// %STRUCT: User: [id, name]
452    /// ---
453    /// users: @User
454    ///   | alice, Alice
455    /// "#;
456    ///
457    /// let parser = StreamingParser::new(Cursor::new(data))?;
458    /// # Ok(())
459    /// # }
460    /// ```
461    ///
462    /// ## From Stdin
463    ///
464    /// ```rust,no_run
465    /// use hedl_stream::StreamingParser;
466    /// use std::io::stdin;
467    ///
468    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
469    /// let parser = StreamingParser::new(stdin().lock())?;
470    /// # Ok(())
471    /// # }
472    /// ```
473    ///
474    /// # Errors
475    ///
476    /// - `StreamError::MissingVersion`: No `%VERSION` directive found
477    /// - `StreamError::InvalidVersion`: Invalid version format
478    /// - `StreamError::Syntax`: Malformed header directive
479    /// - `StreamError::Io`: I/O error reading input
    pub fn new(reader: R) -> StreamResult<Self> {
        // Delegates to `with_config` with `StreamingParserConfig::default()`.
        Self::with_config(reader, StreamingParserConfig::default())
    }
483
484    /// Create a streaming parser with custom configuration.
485    ///
486    /// Use this when you need to control memory limits, buffer sizes, or enable
487    /// timeout protection for untrusted input.
488    ///
489    /// # Parameters
490    ///
491    /// - `reader`: Any type implementing `Read`
492    /// - `config`: Parser configuration options
493    ///
494    /// # Returns
495    ///
496    /// - `Ok(parser)`: Parser ready to yield events
497    /// - `Err(e)`: Configuration invalid or header parsing failed
498    ///
499    /// # Examples
500    ///
501    /// ## With Timeout Protection
502    ///
503    /// ```rust
504    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
505    /// use std::time::Duration;
506    /// use std::io::Cursor;
507    ///
508    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
509    /// let config = StreamingParserConfig {
510    ///     timeout: Some(Duration::from_secs(30)),
511    ///     ..Default::default()
512    /// };
513    ///
514    /// let untrusted_input = "...";
515    /// let parser = StreamingParser::with_config(
516    ///     Cursor::new(untrusted_input),
517    ///     config
518    /// )?;
519    /// # Ok(())
520    /// # }
521    /// ```
522    ///
523    /// ## For Large Files
524    ///
525    /// ```rust
526    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
527    /// use std::io::Cursor;
528    ///
529    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
530    /// let config = StreamingParserConfig {
531    ///     buffer_size: 256 * 1024,      // 256KB read buffer
532    ///     max_line_length: 10_000_000,  // 10MB max line
533    ///     max_indent_depth: 1000,       // Deep nesting allowed
534    ///     timeout: None,
535    /// };
536    ///
537    /// let parser = StreamingParser::with_config(
538    ///     Cursor::new("..."),
539    ///     config
540    /// )?;
541    /// # Ok(())
542    /// # }
543    /// ```
544    ///
545    /// ## For Constrained Environments
546    ///
547    /// ```rust
548    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
549    /// use std::time::Duration;
550    /// use std::io::Cursor;
551    ///
552    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
553    /// let config = StreamingParserConfig {
554    ///     buffer_size: 8 * 1024,        // Small 8KB buffer
555    ///     max_line_length: 100_000,     // 100KB max line
556    ///     max_indent_depth: 50,         // Limited nesting
557    ///     timeout: Some(Duration::from_secs(10)),
558    /// };
559    ///
560    /// let parser = StreamingParser::with_config(
561    ///     Cursor::new("..."),
562    ///     config
563    /// )?;
564    /// # Ok(())
565    /// # }
566    /// ```
567    ///
568    /// # Errors
569    ///
570    /// Same as [`new()`](Self::new), plus:
571    ///
572    /// - `StreamError::Timeout`: Header parsing exceeded configured timeout
573    pub fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
574        let mut parser = Self {
575            reader: LineReader::with_capacity(reader, config.buffer_size),
576            config,
577            header: None,
578            state: ParserState {
579                stack: vec![Context::Root],
580                prev_row: None,
581            },
582            finished: false,
583            start_time: Instant::now(),
584            operations_count: 0,
585        };
586
587        // Parse header immediately
588        parser.parse_header()?;
589
590        Ok(parser)
591    }
592
593    /// Check if timeout has been exceeded.
594    /// This is called periodically during parsing to prevent infinite loops.
595    #[inline]
596    fn check_timeout(&self) -> StreamResult<()> {
597        if let Some(timeout) = self.config.timeout {
598            let elapsed = self.start_time.elapsed();
599            if elapsed > timeout {
600                return Err(StreamError::Timeout { elapsed, limit: timeout });
601            }
602        }
603        Ok(())
604    }
605
606    /// Get the parsed header information.
607    ///
608    /// Returns header metadata including version, schema definitions, aliases,
609    /// and nesting rules. This is available immediately after parser creation.
610    ///
611    /// # Returns
612    ///
613    /// - `Some(&HeaderInfo)`: Header was successfully parsed
614    /// - `None`: Should never happen after successful parser creation
615    ///
616    /// # Examples
617    ///
618    /// ## Inspecting Schema Definitions
619    ///
620    /// ```rust
621    /// use hedl_stream::StreamingParser;
622    /// use std::io::Cursor;
623    ///
624    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
625    /// let input = r#"
626    /// %VERSION: 1.0
627    /// %STRUCT: User: [id, name, email]
628    /// %STRUCT: Order: [id, user_id, amount]
629    /// %ALIAS: active = "Active"
630    /// %NEST: User > Order
631    /// ---
632    /// "#;
633    ///
634    /// let parser = StreamingParser::new(Cursor::new(input))?;
635    /// let header = parser.header().unwrap();
636    ///
637    /// // Check version
638    /// assert_eq!(header.version, (1, 0));
639    ///
640    /// // Get schema
641    /// let user_schema = header.get_schema("User").unwrap();
642    /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
643    ///
644    /// // Check aliases
645    /// assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
646    ///
647    /// // Check nesting rules
648    /// assert_eq!(header.get_child_type("User"), Some(&"Order".to_string()));
649    /// # Ok(())
650    /// # }
651    /// ```
652    ///
653    /// ## Validating Before Processing
654    ///
655    /// ```rust
656    /// use hedl_stream::StreamingParser;
657    /// use std::io::Cursor;
658    ///
659    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
660    /// let input = r#"
661    /// %VERSION: 1.0
662    /// %STRUCT: User: [id, name]
663    /// ---
664    /// users: @User
665    ///   | alice, Alice
666    /// "#;
667    ///
668    /// let parser = StreamingParser::new(Cursor::new(input))?;
669    ///
670    /// // Validate we have the expected schema before processing
671    /// if let Some(header) = parser.header() {
672    ///     if header.version.0 != 1 {
673    ///         eprintln!("Warning: Unexpected major version");
674    ///     }
675    ///
676    ///     if !header.structs.contains_key("User") {
677    ///         return Err("Missing User schema".into());
678    ///     }
679    /// }
680    ///
681    /// // Proceed with parsing...
682    /// # Ok(())
683    /// # }
684    /// ```
    pub fn header(&self) -> Option<&HeaderInfo> {
        // Populated by `parse_header` during construction, so this is
        // `Some` for any successfully constructed parser.
        self.header.as_ref()
    }
688
689    /// Parse the header section.
690    fn parse_header(&mut self) -> StreamResult<()> {
691        let mut header = HeaderInfo::new();
692        let mut found_version = false;
693        let mut _found_separator = false;
694
695        while let Some((line_num, line)) = self.reader.next_line()? {
696            // Check timeout every iteration in header parsing
697            self.check_timeout()?;
698
699            let trimmed = line.trim();
700
701            // Skip blank lines and comments
702            if trimmed.is_empty() || trimmed.starts_with('#') {
703                continue;
704            }
705
706            // Check for separator
707            if trimmed == "---" {
708                _found_separator = true;
709                break;
710            }
711
712            // Parse directives
713            if trimmed.starts_with('%') {
714                self.parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
715            } else {
716                // Not a directive - might be body content without separator
717                self.reader.push_back(line_num, line);
718                break;
719            }
720        }
721
722        if !found_version {
723            return Err(StreamError::MissingVersion);
724        }
725
726        self.header = Some(header);
727        Ok(())
728    }
729
730    fn parse_directive(
731        &self,
732        line: &str,
733        line_num: usize,
734        header: &mut HeaderInfo,
735        found_version: &mut bool,
736    ) -> StreamResult<()> {
737        if line.starts_with("%VERSION") {
738            self.parse_version_directive(line, header, found_version)
739        } else if line.starts_with("%STRUCT") {
740            self.parse_struct_directive(line, line_num, header)
741        } else if line.starts_with("%ALIAS") {
742            self.parse_alias_directive(line, line_num, header)
743        } else if line.starts_with("%NEST") {
744            self.parse_nest_directive(line, line_num, header)
745        } else {
746            Ok(())
747        }
748    }
749
750    fn parse_version_directive(
751        &self,
752        line: &str,
753        header: &mut HeaderInfo,
754        found_version: &mut bool,
755    ) -> StreamResult<()> {
756        // Safe: starts_with check guarantees prefix exists
757        let rest = line.strip_prefix("%VERSION").expect("prefix exists").trim();
758        // Handle both "%VERSION: 1.0" and "%VERSION: 1.0" formats
759        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
760        let parts: Vec<&str> = rest.split('.').collect();
761
762        if parts.len() != 2 {
763            return Err(StreamError::InvalidVersion(rest.to_string()));
764        }
765
766        let major: u32 = parts[0]
767            .parse()
768            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
769        let minor: u32 = parts[1]
770            .parse()
771            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
772
773        header.version = (major, minor);
774        *found_version = true;
775        Ok(())
776    }
777
778    fn parse_struct_directive(
779        &self,
780        line: &str,
781        line_num: usize,
782        header: &mut HeaderInfo,
783    ) -> StreamResult<()> {
784        // Safe: starts_with check guarantees prefix exists
785        let rest = line.strip_prefix("%STRUCT").expect("prefix exists").trim();
786        // Handle both "%STRUCT TypeName: [cols]" and "%STRUCT: TypeName: [cols]" formats
787        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
788
789        let bracket_start = rest
790            .find('[')
791            .ok_or_else(|| StreamError::syntax(line_num, "missing '[' in %STRUCT"))?;
792        let bracket_end = rest
793            .find(']')
794            .ok_or_else(|| StreamError::syntax(line_num, "missing ']' in %STRUCT"))?;
795
796        // Type name may have trailing colon and optional count, strip them
797        // Format: TypeName: or TypeName (N):
798        let type_part = rest[..bracket_start].trim().trim_end_matches(':').trim();
799        // Handle optional count: "TypeName (N)" -> extract just "TypeName"
800        let type_name = if let Some(paren_pos) = type_part.find('(') {
801            type_part[..paren_pos].trim()
802        } else {
803            type_part
804        };
805        if !is_valid_type_name(type_name) {
806            return Err(StreamError::syntax(
807                line_num,
808                format!("invalid type name: {}", type_name),
809            ));
810        }
811
812        let cols_str = &rest[bracket_start + 1..bracket_end];
813        let columns: Vec<String> = cols_str
814            .split(',')
815            .map(|s| s.trim().to_string())
816            .filter(|s| !s.is_empty())
817            .collect();
818
819        if columns.is_empty() {
820            return Err(StreamError::syntax(line_num, "empty schema"));
821        }
822
823        header.structs.insert(type_name.to_string(), columns);
824        Ok(())
825    }
826
827    fn parse_alias_directive(
828        &self,
829        line: &str,
830        line_num: usize,
831        header: &mut HeaderInfo,
832    ) -> StreamResult<()> {
833        // Safe: starts_with check guarantees prefix exists
834        let rest = line.strip_prefix("%ALIAS").expect("prefix exists").trim();
835        // Handle both "%ALIAS: %short: = ..." and "%ALIAS: %short: ..." formats
836        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
837
838        // Support both '=' and ':' as separators
839        let sep_pos = rest
840            .find('=')
841            .or_else(|| rest.find(':'))
842            .ok_or_else(|| StreamError::syntax(line_num, "missing '=' or ':' in %ALIAS"))?;
843
844        let alias = rest[..sep_pos].trim();
845        let value = rest[sep_pos + 1..].trim().trim_matches('"');
846
847        header.aliases.insert(alias.to_string(), value.to_string());
848        Ok(())
849    }
850
851    fn parse_nest_directive(
852        &self,
853        line: &str,
854        line_num: usize,
855        header: &mut HeaderInfo,
856    ) -> StreamResult<()> {
857        // Safe: starts_with check guarantees prefix exists
858        let rest = line.strip_prefix("%NEST").expect("prefix exists").trim();
859        // Handle both "%NEST: Parent > Child" and "%NEST: Parent > Child" formats
860        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
861
862        let arrow_pos = rest
863            .find('>')
864            .ok_or_else(|| StreamError::syntax(line_num, "missing '>' in %NEST"))?;
865
866        let parent = rest[..arrow_pos].trim();
867        let child = rest[arrow_pos + 1..].trim();
868
869        if !is_valid_type_name(parent) || !is_valid_type_name(child) {
870            return Err(StreamError::syntax(line_num, "invalid type name in %NEST"));
871        }
872
873        header.nests.insert(parent.to_string(), child.to_string());
874        Ok(())
875    }
876
    /// Parse the next event from the stream.
    ///
    /// Returns `Ok(None)` once input is exhausted; on end-of-input,
    /// `finalize()` is invoked to flush any pending events (presumably the
    /// `ListEnd`s for still-open lists — `finalize` is defined elsewhere in
    /// this file).
    ///
    /// Ordering is delicate: when de-indentation closes a list, the current
    /// line is pushed back so the `ListEnd` event is emitted *before* the
    /// line itself is parsed on the next call.
    fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        if self.finished {
            return Ok(None);
        }

        loop {
            // Check timeout periodically (every 100 operations to minimize overhead)
            self.operations_count += 1;
            if self.operations_count.is_multiple_of(100) {
                self.check_timeout()?;
            }

            let (line_num, line) = match self.reader.next_line()? {
                Some(l) => l,
                None => {
                    self.finished = true;
                    // Emit any remaining list ends
                    return self.finalize();
                }
            };

            let trimmed = line.trim();

            // Skip blank lines and comments
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            // Calculate indentation; `None` means the line carries no
            // indentable content and is skipped.
            let indent_info = calculate_indent(&line, line_num as u32)
                .map_err(|e| StreamError::syntax(line_num, e.to_string()))?;

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            // Guard against pathologically deep nesting (config limit).
            if indent > self.config.max_indent_depth {
                return Err(StreamError::syntax(
                    line_num,
                    format!("indent depth {} exceeds limit", indent),
                ));
            }

            // Pop contexts as needed based on indentation. At most one
            // ListEnd is produced per call; further pops happen on re-entry.
            let events = self.pop_contexts(indent)?;
            if let Some(event) = events {
                // Push back the current line to process after emitting list end
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            // Parse line content
            return self.parse_line(content, indent, line_num);
        }
    }
934
935    fn pop_contexts(&mut self, current_indent: usize) -> StreamResult<Option<NodeEvent>> {
936        while self.state.stack.len() > 1 {
937            // Safe: loop condition guarantees stack has elements
938            let should_pop = match self.state.stack.last().expect("stack has elements") {
939                Context::Root => false,
940                Context::Object { indent, .. } => current_indent <= *indent,
941                Context::List { row_indent, .. } => current_indent < *row_indent,
942            };
943
944            if should_pop {
945                // Safe: loop condition guarantees stack has elements
946                let ctx = self.state.stack.pop().expect("stack has elements");
947                if let Context::List {
948                    key,
949                    type_name,
950                    count,
951                    ..
952                } = ctx
953                {
954                    return Ok(Some(NodeEvent::ListEnd {
955                        key,
956                        type_name,
957                        count,
958                    }));
959                }
960            } else {
961                break;
962            }
963        }
964
965        Ok(None)
966    }
967
968    fn parse_line(
969        &mut self,
970        content: &str,
971        indent: usize,
972        line_num: usize,
973    ) -> StreamResult<Option<NodeEvent>> {
974        // Strip inline comment
975        let content = strip_comment(content);
976
977        if let Some(row_content) = content.strip_prefix('|') {
978            // Matrix row
979            self.parse_matrix_row(row_content, indent, line_num)
980        } else if let Some(colon_pos) = content.find(':') {
981            let key = content[..colon_pos].trim();
982            let after_colon = &content[colon_pos + 1..];
983
984            if !is_valid_key_token(key) {
985                return Err(StreamError::syntax(
986                    line_num,
987                    format!("invalid key: {}", key),
988                ));
989            }
990
991            let after_colon_trimmed = after_colon.trim();
992
993            if after_colon_trimmed.is_empty() {
994                // Object start
995                self.state.stack.push(Context::Object {
996                    key: key.to_string(),
997                    indent,
998                });
999                Ok(Some(NodeEvent::ObjectStart {
1000                    key: key.to_string(),
1001                    line: line_num,
1002                }))
1003            } else if after_colon_trimmed.starts_with('@')
1004                && self.is_list_start(after_colon_trimmed)
1005            {
1006                // List start
1007                let (type_name, schema) = self.parse_list_start(after_colon_trimmed, line_num)?;
1008
1009                self.state.stack.push(Context::List {
1010                    key: key.to_string(),
1011                    type_name: type_name.clone(),
1012                    schema: schema.clone(),
1013                    row_indent: indent + 1,
1014                    count: 0,
1015                    last_node: None,
1016                });
1017
1018                self.state.prev_row = None;
1019
1020                Ok(Some(NodeEvent::ListStart {
1021                    key: key.to_string(),
1022                    type_name,
1023                    schema,
1024                    line: line_num,
1025                }))
1026            } else {
1027                // Key-value pair
1028                let value = self.infer_value(after_colon.trim(), line_num)?;
1029                Ok(Some(NodeEvent::Scalar {
1030                    key: key.to_string(),
1031                    value,
1032                    line: line_num,
1033                }))
1034            }
1035        } else {
1036            Err(StreamError::syntax(line_num, "expected ':' in line"))
1037        }
1038    }
1039
1040    #[inline]
1041    fn is_list_start(&self, s: &str) -> bool {
1042        let s = s.trim();
1043        if !s.starts_with('@') {
1044            return false;
1045        }
1046        let rest = &s[1..];
1047        let type_end = rest
1048            .find(|c: char| c == '[' || c.is_whitespace())
1049            .unwrap_or(rest.len());
1050        let type_name = &rest[..type_end];
1051        is_valid_type_name(type_name)
1052    }
1053
1054    fn parse_list_start(&self, s: &str, line_num: usize) -> StreamResult<(String, Vec<String>)> {
1055        let s = s.trim();
1056        let rest = &s[1..]; // Skip @
1057
1058        if let Some(bracket_pos) = rest.find('[') {
1059            // Inline schema: @TypeName[col1, col2]
1060            let type_name = &rest[..bracket_pos];
1061            if !is_valid_type_name(type_name) {
1062                return Err(StreamError::syntax(
1063                    line_num,
1064                    format!("invalid type name: {}", type_name),
1065                ));
1066            }
1067
1068            let bracket_end = rest
1069                .find(']')
1070                .ok_or_else(|| StreamError::syntax(line_num, "missing ']'"))?;
1071
1072            let cols_str = &rest[bracket_pos + 1..bracket_end];
1073            let columns: Vec<String> = cols_str
1074                .split(',')
1075                .map(|s| s.trim().to_string())
1076                .filter(|s| !s.is_empty())
1077                .collect();
1078
1079            Ok((type_name.to_string(), columns))
1080        } else {
1081            // Reference to declared schema: @TypeName
1082            let type_name = rest.trim();
1083            if !is_valid_type_name(type_name) {
1084                return Err(StreamError::syntax(
1085                    line_num,
1086                    format!("invalid type name: {}", type_name),
1087                ));
1088            }
1089
1090            let header = self
1091                .header
1092                .as_ref()
1093                .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
1094
1095            let schema = header.structs.get(type_name).ok_or_else(|| {
1096                StreamError::schema(line_num, format!("undefined type: {}", type_name))
1097            })?;
1098
1099            Ok((type_name.to_string(), schema.clone()))
1100        }
1101    }
1102
1103    fn parse_matrix_row(
1104        &mut self,
1105        content: &str,
1106        indent: usize,
1107        line_num: usize,
1108    ) -> StreamResult<Option<NodeEvent>> {
1109        let content = strip_comment(content).trim();
1110
1111        // Find active list context
1112        let (type_name, schema, parent_info) = self.find_list_context(indent, line_num)?;
1113
1114        // Parse HEDL matrix row (comma-separated values after the |)
1115        // Use hedl_row parser for proper CSV-like parsing
1116        let fields = hedl_core::lex::parse_csv_row(content)
1117            .map_err(|e| StreamError::syntax(line_num, format!("row parse error: {}", e)))?;
1118
1119        // Validate shape
1120        if fields.len() != schema.len() {
1121            return Err(StreamError::ShapeMismatch {
1122                line: line_num,
1123                expected: schema.len(),
1124                got: fields.len(),
1125            });
1126        }
1127
1128        // Infer values with ditto handling
1129        let mut values = Vec::with_capacity(fields.len());
1130        for (col_idx, field) in fields.iter().enumerate() {
1131            let value = if field.value == "^" {
1132                // Ditto - use previous row's value
1133                self.state
1134                    .prev_row
1135                    .as_ref()
1136                    .and_then(|prev| prev.get(col_idx).cloned())
1137                    .unwrap_or(Value::Null)
1138            } else if field.is_quoted {
1139                Value::String(field.value.clone())
1140            } else {
1141                self.infer_value(&field.value, line_num)?
1142            };
1143            values.push(value);
1144        }
1145
1146        // Get ID from first column
1147        let id = match &values[0] {
1148            Value::String(s) => s.clone(),
1149            _ => return Err(StreamError::syntax(line_num, "ID column must be a string")),
1150        };
1151
1152        // Update context
1153        self.update_list_context(&type_name, &id);
1154        self.state.prev_row = Some(values.clone());
1155
1156        // Build node info
1157        let mut node = NodeInfo::new(type_name.clone(), id, values, indent, line_num);
1158
1159        if let Some((parent_type, parent_id)) = parent_info {
1160            node = node.with_parent(parent_type, parent_id);
1161        }
1162
1163        Ok(Some(NodeEvent::Node(node)))
1164    }
1165
1166    fn find_list_context(
1167        &mut self,
1168        indent: usize,
1169        line_num: usize,
1170    ) -> StreamResult<ListContextResult> {
1171        let header = self
1172            .header
1173            .as_ref()
1174            .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
1175
1176        for ctx in self.state.stack.iter().rev() {
1177            if let Context::List {
1178                type_name,
1179                schema,
1180                row_indent,
1181                last_node,
1182                ..
1183            } = ctx
1184            {
1185                if indent == *row_indent {
1186                    // Peer row
1187                    return Ok((type_name.clone(), schema.clone(), None));
1188                } else if indent == *row_indent + 1 {
1189                    // Child row
1190                    let parent_info = last_node.clone().ok_or_else(|| {
1191                        StreamError::orphan_row(line_num, "child row has no parent")
1192                    })?;
1193
1194                    let child_type = header.nests.get(type_name).ok_or_else(|| {
1195                        StreamError::orphan_row(
1196                            line_num,
1197                            format!("no NEST rule for parent type '{}'", type_name),
1198                        )
1199                    })?;
1200
1201                    let child_schema = header.structs.get(child_type).ok_or_else(|| {
1202                        StreamError::schema(
1203                            line_num,
1204                            format!("child type '{}' not defined", child_type),
1205                        )
1206                    })?;
1207
1208                    // Push child list context
1209                    self.state.stack.push(Context::List {
1210                        key: child_type.clone(),
1211                        type_name: child_type.clone(),
1212                        schema: child_schema.clone(),
1213                        row_indent: indent,
1214                        count: 0,
1215                        last_node: None,
1216                    });
1217
1218                    return Ok((child_type.clone(), child_schema.clone(), Some(parent_info)));
1219                }
1220            }
1221        }
1222
1223        Err(StreamError::syntax(
1224            line_num,
1225            "matrix row outside of list context",
1226        ))
1227    }
1228
1229    fn update_list_context(&mut self, type_name: &str, id: &str) {
1230        for ctx in self.state.stack.iter_mut().rev() {
1231            if let Context::List {
1232                type_name: ctx_type,
1233                last_node,
1234                count,
1235                ..
1236            } = ctx
1237            {
1238                if ctx_type == type_name {
1239                    *last_node = Some((type_name.to_string(), id.to_string()));
1240                    *count += 1;
1241                    break;
1242                }
1243            }
1244        }
1245    }
1246
1247    #[inline]
1248    fn infer_value(&self, s: &str, _line_num: usize) -> StreamResult<Value> {
1249        let s = s.trim();
1250
1251        if s.is_empty() || s == "~" {
1252            return Ok(Value::Null);
1253        }
1254
1255        if s == "true" {
1256            return Ok(Value::Bool(true));
1257        }
1258        if s == "false" {
1259            return Ok(Value::Bool(false));
1260        }
1261
1262        // Reference
1263        if let Some(ref_part) = s.strip_prefix('@') {
1264            if let Some(colon_pos) = ref_part.find(':') {
1265                let type_name = &ref_part[..colon_pos];
1266                let id = &ref_part[colon_pos + 1..];
1267                return Ok(Value::Reference(hedl_core::Reference {
1268                    type_name: Some(type_name.to_string()),
1269                    id: id.to_string(),
1270                }));
1271            } else {
1272                return Ok(Value::Reference(hedl_core::Reference {
1273                    type_name: None,
1274                    id: ref_part.to_string(),
1275                }));
1276            }
1277        }
1278
1279        // Alias
1280        if let Some(alias) = s.strip_prefix('$') {
1281            if let Some(header) = &self.header {
1282                if let Some(value) = header.aliases.get(alias) {
1283                    return Ok(Value::String(value.clone()));
1284                }
1285            }
1286            return Ok(Value::String(s.to_string()));
1287        }
1288
1289        // Number
1290        if let Ok(i) = s.parse::<i64>() {
1291            return Ok(Value::Int(i));
1292        }
1293        if let Ok(f) = s.parse::<f64>() {
1294            return Ok(Value::Float(f));
1295        }
1296
1297        // Default to string
1298        Ok(Value::String(s.to_string()))
1299    }
1300
1301    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
1302        // Pop remaining contexts
1303        while self.state.stack.len() > 1 {
1304            // Safe: loop condition guarantees stack has elements
1305            let ctx = self.state.stack.pop().expect("stack has elements");
1306            if let Context::List {
1307                key,
1308                type_name,
1309                count,
1310                ..
1311            } = ctx
1312            {
1313                return Ok(Some(NodeEvent::ListEnd {
1314                    key,
1315                    type_name,
1316                    count,
1317                }));
1318            }
1319        }
1320
1321        Ok(Some(NodeEvent::EndOfDocument))
1322    }
1323}
1324
1325impl<R: Read> Iterator for StreamingParser<R> {
1326    type Item = StreamResult<NodeEvent>;
1327
1328    fn next(&mut self) -> Option<Self::Item> {
1329        match self.next_event() {
1330            Ok(Some(NodeEvent::EndOfDocument)) => None,
1331            Ok(Some(event)) => Some(Ok(event)),
1332            Ok(None) => None,
1333            Err(e) => Some(Err(e)),
1334        }
1335    }
1336}
1337
/// SIMD-optimized comment scanning module.
///
/// This module provides AVX2-accelerated scanning for the '#' character
/// when detecting comment boundaries. Falls back to scalar implementation
/// on non-AVX2 platforms or when the feature is disabled.
mod simd_comment {
    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
    use std::arch::x86_64::*;

    /// Find the first occurrence of '#' in the input string using SIMD.
    ///
    /// Returns the byte index of the first `b'#'` in `s`, or `None` if absent.
    ///
    /// This function scans 32 bytes at a time using AVX2 instructions
    /// on x86_64 platforms when the `avx2` feature is enabled.
    ///
    /// # Safety
    ///
    /// On AVX2-enabled platforms, this uses `unsafe` SIMD intrinsics
    /// but ensures all memory accesses are valid.
    #[inline]
    pub fn find_hash_simd(s: &[u8]) -> Option<usize> {
        #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
        {
            // Check if AVX2 is available at runtime
            if is_x86_feature_detected!("avx2") {
                // SAFETY: the runtime check above guarantees the CPU supports
                // AVX2, the only precondition `find_hash_avx2` relies on.
                return unsafe { find_hash_avx2(s) };
            }
        }

        // Fallback to scalar search
        find_hash_scalar(s)
    }

    /// AVX2 implementation for finding '#' character.
    ///
    /// Scans 32 bytes at a time using SIMD instructions.
    /// The trailing partial chunk (< 32 bytes) is delegated to the scalar
    /// search, so no out-of-bounds load is ever issued.
    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
    #[target_feature(enable = "avx2")]
    unsafe fn find_hash_avx2(s: &[u8]) -> Option<usize> {
        const CHUNK_SIZE: usize = 32;
        let len = s.len();

        if len == 0 {
            return None;
        }

        // Vector with b'#' broadcast into all 32 byte lanes.
        let hash_vec = _mm256_set1_epi8(b'#' as i8);
        let mut offset = 0;

        // Process 32-byte chunks
        while offset + CHUNK_SIZE <= len {
            // Load 32 bytes from input
            // SAFETY: the loop condition guarantees offset + 32 <= len, so
            // the unaligned 32-byte load stays within the slice bounds;
            // `_mm256_loadu_si256` imposes no alignment requirement.
            let chunk = _mm256_loadu_si256(s.as_ptr().add(offset) as *const __m256i);

            // Compare with '#' character
            let matches = _mm256_cmpeq_epi8(chunk, hash_vec);

            // Convert to bitmask (one bit per byte lane, lane 0 = lowest bit)
            let mask = _mm256_movemask_epi8(matches);

            if mask != 0 {
                // Found at least one match, find the first one
                // (lowest set bit corresponds to the earliest matching byte)
                let bit_pos = mask.trailing_zeros() as usize;
                return Some(offset + bit_pos);
            }

            offset += CHUNK_SIZE;
        }

        // Handle remaining bytes with scalar search
        find_hash_scalar(&s[offset..]).map(|pos| offset + pos)
    }

    /// Scalar fallback implementation for finding '#' character.
    #[inline]
    fn find_hash_scalar(s: &[u8]) -> Option<usize> {
        s.iter().position(|&b| b == b'#')
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_find_hash_basic() {
            assert_eq!(find_hash_simd(b"hello # world"), Some(6));
            assert_eq!(find_hash_simd(b"no comment"), None);
            assert_eq!(find_hash_simd(b"#start"), Some(0));
            assert_eq!(find_hash_simd(b"end#"), Some(3));
        }

        #[test]
        fn test_find_hash_long() {
            // Test with string longer than SIMD chunk size
            let long = b"a".repeat(100);
            assert_eq!(find_hash_simd(&long), None);

            let mut with_hash = b"a".repeat(50);
            with_hash.push(b'#');
            with_hash.extend_from_slice(&b"a".repeat(50));
            assert_eq!(find_hash_simd(&with_hash), Some(50));
        }

        #[test]
        fn test_find_hash_edge_cases() {
            assert_eq!(find_hash_simd(b""), None);
            assert_eq!(find_hash_simd(b"#"), Some(0));
            assert_eq!(find_hash_simd(b"##"), Some(0));
        }

        #[test]
        fn test_find_hash_alignment() {
            // Test various alignment scenarios
            for offset in 0..32 {
                let mut data = vec![b'a'; offset];
                data.push(b'#');
                data.extend_from_slice(&vec![b'b'; 32]);
                assert_eq!(find_hash_simd(&data), Some(offset));
            }
        }

        #[test]
        fn test_find_hash_multiple() {
            assert_eq!(find_hash_simd(b"# # #"), Some(0));
            assert_eq!(find_hash_simd(b"a # # #"), Some(2));
        }
    }
}
1465
1466/// Strip inline comments from a line, respecting quoted strings and escapes.
1467///
1468/// Finds the first unquoted, unescaped '#' character and returns the string
1469/// up to that point. Uses SIMD acceleration when available.
1470///
1471/// # Examples
1472///
1473/// ```text
1474/// use hedl_stream::parser::strip_comment;
1475/// assert_eq!(strip_comment("hello # comment"), "hello");
1476/// assert_eq!(strip_comment(r#""hello # not comment""#), r#""hello # not comment""#);
1477/// assert_eq!(strip_comment("no comment"), "no comment");
1478/// ```
1479#[inline]
1480pub(crate) fn strip_comment(s: &str) -> &str {
1481    // Find # not inside quotes using SIMD-accelerated scanning
1482    let bytes = s.as_bytes();
1483    let mut in_quotes = false;
1484    let mut escape = false;
1485    let mut search_start = 0;
1486
1487    loop {
1488        // Use SIMD to find the next potential comment start
1489        let hash_pos = match simd_comment::find_hash_simd(&bytes[search_start..]) {
1490            Some(pos) => search_start + pos,
1491            None => return s, // No '#' found
1492        };
1493
1494        // Verify this '#' is not inside quotes or escaped
1495        // Scan from last search position to hash position
1496        for i in search_start..hash_pos {
1497            let c = bytes[i];
1498
1499            if escape {
1500                escape = false;
1501                continue;
1502            }
1503
1504            match c {
1505                b'\\' => escape = true,
1506                b'"' => in_quotes = !in_quotes,
1507                _ => {}
1508            }
1509        }
1510
1511        // Check the '#' itself
1512        if escape {
1513            // '#' is escaped, continue searching
1514            escape = false;
1515            search_start = hash_pos + 1;
1516            continue;
1517        }
1518
1519        if !in_quotes {
1520            // Found unquoted, unescaped '#' - this is the comment start
1521            return s[..hash_pos].trim_end();
1522        }
1523
1524        // '#' is inside quotes, continue searching
1525        search_start = hash_pos + 1;
1526    }
1527}
1528
1529#[cfg(test)]
1530mod tests {
1531    use super::*;
1532    use std::io::Cursor;
1533
1534    // ============ HEADER PARSING TESTS ============
1535
1536    #[test]
1537    fn test_parse_header() {
1538        let input = r#"
1539%VERSION: 1.0
1540%STRUCT: User: [id, name, email]
1541%ALIAS active = "Active"
1542%NEST: User > Order
1543---
1544"#;
1545        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1546        let header = parser.header().unwrap();
1547
1548        assert_eq!(header.version, (1, 0));
1549        assert_eq!(
1550            header.structs.get("User"),
1551            Some(&vec![
1552                "id".to_string(),
1553                "name".to_string(),
1554                "email".to_string()
1555            ])
1556        );
1557        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
1558        assert_eq!(header.nests.get("User"), Some(&"Order".to_string()));
1559    }
1560
1561    #[test]
1562    fn test_header_missing_version() {
1563        let input = r#"
1564%STRUCT: User: [id, name]
1565---
1566"#;
1567        let result = StreamingParser::new(Cursor::new(input));
1568        assert!(matches!(result, Err(StreamError::MissingVersion)));
1569    }
1570
1571    #[test]
1572    fn test_header_invalid_version_format() {
1573        let input = r#"
1574%VERSION abc
1575---
1576"#;
1577        let result = StreamingParser::new(Cursor::new(input));
1578        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
1579    }
1580
1581    #[test]
1582    fn test_header_version_single_number() {
1583        let input = r#"
1584%VERSION 1
1585---
1586"#;
1587        let result = StreamingParser::new(Cursor::new(input));
1588        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
1589    }
1590
1591    #[test]
1592    fn test_header_multiple_schemas() {
1593        let input = r#"
1594%VERSION: 1.0
1595%STRUCT: User: [id, name]
1596%STRUCT: Product: [id, title, price]
1597%STRUCT: Order: [id, user_id, product_id]
1598---
1599"#;
1600        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1601        let header = parser.header().unwrap();
1602
1603        assert_eq!(header.structs.len(), 3);
1604        assert!(header.structs.contains_key("User"));
1605        assert!(header.structs.contains_key("Product"));
1606        assert!(header.structs.contains_key("Order"));
1607    }
1608
1609    #[test]
1610    fn test_header_struct_missing_bracket() {
1611        let input = r#"
1612%VERSION: 1.0
1613%STRUCT User id, name
1614---
1615"#;
1616        let result = StreamingParser::new(Cursor::new(input));
1617        assert!(matches!(result, Err(StreamError::Syntax { .. })));
1618    }
1619
1620    #[test]
1621    fn test_header_empty_struct() {
1622        let input = r#"
1623%VERSION: 1.0
1624%STRUCT: User: []
1625---
1626"#;
1627        let result = StreamingParser::new(Cursor::new(input));
1628        assert!(matches!(result, Err(StreamError::Syntax { .. })));
1629    }
1630
1631    #[test]
1632    fn test_header_alias_missing_equals() {
1633        let input = r#"
1634%VERSION: 1.0
1635%ALIAS foo "bar"
1636---
1637"#;
1638        let result = StreamingParser::new(Cursor::new(input));
1639        assert!(matches!(result, Err(StreamError::Syntax { .. })));
1640    }
1641
1642    #[test]
1643    fn test_header_nest_missing_arrow() {
1644        let input = r#"
1645%VERSION: 1.0
1646%NEST Parent Child
1647---
1648"#;
1649        let result = StreamingParser::new(Cursor::new(input));
1650        assert!(matches!(result, Err(StreamError::Syntax { .. })));
1651    }
1652
1653    #[test]
1654    fn test_header_with_comments() {
1655        let input = r#"
1656%VERSION: 1.0
1657# This is a comment
1658%STRUCT: User: [id, name] # inline comment
1659---
1660"#;
1661        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1662        let header = parser.header().unwrap();
1663        assert!(header.structs.contains_key("User"));
1664    }
1665
1666    #[test]
1667    fn test_header_blank_lines() {
1668        let input = r#"
1669%VERSION: 1.0
1670
1671%STRUCT: User: [id, name]
1672
1673---
1674"#;
1675        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1676        let header = parser.header().unwrap();
1677        assert!(header.structs.contains_key("User"));
1678    }
1679
1680    // ============ BASIC STREAMING TESTS ============
1681
1682    #[test]
1683    fn test_streaming_nodes() {
1684        let input = r#"
1685%VERSION: 1.0
1686%STRUCT: User: [id, name]
1687---
1688users: @User
1689  | alice, Alice Smith
1690  | bob, Bob Jones
1691"#;
1692        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1693
1694        let events: Vec<_> = parser.collect();
1695        for event in &events {
1696            if let Err(e) = event {
1697                eprintln!("Error: {:?}", e);
1698            }
1699        }
1700        assert!(events.iter().all(|e| e.is_ok()));
1701
1702        let nodes: Vec<_> = events
1703            .iter()
1704            .filter_map(|e| e.as_ref().ok())
1705            .filter_map(|e| e.as_node())
1706            .collect();
1707
1708        assert_eq!(nodes.len(), 2);
1709        assert_eq!(nodes[0].id, "alice");
1710        assert_eq!(nodes[1].id, "bob");
1711    }
1712
1713    #[test]
1714    fn test_streaming_empty_body() {
1715        let input = r#"
1716%VERSION: 1.0
1717%STRUCT: User: [id, name]
1718---
1719"#;
1720        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1721        let events: Vec<_> = parser.collect();
1722        assert!(events.is_empty());
1723    }
1724
1725    #[test]
1726    fn test_streaming_list_start_end_events() {
1727        let input = r#"
1728%VERSION: 1.0
1729%STRUCT: User: [id, name]
1730---
1731users: @User
1732  | alice, Alice
1733"#;
1734        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1735        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1736
1737        let list_starts: Vec<_> = events
1738            .iter()
1739            .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
1740            .collect();
1741        let list_ends: Vec<_> = events
1742            .iter()
1743            .filter(|e| matches!(e, NodeEvent::ListEnd { .. }))
1744            .collect();
1745
1746        assert_eq!(list_starts.len(), 1);
1747        assert_eq!(list_ends.len(), 1);
1748
1749        if let NodeEvent::ListStart { key, type_name, .. } = &list_starts[0] {
1750            assert_eq!(key, "users");
1751            assert_eq!(type_name, "User");
1752        }
1753
1754        if let NodeEvent::ListEnd {
1755            type_name, count, ..
1756        } = &list_ends[0]
1757        {
1758            assert_eq!(type_name, "User");
1759            assert_eq!(*count, 1);
1760        }
1761    }
1762
1763    // ============ MATRIX ROW EDGE CASES ============
1764
1765    #[test]
1766    fn test_matrix_row_empty_fields() {
1767        // Note: Empty fields are NOT preserved in the current pipe-splitting logic
1768        // When the field is truly empty (just whitespace between pipes), it's filtered
1769        // Use ~ (tilde) for explicit null values
1770        let input = r#"
1771%VERSION: 1.0
1772%STRUCT: Data: [id, optional, required]
1773---
1774data: @Data
1775  | row1, ~, value
1776"#;
1777        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1778        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1779        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1780
1781        assert_eq!(nodes.len(), 1);
1782        assert_eq!(nodes[0].id, "row1");
1783        assert_eq!(nodes[0].fields[1], Value::Null);
1784    }
1785
1786    #[test]
1787    fn test_matrix_row_quoted_fields() {
1788        let input = r#"
1789%VERSION: 1.0
1790%STRUCT: Data: [id, description]
1791---
1792data: @Data
1793  | row1, "Hello, World"
1794"#;
1795        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1796        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1797        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1798
1799        assert_eq!(nodes.len(), 1);
1800        assert_eq!(
1801            nodes[0].fields[1],
1802            Value::String("Hello, World".to_string())
1803        );
1804    }
1805
1806    #[test]
1807    fn test_matrix_row_shape_mismatch() {
1808        let input = r#"
1809%VERSION: 1.0
1810%STRUCT: User: [id, name, email]
1811---
1812users: @User
1813  | alice, Alice
1814"#;
1815        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1816        let events: Vec<_> = parser.collect();
1817        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
1818
1819        assert!(!errors.is_empty());
1820        if let Err(StreamError::ShapeMismatch { expected, got, .. }) = &errors[0] {
1821            assert_eq!(*expected, 3);
1822            assert_eq!(*got, 2);
1823        }
1824    }
1825
1826    #[test]
1827    fn test_matrix_row_references() {
1828        let input = r#"
1829%VERSION: 1.0
1830%STRUCT: Order: [id, user]
1831---
1832orders: @Order
1833  | order1, @User:alice
1834  | order2, @bob
1835"#;
1836        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1837        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1838        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1839
1840        assert_eq!(nodes.len(), 2);
1841
1842        if let Value::Reference(r) = &nodes[0].fields[1] {
1843            assert_eq!(r.type_name, Some("User".to_string()));
1844            assert_eq!(r.id, "alice");
1845        } else {
1846            panic!("Expected reference");
1847        }
1848
1849        if let Value::Reference(r) = &nodes[1].fields[1] {
1850            assert_eq!(r.type_name, None);
1851            assert_eq!(r.id, "bob");
1852        } else {
1853            panic!("Expected reference");
1854        }
1855    }
1856
1857    #[test]
1858    fn test_matrix_row_booleans() {
1859        let input = r#"
1860%VERSION: 1.0
1861%STRUCT: Flag: [id, active, verified]
1862---
1863flags: @Flag
1864  | flag1, true, false
1865"#;
1866        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1867        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1868        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1869
1870        assert_eq!(nodes.len(), 1);
1871        assert_eq!(nodes[0].fields[1], Value::Bool(true));
1872        assert_eq!(nodes[0].fields[2], Value::Bool(false));
1873    }
1874
1875    #[test]
1876    fn test_matrix_row_numbers() {
1877        let input = r#"
1878%VERSION: 1.0
1879%STRUCT: Data: [id, int_val, float_val]
1880---
1881data: @Data
1882  | row1, 42, 3.5
1883  | row2, -100, -2.5
1884"#;
1885        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1886        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1887        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1888
1889        assert_eq!(nodes.len(), 2);
1890        assert_eq!(nodes[0].fields[1], Value::Int(42));
1891        assert_eq!(nodes[0].fields[2], Value::Float(3.5));
1892        assert_eq!(nodes[1].fields[1], Value::Int(-100));
1893        assert_eq!(nodes[1].fields[2], Value::Float(-2.5));
1894    }
1895
1896    #[test]
1897    fn test_matrix_row_null() {
1898        let input = r#"
1899%VERSION: 1.0
1900%STRUCT: Data: [id, nullable]
1901---
1902data: @Data
1903  | row1, ~
1904"#;
1905        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1906        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1907        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1908
1909        assert_eq!(nodes.len(), 1);
1910        assert_eq!(nodes[0].fields[1], Value::Null);
1911    }
1912
1913    #[test]
1914    fn test_matrix_row_ditto() {
1915        let input = r#"
1916%VERSION: 1.0
1917%STRUCT: Data: [id, category]
1918---
1919data: @Data
1920  | row1, CategoryA
1921  | row2, ^
1922  | row3, ^
1923"#;
1924        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1925        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1926        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1927
1928        assert_eq!(nodes.len(), 3);
1929        assert_eq!(nodes[0].fields[1], Value::String("CategoryA".to_string()));
1930        assert_eq!(nodes[1].fields[1], Value::String("CategoryA".to_string()));
1931        assert_eq!(nodes[2].fields[1], Value::String("CategoryA".to_string()));
1932    }
1933
1934    #[test]
1935    fn test_matrix_row_alias_substitution() {
1936        let input = r#"
1937%VERSION: 1.0
1938%ALIAS status = "Active"
1939%STRUCT: User: [id, status]
1940---
1941users: @User
1942  | alice, $status
1943"#;
1944        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1945        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1946        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1947
1948        assert_eq!(nodes.len(), 1);
1949        assert_eq!(nodes[0].fields[1], Value::String("Active".to_string()));
1950    }
1951
1952    // ============ INLINE SCHEMA TESTS ============
1953
1954    #[test]
1955    fn test_inline_schema() {
1956        let input = r#"
1957%VERSION: 1.0
1958---
1959items: @Item[id, name]
1960  | item1, First
1961  | item2, Second
1962"#;
1963        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1964        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1965        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1966
1967        assert_eq!(nodes.len(), 2);
1968        assert_eq!(nodes[0].type_name, "Item");
1969    }
1970
1971    #[test]
1972    fn test_inline_schema_overrides_header() {
1973        let input = r#"
1974%VERSION: 1.0
1975%STRUCT: Item: [id, name, extra]
1976---
1977items: @Item[id, name]
1978  | item1, First
1979"#;
1980        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
1981        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
1982        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
1983
1984        // Should use inline schema with 2 fields, not header schema with 3
1985        assert_eq!(nodes.len(), 1);
1986        assert_eq!(nodes[0].fields.len(), 2);
1987    }
1988
1989    // ============ OBJECT CONTEXT TESTS ============
1990
1991    #[test]
1992    fn test_object_context() {
1993        let input = r#"
1994%VERSION: 1.0
1995%STRUCT: User: [id, name]
1996---
1997db:
1998  users: @User
1999    | alice, Alice
2000"#;
2001        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2002        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2003
2004        let obj_starts: Vec<_> = events
2005            .iter()
2006            .filter(|e| matches!(e, NodeEvent::ObjectStart { .. }))
2007            .collect();
2008        assert_eq!(obj_starts.len(), 1);
2009
2010        if let NodeEvent::ObjectStart { key, .. } = obj_starts[0] {
2011            assert_eq!(key, "db");
2012        }
2013    }
2014
2015    #[test]
2016    fn test_scalar_value() {
2017        let input = r#"
2018%VERSION: 1.0
2019---
2020config:
2021  timeout: 30
2022  enabled: true
2023  name: "Test Config"
2024"#;
2025        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2026        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2027
2028        let scalars: Vec<_> = events
2029            .iter()
2030            .filter(|e| matches!(e, NodeEvent::Scalar { .. }))
2031            .collect();
2032        assert_eq!(scalars.len(), 3);
2033    }
2034
2035    // ============ UNICODE TESTS ============
2036
2037    #[test]
2038    fn test_unicode_ids() {
2039        let input = r#"
2040%VERSION: 1.0
2041%STRUCT: User: [id, name]
2042---
2043users: @User
2044  | 用户1, 张三
2045  | пользователь, Иван
2046"#;
2047        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2048        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2049        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2050
2051        assert_eq!(nodes.len(), 2);
2052        assert_eq!(nodes[0].id, "用户1");
2053        assert_eq!(nodes[1].id, "пользователь");
2054    }
2055
2056    #[test]
2057    fn test_unicode_in_values() {
2058        let input = r#"
2059%VERSION: 1.0
2060%STRUCT: Data: [id, emoji]
2061---
2062data: @Data
2063  | row1, 🎉✨🚀
2064"#;
2065        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2066        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2067        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2068
2069        assert_eq!(nodes.len(), 1);
2070        assert_eq!(nodes[0].fields[1], Value::String("🎉✨🚀".to_string()));
2071    }
2072
2073    // ============ COMMENT HANDLING TESTS ============
2074
2075    #[test]
2076    fn test_inline_comments() {
2077        let input = r#"
2078%VERSION: 1.0
2079%STRUCT: User: [id, name]
2080---
2081users: @User  # list of users
2082  | alice, Alice Smith  # first user
2083  | bob, Bob Jones
2084"#;
2085        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2086        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2087        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2088
2089        assert_eq!(nodes.len(), 2);
2090        // Comments should be stripped
2091        assert_eq!(nodes[0].id, "alice");
2092    }
2093
2094    #[test]
2095    fn test_full_line_comments() {
2096        let input = r#"
2097%VERSION: 1.0
2098%STRUCT: User: [id, name]
2099---
2100# This is a comment
2101users: @User
2102  # Comment between rows
2103  | alice, Alice
2104  | bob, Bob
2105"#;
2106        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2107        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2108        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2109
2110        assert_eq!(nodes.len(), 2);
2111    }
2112
2113    #[test]
2114    fn test_hash_in_quoted_string() {
2115        let input =
2116            "%VERSION: 1.0\n%STRUCT: Data: [id, tag]\n---\ndata: @Data\n  | row1, \"#hashtag\"\n";
2117        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2118        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2119        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2120
2121        assert_eq!(nodes.len(), 1);
2122        assert_eq!(nodes[0].fields[1], Value::String("#hashtag".to_string()));
2123    }
2124
2125    // ============ INDENT AND CONTEXT TESTS ============
2126
2127    #[test]
2128    fn test_multiple_lists() {
2129        let input = r#"
2130%VERSION: 1.0
2131%STRUCT: User: [id, name]
2132%STRUCT: Product: [id, title]
2133---
2134users: @User
2135  | alice, Alice
2136products: @Product
2137  | prod1, Widget
2138"#;
2139        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2140        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2141        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2142
2143        assert_eq!(nodes.len(), 2);
2144        assert_eq!(nodes[0].type_name, "User");
2145        assert_eq!(nodes[1].type_name, "Product");
2146    }
2147
2148    #[test]
2149    fn test_excessive_indent_error() {
2150        let config = StreamingParserConfig {
2151            max_indent_depth: 2,
2152            ..Default::default()
2153        };
2154        let input = r#"
2155%VERSION: 1.0
2156%STRUCT: Data: [id]
2157---
2158level1:
2159  level2:
2160    level3:
2161      data: @Data
2162        | row1
2163"#;
2164        let parser = StreamingParser::with_config(Cursor::new(input), config).unwrap();
2165
2166        // Should get an error for excessive indent
2167        let mut found_indent_error = false;
2168        for result in parser {
2169            if let Err(StreamError::Syntax { message, .. }) = result {
2170                if message.contains("indent depth") {
2171                    found_indent_error = true;
2172                    break;
2173                }
2174            }
2175        }
2176        assert!(found_indent_error);
2177    }
2178
2179    // ============ ERROR HANDLING TESTS ============
2180
2181    #[test]
2182    fn test_undefined_schema() {
2183        let input = r#"
2184%VERSION: 1.0
2185---
2186users: @User
2187  | alice, Alice
2188"#;
2189        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2190        let events: Vec<_> = parser.collect();
2191        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2192
2193        // Should get an error because User schema is not defined
2194        assert!(!errors.is_empty());
2195    }
2196
2197    #[test]
2198    fn test_orphan_row_without_context() {
2199        let input = r#"
2200%VERSION: 1.0
2201%STRUCT: Data: [id]
2202---
2203| orphan_row
2204"#;
2205        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2206        let events: Vec<_> = parser.collect();
2207        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2208
2209        assert!(!errors.is_empty());
2210    }
2211
2212    #[test]
2213    fn test_missing_colon_error() {
2214        let input = r#"
2215%VERSION: 1.0
2216---
2217invalid line without colon
2218"#;
2219        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2220        let events: Vec<_> = parser.collect();
2221        let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2222
2223        assert!(!errors.is_empty());
2224        if let Err(StreamError::Syntax { message, .. }) = &errors[0] {
2225            assert!(message.contains(":"));
2226        }
2227    }
2228
2229    // ============ LARGE FILE SIMULATION ============
2230
2231    #[test]
2232    fn test_many_rows() {
2233        let mut input = String::from(
2234            r#"
2235%VERSION: 1.0
2236%STRUCT: Data: [id, value]
2237---
2238data: @Data
2239"#,
2240        );
2241        for i in 0..1000 {
2242            input.push_str(&format!("  | row{}, value{}\n", i, i));
2243        }
2244
2245        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2246        let events: Vec<_> = parser.filter_map(|e| e.ok()).collect();
2247        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2248
2249        assert_eq!(nodes.len(), 1000);
2250    }
2251
2252    // ============ STRIP COMMENT HELPER TESTS ============
2253
2254    #[test]
2255    fn test_strip_comment_basic() {
2256        assert_eq!(strip_comment("hello # comment"), "hello");
2257    }
2258
2259    #[test]
2260    fn test_strip_comment_quoted() {
2261        assert_eq!(
2262            strip_comment(r#""hello # not comment""#),
2263            r#""hello # not comment""#
2264        );
2265    }
2266
2267    #[test]
2268    fn test_strip_comment_escaped() {
2269        // Backslash escapes the hash, so \# is not treated as comment start
2270        assert_eq!(
2271            strip_comment(r#"hello\# not a comment"#),
2272            r#"hello\# not a comment"#
2273        );
2274        // But a later unescaped hash still starts a comment
2275        assert_eq!(
2276            strip_comment(r#"hello\# still here # comment"#),
2277            r#"hello\# still here"#
2278        );
2279    }
2280
2281    #[test]
2282    fn test_strip_comment_escaped_in_quotes() {
2283        // Inside quotes, backslash-hash is preserved
2284        assert_eq!(
2285            strip_comment(r#""hello\#world" more"#),
2286            r#""hello\#world" more"#
2287        );
2288    }
2289
2290    #[test]
2291    fn test_strip_comment_no_comment() {
2292        assert_eq!(strip_comment("hello world"), "hello world");
2293    }
2294}