Skip to main content

hedl_stream/parser/
core.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Core streaming parser implementation
19
20use super::config::StreamingParserConfig;
21use super::context::{self, Context, ParserState};
22use super::directives;
23use super::list_parsing;
24use super::value_inference;
25use crate::buffer_pool::BufferPool;
26use crate::error::{StreamError, StreamResult};
27use crate::event::{HeaderInfo, NodeEvent};
28use crate::reader::LineReader;
29use hedl_core::lex::{calculate_indent, is_valid_key_token, strip_comment};
30use std::io::Read;
31use std::time::Instant;
32
33/// Streaming HEDL parser.
34///
35/// Processes HEDL documents incrementally, yielding `NodeEvent` items as they
36/// are parsed without loading the entire document into memory.
37pub struct StreamingParser<R: Read> {
38    reader: LineReader<R>,
39    config: StreamingParserConfig,
40    header: Option<HeaderInfo>,
41    state: ParserState,
42    finished: bool,
43    errored: bool,              // Track if an error occurred to skip finalize
44    sent_end_of_document: bool, // Track if EndOfDocument has been returned
45    start_time: Instant,
46    operations_count: usize, // Track operations for periodic timeout checks
47    // Note: Buffer pool integration deferred - requires refactoring of parse_data_row
48    // to support pooled allocations. Current direct allocation pattern is simpler
49    // and performs adequately for typical use cases. Pooling would benefit only
50    // extremely high-throughput scenarios (>1M rows/sec).
51    _buffer_pool: Option<BufferPool>, // Optional buffer pool for high-throughput scenarios (not yet integrated)
52}
53
54impl<R: Read> StreamingParser<R> {
55    /// Create a new streaming parser with default configuration.
56    ///
57    /// The parser immediately reads and validates the HEDL header (version and
58    /// schema directives). If the header is invalid, this function returns an error.
59    ///
60    /// # Parameters
61    ///
62    /// - `reader`: Any type implementing `Read` (files, network streams, buffers, etc.)
63    ///
64    /// # Returns
65    ///
66    /// - `Ok(parser)`: Parser ready to yield events
67    /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
68    ///
69    /// # Examples
70    ///
71    /// ## From a File
72    ///
73    /// ```rust,no_run
74    /// use hedl_stream::StreamingParser;
75    /// use std::fs::File;
76    /// use std::io::BufReader;
77    ///
78    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
79    /// let file = File::open("data.hedl")?;
80    /// let reader = BufReader::new(file);
81    /// let parser = StreamingParser::new(reader)?;
82    /// # Ok(())
83    /// # }
84    /// ```
85    ///
86    /// ## From a String
87    ///
88    /// ```rust
89    /// use hedl_stream::StreamingParser;
90    /// use std::io::Cursor;
91    ///
92    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
93    /// let data = r#"
94    /// %VERSION: 1.0
95    /// %STRUCT: User: [id, name]
96    /// ---
97    /// users:@User
98    ///   | alice, Alice
99    /// "#;
100    ///
101    /// let parser = StreamingParser::new(Cursor::new(data))?;
102    /// # Ok(())
103    /// # }
104    /// ```
105    ///
106    /// ## From Stdin
107    ///
108    /// ```rust,no_run
109    /// use hedl_stream::StreamingParser;
110    /// use std::io::stdin;
111    ///
112    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
113    /// let parser = StreamingParser::new(stdin().lock())?;
114    /// # Ok(())
115    /// # }
116    /// ```
117    ///
118    /// # Errors
119    ///
120    /// - `StreamError::MissingVersion`: No `%VERSION` directive found
121    /// - `StreamError::InvalidVersion`: Invalid version format
122    /// - `StreamError::Syntax`: Malformed header directive
123    /// - `StreamError::Io`: I/O error reading input
124    pub fn new(reader: R) -> StreamResult<Self> {
125        Self::with_config(reader, StreamingParserConfig::default())
126    }
127
128    /// Create a streaming parser with custom configuration.
129    ///
130    /// Use this when you need to control memory limits, buffer sizes, or enable
131    /// timeout protection for untrusted input.
132    ///
133    /// # Parameters
134    ///
135    /// - `reader`: Any type implementing `Read`
136    /// - `config`: Parser configuration options
137    ///
138    /// # Returns
139    ///
140    /// - `Ok(parser)`: Parser ready to yield events
141    /// - `Err(e)`: Configuration invalid or header parsing failed
142    ///
143    /// # Examples
144    ///
145    /// ## With Timeout Protection
146    ///
147    /// ```rust
148    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
149    /// use std::time::Duration;
150    /// use std::io::Cursor;
151    ///
152    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
153    /// let config = StreamingParserConfig {
154    ///     timeout: Some(Duration::from_secs(30)),
155    ///     ..Default::default()
156    /// };
157    ///
158    /// let untrusted_input = "...";
159    /// let parser = StreamingParser::with_config(
160    ///     Cursor::new(untrusted_input),
161    ///     config
162    /// )?;
163    /// # Ok(())
164    /// # }
165    /// ```
166    ///
167    /// ## For Large Files
168    ///
169    /// ```rust
170    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
171    /// use std::io::Cursor;
172    ///
173    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
174    /// let config = StreamingParserConfig {
175    ///     buffer_size: 256 * 1024,      // 256KB read buffer
176    ///     max_line_length: 10_000_000,  // 10MB max line
177    ///     max_indent_depth: 1000,       // Deep nesting allowed
178    ///     timeout: None,
179    ///     ..Default::default()
180    /// };
181    ///
182    /// let parser = StreamingParser::with_config(
183    ///     Cursor::new("..."),
184    ///     config
185    /// )?;
186    /// # Ok(())
187    /// # }
188    /// ```
189    ///
190    /// ## For Constrained Environments
191    ///
192    /// ```rust
193    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
194    /// use std::time::Duration;
195    /// use std::io::Cursor;
196    ///
197    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
198    /// let config = StreamingParserConfig {
199    ///     buffer_size: 8 * 1024,        // Small 8KB buffer
200    ///     max_line_length: 100_000,     // 100KB max line
201    ///     max_indent_depth: 50,         // Limited nesting
202    ///     timeout: Some(Duration::from_secs(10)),
203    ///     ..Default::default()
204    /// };
205    ///
206    /// let parser = StreamingParser::with_config(
207    ///     Cursor::new("..."),
208    ///     config
209    /// )?;
210    /// # Ok(())
211    /// # }
212    /// ```
213    ///
214    /// # Errors
215    ///
216    /// Same as [`new()`](Self::new), plus:
217    ///
218    /// - `StreamError::Timeout`: Header parsing exceeded configured timeout
219    pub fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
220        // Initialize buffer pool if enabled
221        let buffer_pool = if config.enable_pooling && config.memory_limits.enable_buffer_pooling {
222            Some(BufferPool::new(config.memory_limits.max_pool_size))
223        } else {
224            None
225        };
226
227        let mut parser = Self {
228            reader: LineReader::with_capacity_and_max_length(
229                reader,
230                config.buffer_size,
231                config.max_line_length,
232            ),
233            config,
234            header: None,
235            state: ParserState::default(),
236            finished: false,
237            errored: false,
238            sent_end_of_document: false,
239            start_time: Instant::now(),
240            operations_count: 0,
241            _buffer_pool: buffer_pool,
242        };
243
244        // Parse header immediately
245        parser.parse_header()?;
246
247        Ok(parser)
248    }
249
250    /// Check if timeout has been exceeded.
251    /// This is called periodically during parsing to prevent infinite loops.
252    #[inline]
253    fn check_timeout(&self) -> StreamResult<()> {
254        if let Some(timeout) = self.config.timeout {
255            let elapsed = self.start_time.elapsed();
256            if elapsed > timeout {
257                return Err(StreamError::Timeout {
258                    elapsed,
259                    limit: timeout,
260                });
261            }
262        }
263        Ok(())
264    }
265
266    /// Get the parsed header information.
267    ///
268    /// Returns header metadata including version, schema definitions, aliases,
269    /// and nesting rules. This is available immediately after parser creation.
270    ///
271    /// # Returns
272    ///
273    /// - `Some(&HeaderInfo)`: Header was successfully parsed
274    /// - `None`: Should never happen after successful parser creation
275    ///
276    /// # Examples
277    ///
278    /// ## Inspecting Schema Definitions
279    ///
280    /// ```rust
281    /// use hedl_stream::StreamingParser;
282    /// use std::io::Cursor;
283    ///
284    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
285    /// let input = r#"
286    /// %VERSION: 1.0
287    /// %STRUCT: User: [id, name, email]
288    /// %STRUCT: Order: [id, user_id, amount]
289    /// %ALIAS: active = "Active"
290    /// %NEST: User > Order
291    /// ---
292    /// "#;
293    ///
294    /// let parser = StreamingParser::new(Cursor::new(input))?;
295    /// let header = parser.header().unwrap();
296    ///
297    /// // Check version
298    /// assert_eq!(header.version, (1, 0));
299    ///
300    /// // Get schema
301    /// let user_schema = header.get_schema("User").unwrap();
302    /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
303    ///
304    /// // Check aliases
305    /// assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
306    ///
307    /// // Check nesting rules
308    /// assert!(header.get_child_types("User").map_or(false, |v| v.contains(&"Order".to_string())));
309    /// # Ok(())
310    /// # }
311    /// ```
312    ///
313    /// ## Validating Before Processing
314    ///
315    /// ```rust
316    /// use hedl_stream::StreamingParser;
317    /// use std::io::Cursor;
318    ///
319    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
320    /// let input = r#"
321    /// %VERSION: 1.0
322    /// %STRUCT: User: [id, name]
323    /// ---
324    /// users:@User
325    ///   | alice, Alice
326    /// "#;
327    ///
328    /// let parser = StreamingParser::new(Cursor::new(input))?;
329    ///
330    /// // Validate we have the expected schema before processing
331    /// if let Some(header) = parser.header() {
332    ///     if header.version.0 != 1 {
333    ///         eprintln!("Warning: Unexpected major version");
334    ///     }
335    ///
336    ///     if !header.structs.contains_key("User") {
337    ///         return Err("Missing User schema".into());
338    ///     }
339    /// }
340    ///
341    /// // Proceed with parsing...
342    /// # Ok(())
343    /// # }
344    /// ```
345    pub fn header(&self) -> Option<&HeaderInfo> {
346        self.header.as_ref()
347    }
348
349    /// Parse the header section.
350    fn parse_header(&mut self) -> StreamResult<()> {
351        let mut header = HeaderInfo::new();
352        let mut found_version = false;
353        let mut _found_separator = false;
354
355        while let Some((line_num, line)) = self.reader.next_line()? {
356            // Check timeout every iteration in header parsing
357            self.check_timeout()?;
358
359            let trimmed = line.trim();
360
361            // Skip blank lines and comments
362            if trimmed.is_empty() || trimmed.starts_with('#') {
363                continue;
364            }
365
366            // Check for separator
367            if trimmed == "---" {
368                _found_separator = true;
369                break;
370            }
371
372            // Parse directives
373            if trimmed.starts_with('%') {
374                directives::parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
375            } else {
376                // Not a directive - might be body content without separator
377                self.reader.push_back(line_num, line);
378                break;
379            }
380        }
381
382        if !found_version {
383            return Err(StreamError::MissingVersion);
384        }
385
386        self.header = Some(header);
387        Ok(())
388    }
389
390    /// Parse the next event from the stream.
391    fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
392        // If errored, stop immediately without finalize
393        if self.errored {
394            return Ok(None);
395        }
396
397        // Drain pending events from inline children first
398        if let Some(event) = self.state.pending_events.pop_front() {
399            return Ok(Some(event));
400        }
401
402        // If finished, continue emitting remaining context ends until stack is empty
403        if self.finished {
404            return self.finalize();
405        }
406
407        loop {
408            // Check timeout periodically (every 100 operations to minimize overhead)
409            self.operations_count += 1;
410            if self.operations_count % 100 == 0 {
411                self.check_timeout()?;
412            }
413
414            let (line_num, line) = if let Some(l) = self.reader.next_line()? {
415                l
416            } else {
417                self.finished = true;
418                // Emit any remaining list ends
419                return self.finalize();
420            };
421
422            let trimmed = line.trim();
423
424            // Skip blank lines and comments
425            if trimmed.is_empty() || trimmed.starts_with('#') {
426                continue;
427            }
428
429            // Calculate indentation
430            let indent_info = calculate_indent(&line, line_num as u32)
431                .map_err(|e| StreamError::syntax(line_num, e.to_string()))?;
432
433            let (indent, content) = match indent_info {
434                Some(info) => (info.level, &line[info.spaces..]),
435                None => continue,
436            };
437
438            if indent > self.config.max_indent_depth {
439                return Err(StreamError::syntax(
440                    line_num,
441                    format!("indent depth {indent} exceeds limit"),
442                ));
443            }
444
445            // Pop contexts as needed based on indentation
446            let events = context::pop_contexts(&mut self.state.stack, indent)?;
447            if let Some(event) = events {
448                // Push back the current line to process after emitting list end
449                self.reader.push_back(line_num, line);
450                return Ok(Some(event));
451            }
452
453            // Parse line content
454            return self.parse_line(content, indent, line_num);
455        }
456    }
457
458    fn parse_line(
459        &mut self,
460        content: &str,
461        indent: usize,
462        line_num: usize,
463    ) -> StreamResult<Option<NodeEvent>> {
464        // Check for child block syntax BEFORE stripping comments
465        // because @Type#N: uses # which would otherwise be treated as comment start
466        if content.starts_with('@') && content.contains('#') {
467            // Check if it looks like child block pattern: @Type#N:
468            // We need to pass the original content to preserve the #N: syntax
469            return list_parsing::try_parse_child_block(
470                content,
471                indent,
472                line_num,
473                &self.state.stack,
474                &mut self.state.pending_events,
475                &mut self.state.prev_row,
476                self.header.as_ref(),
477            );
478        }
479
480        // Strip inline comment for all other line types
481        let content = strip_comment(content);
482
483        if let Some(row_content) = content.strip_prefix('|') {
484            // Matrix row
485            let event = list_parsing::parse_matrix_row(
486                row_content,
487                indent,
488                line_num,
489                &mut self.state.stack,
490                &mut self.state.prev_row,
491                self.header.as_ref(),
492            )?;
493            // Update list context after parsing row
494            if let NodeEvent::Node(ref node) = event {
495                context::update_list_context(&mut self.state.stack, &node.type_name, &node.id);
496            }
497            return Ok(Some(event));
498        } else if content.starts_with('@') {
499            // Child block without # - this is an error (requires #N: format)
500            return list_parsing::try_parse_child_block(
501                content,
502                indent,
503                line_num,
504                &self.state.stack,
505                &mut self.state.pending_events,
506                &mut self.state.prev_row,
507                self.header.as_ref(),
508            );
509        }
510
511        if let Some(colon_pos) = content.find(':') {
512            let key = content[..colon_pos].trim();
513            let after_colon = &content[colon_pos + 1..];
514
515            if !is_valid_key_token(key) {
516                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
517            }
518
519            let after_colon_trimmed = after_colon.trim();
520
521            if after_colon_trimmed.is_empty() {
522                // Object start: validate indent and context
523                context::validate_indent_for_key_value(&self.state.stack, indent, line_num)?;
524
525                self.state.stack.push(Context::Object {
526                    key: key.to_string(),
527                    indent,
528                });
529                Ok(Some(NodeEvent::ObjectStart {
530                    key: key.to_string(),
531                    line: line_num,
532                }))
533            } else if after_colon_trimmed.starts_with('@')
534                && list_parsing::is_list_start(after_colon_trimmed)
535            {
536                // Matrix list start
537                // Accept both "key:@Type" (v2.0 canonical) and "key:@Type" (backward compat)
538
539                // List declarations are allowed in list context (for nested lists)
540                // so we don't call validate_indent_for_key_value here
541
542                let (type_name, schema) = list_parsing::parse_list_start(
543                    after_colon_trimmed,
544                    line_num,
545                    self.header.as_ref(),
546                )?;
547
548                self.state.stack.push(Context::List {
549                    key: key.to_string(),
550                    type_name: type_name.clone(),
551                    schema: schema.clone(),
552                    row_indent: indent + 1,
553                    count: 0,
554                    last_node: None,
555                });
556
557                self.state.prev_row = None;
558
559                Ok(Some(NodeEvent::ListStart {
560                    key: key.to_string(),
561                    type_name,
562                    schema,
563                    line: line_num,
564                }))
565            } else {
566                // Key-value pair: require space after colon and validate indent
567                if !after_colon.starts_with(' ') {
568                    return Err(StreamError::syntax(
569                        line_num,
570                        "space required after ':' in key-value",
571                    ));
572                }
573                context::validate_indent_for_key_value(&self.state.stack, indent, line_num)?;
574
575                let value = value_inference::infer_value(
576                    after_colon.trim(),
577                    line_num,
578                    self.header.as_ref(),
579                )?;
580                Ok(Some(NodeEvent::Scalar {
581                    key: key.to_string(),
582                    value,
583                    line: line_num,
584                }))
585            }
586        } else {
587            Err(StreamError::syntax(line_num, "expected ':' in line"))
588        }
589    }
590
591    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
592        // If we already sent EndOfDocument, return None to signal true end of stream
593        if self.sent_end_of_document {
594            return Ok(None);
595        }
596
597        // Pop remaining contexts
598        while self.state.stack.len() > 1 {
599            // Safe: loop condition guarantees stack has elements
600            let ctx = self.state.stack.pop().expect("stack has elements");
601            match ctx {
602                Context::List {
603                    key,
604                    type_name,
605                    count,
606                    ..
607                } => {
608                    return Ok(Some(NodeEvent::ListEnd {
609                        key,
610                        type_name,
611                        count,
612                    }));
613                }
614                Context::Object { key, .. } => {
615                    return Ok(Some(NodeEvent::ObjectEnd { key }));
616                }
617                Context::Root => {
618                    // Root context should never be popped
619                }
620            }
621        }
622
623        // Mark that we've sent EndOfDocument, so subsequent calls return None
624        self.sent_end_of_document = true;
625        Ok(Some(NodeEvent::EndOfDocument))
626    }
627}
628
629impl<R: Read> Iterator for StreamingParser<R> {
630    type Item = StreamResult<NodeEvent>;
631
632    fn next(&mut self) -> Option<Self::Item> {
633        match self.next_event() {
634            Ok(Some(NodeEvent::EndOfDocument)) => None,
635            Ok(Some(event)) => Some(Ok(event)),
636            Ok(None) => None,
637            Err(e) => {
638                // Stop iteration after an error to prevent inconsistent state
639                self.finished = true;
640                self.errored = true;
641                Some(Err(e))
642            }
643        }
644    }
645}
646
647// File opening with compression support
648#[cfg(feature = "compression")]
649impl StreamingParser<crate::compression::CompressionReader<std::fs::File>> {
650    /// Open a file with automatic compression detection.
651    ///
652    /// Detects compression format from the file extension (`.gz`, `.zst`, `.lz4`)
653    /// and automatically decompresses the content.
654    ///
655    /// # Examples
656    ///
657    /// ```rust,no_run
658    /// use hedl_stream::StreamingParser;
659    ///
660    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
661    /// // Open a GZIP-compressed HEDL file
662    /// let parser = StreamingParser::open("data.hedl.gz")?;
663    ///
664    /// for event in parser {
665    ///     println!("{:?}", event?);
666    /// }
667    /// # Ok(())
668    /// # }
669    /// ```
670    ///
671    /// # Errors
672    ///
673    /// - `StreamError::Io`: File not found or cannot be opened
674    /// - `StreamError::Compression`: Decompression initialization failed
675    /// - `StreamError::MissingVersion`: Invalid HEDL header
676    pub fn open<P: AsRef<std::path::Path>>(path: P) -> StreamResult<Self> {
677        Self::open_with_config(path, StreamingParserConfig::default())
678    }
679
680    /// Open a file with automatic compression detection and custom configuration.
681    ///
682    /// Combines automatic compression detection with custom parser settings.
683    ///
684    /// # Examples
685    ///
686    /// ```rust,no_run
687    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
688    /// use std::time::Duration;
689    ///
690    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
691    /// let config = StreamingParserConfig {
692    ///     timeout: Some(Duration::from_secs(30)),
693    ///     ..Default::default()
694    /// };
695    ///
696    /// let parser = StreamingParser::open_with_config("data.hedl.zst", config)?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub fn open_with_config<P: AsRef<std::path::Path>>(
701        path: P,
702        config: StreamingParserConfig,
703    ) -> StreamResult<Self> {
704        use crate::compression::{CompressionFormat, CompressionReader};
705
706        let path = path.as_ref();
707        let format = CompressionFormat::from_path(path);
708
709        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
710        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;
711
712        Self::with_config(reader, config)
713    }
714
715    /// Open a file with explicit compression format.
716    ///
717    /// Use this when the file extension doesn't match the actual compression
718    /// format, or when you want to force a specific decompression algorithm.
719    ///
720    /// # Examples
721    ///
722    /// ```rust,no_run
723    /// use hedl_stream::StreamingParser;
724    /// use hedl_stream::compression::CompressionFormat;
725    ///
726    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
727    /// // File has no extension but is GZIP compressed
728    /// let parser = StreamingParser::open_with_compression(
729    ///     "data.hedl",
730    ///     CompressionFormat::Gzip,
731    /// )?;
732    /// # Ok(())
733    /// # }
734    /// ```
735    pub fn open_with_compression<P: AsRef<std::path::Path>>(
736        path: P,
737        format: crate::compression::CompressionFormat,
738    ) -> StreamResult<Self> {
739        use crate::compression::CompressionReader;
740
741        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
742        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;
743
744        Self::new(reader)
745    }
746}
747
748#[cfg(test)]
749mod tests {
750    use super::*;
751    use std::io::Cursor;
752
753    #[test]
754    fn test_basic_parsing() {
755        let input = r#"
756%VERSION: 1.0
757%STRUCT: User: [id, name]
758---
759users:@User
760 | alice, Alice
761"#;
762        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
763        let events: Vec<_> = parser.collect::<Result<Vec<_>, _>>().unwrap();
764        assert!(!events.is_empty());
765    }
766
767    #[test]
768    fn test_object_parsing() {
769        let input = r#"
770%VERSION: 1.0
771---
772config:
773 debug: true
774 timeout: 30
775"#;
776        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
777        let events: Vec<_> = parser.collect::<Result<Vec<_>, _>>().unwrap();
778        assert!(!events.is_empty());
779    }
780
781    #[test]
782    fn test_nested_lists() {
783        let input = r#"
784%VERSION: 1.0
785%STRUCT: User: [id, name]
786%STRUCT: Post: [id, title]
787%NEST: User > Post
788---
789users:@User
790 | alice, Alice
791  | post1, First Post
792"#;
793        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
794        let events: Vec<_> = parser.collect::<Result<Vec<_>, _>>().unwrap();
795        assert!(!events.is_empty());
796    }
797
798    #[test]
799    fn test_inline_children() {
800        let input = r#"
801%VERSION: 2.0
802%S:User:[id,name]
803%S:Post:[id,title]
804%N:User>Post
805---
806users:@User
807 | alice, Alice
808  @Post#2:|p1,First|p2,Second
809"#;
810        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
811        let events: Vec<_> = parser.collect::<Result<Vec<_>, _>>().unwrap();
812        assert!(!events.is_empty());
813    }
814}