// hedl_stream/parser/core.rs
// Dweve HEDL - Hierarchical Entity Data Language
//
// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Core streaming parser implementation

use super::config::StreamingParserConfig;
use super::context::{self, Context, ParserState};
use super::directives;
use super::list_parsing;
use super::value_inference;
use crate::buffer_pool::BufferPool;
use crate::error::{StreamError, StreamResult};
use crate::event::{HeaderInfo, NodeEvent};
use crate::reader::LineReader;
use hedl_core::lex::{calculate_indent, is_valid_key_token, strip_comment};
use std::io::Read;
use std::time::Instant;

/// Streaming HEDL parser.
///
/// Processes HEDL documents incrementally, yielding `NodeEvent` items as they
/// are parsed without loading the entire document into memory.
pub struct StreamingParser<R: Read> {
    // Line-oriented reader over the underlying input stream.
    reader: LineReader<R>,
    // Parser configuration (buffer sizes, limits, timeout, pooling flags).
    config: StreamingParserConfig,
    // Header parsed eagerly during construction; `Some` after a successful
    // `new`/`with_config` call.
    header: Option<HeaderInfo>,
    // Mutable parse state: context stack, pending events, previous row.
    state: ParserState,
    finished: bool, // Set once input is exhausted; remaining context ends are then drained
    errored: bool, // Track if an error occurred to skip finalize
    sent_end_of_document: bool, // Track if EndOfDocument has been returned
    start_time: Instant, // Construction time, used for timeout checks
    operations_count: usize, // Track operations for periodic timeout checks
    // Note: Buffer pool integration deferred - requires refactoring of parse_data_row
    // to support pooled allocations. Current direct allocation pattern is simpler
    // and performs adequately for typical use cases. Pooling would benefit only
    // extremely high-throughput scenarios (>1M rows/sec).
    _buffer_pool: Option<BufferPool>, // Optional buffer pool for high-throughput scenarios (not yet integrated)
}
53
54impl<R: Read> StreamingParser<R> {
55 /// Create a new streaming parser with default configuration.
56 ///
57 /// The parser immediately reads and validates the HEDL header (version and
58 /// schema directives). If the header is invalid, this function returns an error.
59 ///
60 /// # Parameters
61 ///
62 /// - `reader`: Any type implementing `Read` (files, network streams, buffers, etc.)
63 ///
64 /// # Returns
65 ///
66 /// - `Ok(parser)`: Parser ready to yield events
67 /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
68 ///
69 /// # Examples
70 ///
71 /// ## From a File
72 ///
73 /// ```rust,no_run
74 /// use hedl_stream::StreamingParser;
75 /// use std::fs::File;
76 /// use std::io::BufReader;
77 ///
78 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
79 /// let file = File::open("data.hedl")?;
80 /// let reader = BufReader::new(file);
81 /// let parser = StreamingParser::new(reader)?;
82 /// # Ok(())
83 /// # }
84 /// ```
85 ///
86 /// ## From a String
87 ///
88 /// ```rust
89 /// use hedl_stream::StreamingParser;
90 /// use std::io::Cursor;
91 ///
92 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
93 /// let data = r#"
94 /// %VERSION: 1.0
95 /// %STRUCT: User: [id, name]
96 /// ---
97 /// users:@User
98 /// | alice, Alice
99 /// "#;
100 ///
101 /// let parser = StreamingParser::new(Cursor::new(data))?;
102 /// # Ok(())
103 /// # }
104 /// ```
105 ///
106 /// ## From Stdin
107 ///
108 /// ```rust,no_run
109 /// use hedl_stream::StreamingParser;
110 /// use std::io::stdin;
111 ///
112 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
113 /// let parser = StreamingParser::new(stdin().lock())?;
114 /// # Ok(())
115 /// # }
116 /// ```
117 ///
118 /// # Errors
119 ///
120 /// - `StreamError::MissingVersion`: No `%VERSION` directive found
121 /// - `StreamError::InvalidVersion`: Invalid version format
122 /// - `StreamError::Syntax`: Malformed header directive
123 /// - `StreamError::Io`: I/O error reading input
124 pub fn new(reader: R) -> StreamResult<Self> {
125 Self::with_config(reader, StreamingParserConfig::default())
126 }
128 /// Create a streaming parser with custom configuration.
129 ///
130 /// Use this when you need to control memory limits, buffer sizes, or enable
131 /// timeout protection for untrusted input.
132 ///
133 /// # Parameters
134 ///
135 /// - `reader`: Any type implementing `Read`
136 /// - `config`: Parser configuration options
137 ///
138 /// # Returns
139 ///
140 /// - `Ok(parser)`: Parser ready to yield events
141 /// - `Err(e)`: Configuration invalid or header parsing failed
142 ///
143 /// # Examples
144 ///
145 /// ## With Timeout Protection
146 ///
147 /// ```rust
148 /// use hedl_stream::{StreamingParser, StreamingParserConfig};
149 /// use std::time::Duration;
150 /// use std::io::Cursor;
151 ///
152 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
153 /// let config = StreamingParserConfig {
154 /// timeout: Some(Duration::from_secs(30)),
155 /// ..Default::default()
156 /// };
157 ///
158 /// let untrusted_input = "...";
159 /// let parser = StreamingParser::with_config(
160 /// Cursor::new(untrusted_input),
161 /// config
162 /// )?;
163 /// # Ok(())
164 /// # }
165 /// ```
166 ///
167 /// ## For Large Files
168 ///
169 /// ```rust
170 /// use hedl_stream::{StreamingParser, StreamingParserConfig};
171 /// use std::io::Cursor;
172 ///
173 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
174 /// let config = StreamingParserConfig {
175 /// buffer_size: 256 * 1024, // 256KB read buffer
176 /// max_line_length: 10_000_000, // 10MB max line
177 /// max_indent_depth: 1000, // Deep nesting allowed
178 /// timeout: None,
179 /// ..Default::default()
180 /// };
181 ///
182 /// let parser = StreamingParser::with_config(
183 /// Cursor::new("..."),
184 /// config
185 /// )?;
186 /// # Ok(())
187 /// # }
188 /// ```
189 ///
190 /// ## For Constrained Environments
191 ///
192 /// ```rust
193 /// use hedl_stream::{StreamingParser, StreamingParserConfig};
194 /// use std::time::Duration;
195 /// use std::io::Cursor;
196 ///
197 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
198 /// let config = StreamingParserConfig {
199 /// buffer_size: 8 * 1024, // Small 8KB buffer
200 /// max_line_length: 100_000, // 100KB max line
201 /// max_indent_depth: 50, // Limited nesting
202 /// timeout: Some(Duration::from_secs(10)),
203 /// ..Default::default()
204 /// };
205 ///
206 /// let parser = StreamingParser::with_config(
207 /// Cursor::new("..."),
208 /// config
209 /// )?;
210 /// # Ok(())
211 /// # }
212 /// ```
213 ///
214 /// # Errors
215 ///
216 /// Same as [`new()`](Self::new), plus:
217 ///
218 /// - `StreamError::Timeout`: Header parsing exceeded configured timeout
219 pub fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
220 // Initialize buffer pool if enabled
221 let buffer_pool = if config.enable_pooling && config.memory_limits.enable_buffer_pooling {
222 Some(BufferPool::new(config.memory_limits.max_pool_size))
223 } else {
224 None
225 };
226
227 let mut parser = Self {
228 reader: LineReader::with_capacity_and_max_length(
229 reader,
230 config.buffer_size,
231 config.max_line_length,
232 ),
233 config,
234 header: None,
235 state: ParserState::default(),
236 finished: false,
237 errored: false,
238 sent_end_of_document: false,
239 start_time: Instant::now(),
240 operations_count: 0,
241 _buffer_pool: buffer_pool,
242 };
243
244 // Parse header immediately
245 parser.parse_header()?;
246
247 Ok(parser)
248 }
250 /// Check if timeout has been exceeded.
251 /// This is called periodically during parsing to prevent infinite loops.
252 #[inline]
253 fn check_timeout(&self) -> StreamResult<()> {
254 if let Some(timeout) = self.config.timeout {
255 let elapsed = self.start_time.elapsed();
256 if elapsed > timeout {
257 return Err(StreamError::Timeout {
258 elapsed,
259 limit: timeout,
260 });
261 }
262 }
263 Ok(())
264 }
266 /// Get the parsed header information.
267 ///
268 /// Returns header metadata including version, schema definitions, aliases,
269 /// and nesting rules. This is available immediately after parser creation.
270 ///
271 /// # Returns
272 ///
273 /// - `Some(&HeaderInfo)`: Header was successfully parsed
274 /// - `None`: Should never happen after successful parser creation
275 ///
276 /// # Examples
277 ///
278 /// ## Inspecting Schema Definitions
279 ///
280 /// ```rust
281 /// use hedl_stream::StreamingParser;
282 /// use std::io::Cursor;
283 ///
284 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
285 /// let input = r#"
286 /// %VERSION: 1.0
287 /// %STRUCT: User: [id, name, email]
288 /// %STRUCT: Order: [id, user_id, amount]
289 /// %ALIAS: active = "Active"
290 /// %NEST: User > Order
291 /// ---
292 /// "#;
293 ///
294 /// let parser = StreamingParser::new(Cursor::new(input))?;
295 /// let header = parser.header().unwrap();
296 ///
297 /// // Check version
298 /// assert_eq!(header.version, (1, 0));
299 ///
300 /// // Get schema
301 /// let user_schema = header.get_schema("User").unwrap();
302 /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
303 ///
304 /// // Check aliases
305 /// assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
306 ///
307 /// // Check nesting rules
308 /// assert!(header.get_child_types("User").map_or(false, |v| v.contains(&"Order".to_string())));
309 /// # Ok(())
310 /// # }
311 /// ```
312 ///
313 /// ## Validating Before Processing
314 ///
315 /// ```rust
316 /// use hedl_stream::StreamingParser;
317 /// use std::io::Cursor;
318 ///
319 /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
320 /// let input = r#"
321 /// %VERSION: 1.0
322 /// %STRUCT: User: [id, name]
323 /// ---
324 /// users:@User
325 /// | alice, Alice
326 /// "#;
327 ///
328 /// let parser = StreamingParser::new(Cursor::new(input))?;
329 ///
330 /// // Validate we have the expected schema before processing
331 /// if let Some(header) = parser.header() {
332 /// if header.version.0 != 1 {
333 /// eprintln!("Warning: Unexpected major version");
334 /// }
335 ///
336 /// if !header.structs.contains_key("User") {
337 /// return Err("Missing User schema".into());
338 /// }
339 /// }
340 ///
341 /// // Proceed with parsing...
342 /// # Ok(())
343 /// # }
344 /// ```
345 pub fn header(&self) -> Option<&HeaderInfo> {
346 self.header.as_ref()
347 }
349 /// Parse the header section.
350 fn parse_header(&mut self) -> StreamResult<()> {
351 let mut header = HeaderInfo::new();
352 let mut found_version = false;
353 let mut _found_separator = false;
354
355 while let Some((line_num, line)) = self.reader.next_line()? {
356 // Check timeout every iteration in header parsing
357 self.check_timeout()?;
358
359 let trimmed = line.trim();
360
361 // Skip blank lines and comments
362 if trimmed.is_empty() || trimmed.starts_with('#') {
363 continue;
364 }
365
366 // Check for separator
367 if trimmed == "---" {
368 _found_separator = true;
369 break;
370 }
371
372 // Parse directives
373 if trimmed.starts_with('%') {
374 directives::parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
375 } else {
376 // Not a directive - might be body content without separator
377 self.reader.push_back(line_num, line);
378 break;
379 }
380 }
381
382 if !found_version {
383 return Err(StreamError::MissingVersion);
384 }
385
386 self.header = Some(header);
387 Ok(())
388 }
    /// Parse the next event from the stream.
    ///
    /// Returns `Ok(Some(event))` for each parsed event and `Ok(None)` once
    /// the stream is fully drained (or after a prior error). Sources are
    /// consulted in priority order: the pending-event queue (filled by inline
    /// child parsing), end-of-input finalization, then fresh input lines.
    fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        // If errored, stop immediately without finalize
        if self.errored {
            return Ok(None);
        }

        // Drain pending events from inline children first
        if let Some(event) = self.state.pending_events.pop_front() {
            return Ok(Some(event));
        }

        // If finished, continue emitting remaining context ends until stack is empty
        if self.finished {
            return self.finalize();
        }

        loop {
            // Check timeout periodically (every 100 operations to minimize overhead)
            self.operations_count += 1;
            if self.operations_count % 100 == 0 {
                self.check_timeout()?;
            }

            let (line_num, line) = if let Some(l) = self.reader.next_line()? {
                l
            } else {
                // Input exhausted: switch to draining mode.
                self.finished = true;
                // Emit any remaining list ends
                return self.finalize();
            };

            let trimmed = line.trim();

            // Skip blank lines and comments
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            // Calculate indentation (errors here become syntax errors with the line number)
            let indent_info = calculate_indent(&line, line_num as u32)
                .map_err(|e| StreamError::syntax(line_num, e.to_string()))?;

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            // Enforce the configured nesting-depth limit.
            if indent > self.config.max_indent_depth {
                return Err(StreamError::syntax(
                    line_num,
                    format!("indent depth {indent} exceeds limit"),
                ));
            }

            // Pop contexts as needed based on indentation
            let events = context::pop_contexts(&mut self.state.stack, indent)?;
            if let Some(event) = events {
                // Push back the current line to process after emitting list end
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            // Parse line content
            return self.parse_line(content, indent, line_num);
        }
    }
    /// Parse a single body line and convert it into at most one `NodeEvent`.
    ///
    /// `content` is the line with indentation already stripped, `indent` its
    /// computed depth, and `line_num` the source line number used in
    /// diagnostics. Handles, in order: inline child blocks (`@Type#N:`),
    /// matrix rows (`| ...`), bare `@Type` lines (delegated so the child-block
    /// parser reports the error), object starts (`key:`), list starts
    /// (`key: @Type`), and scalar key-value pairs (`key: value`).
    fn parse_line(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        // Check for child block syntax BEFORE stripping comments
        // because @Type#N: uses # which would otherwise be treated as comment start
        if content.starts_with('@') && content.contains('#') {
            // Check if it looks like child block pattern: @Type#N:
            // We need to pass the original content to preserve the #N: syntax
            return list_parsing::try_parse_child_block(
                content,
                indent,
                line_num,
                &self.state.stack,
                &mut self.state.pending_events,
                &mut self.state.prev_row,
                self.header.as_ref(),
            );
        }

        // Strip inline comment for all other line types
        let content = strip_comment(content);

        if let Some(row_content) = content.strip_prefix('|') {
            // Matrix row
            let event = list_parsing::parse_matrix_row(
                row_content,
                indent,
                line_num,
                &mut self.state.stack,
                &mut self.state.prev_row,
                self.header.as_ref(),
            )?;
            // Update list context after parsing row, so the enclosing List's
            // count/last_node reflect this node
            if let NodeEvent::Node(ref node) = event {
                context::update_list_context(&mut self.state.stack, &node.type_name, &node.id);
            }
            return Ok(Some(event));
        } else if content.starts_with('@') {
            // Child block without # - this is an error (requires #N: format);
            // delegate so try_parse_child_block produces the diagnostic
            return list_parsing::try_parse_child_block(
                content,
                indent,
                line_num,
                &self.state.stack,
                &mut self.state.pending_events,
                &mut self.state.prev_row,
                self.header.as_ref(),
            );
        }

        if let Some(colon_pos) = content.find(':') {
            let key = content[..colon_pos].trim();
            let after_colon = &content[colon_pos + 1..];

            if !is_valid_key_token(key) {
                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
            }

            let after_colon_trimmed = after_colon.trim();

            if after_colon_trimmed.is_empty() {
                // Object start: validate indent and context
                context::validate_indent_for_key_value(&self.state.stack, indent, line_num)?;

                self.state.stack.push(Context::Object {
                    key: key.to_string(),
                    indent,
                });
                Ok(Some(NodeEvent::ObjectStart {
                    key: key.to_string(),
                    line: line_num,
                }))
            } else if after_colon_trimmed.starts_with('@')
                && list_parsing::is_list_start(after_colon_trimmed)
            {
                // Matrix list start
                // Accept both "key: @Type" and "key:@Type" — the tail is
                // trimmed above, so spacing after ':' is irrelevant here

                // List declarations are allowed in list context (for nested lists)
                // so we don't call validate_indent_for_key_value here

                let (type_name, schema) = list_parsing::parse_list_start(
                    after_colon_trimmed,
                    line_num,
                    self.header.as_ref(),
                )?;

                // Rows for this list must appear one level deeper.
                self.state.stack.push(Context::List {
                    key: key.to_string(),
                    type_name: type_name.clone(),
                    schema: schema.clone(),
                    row_indent: indent + 1,
                    count: 0,
                    last_node: None,
                });

                // A new list resets row-to-row state (e.g. ditto handling).
                self.state.prev_row = None;

                Ok(Some(NodeEvent::ListStart {
                    key: key.to_string(),
                    type_name,
                    schema,
                    line: line_num,
                }))
            } else {
                // Key-value pair: require space after colon and validate indent
                if !after_colon.starts_with(' ') {
                    return Err(StreamError::syntax(
                        line_num,
                        "space required after ':' in key-value",
                    ));
                }
                context::validate_indent_for_key_value(&self.state.stack, indent, line_num)?;

                let value = value_inference::infer_value(
                    after_colon.trim(),
                    line_num,
                    self.header.as_ref(),
                )?;
                Ok(Some(NodeEvent::Scalar {
                    key: key.to_string(),
                    value,
                    line: line_num,
                }))
            }
        } else {
            Err(StreamError::syntax(line_num, "expected ':' in line"))
        }
    }
591 fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
592 // If we already sent EndOfDocument, return None to signal true end of stream
593 if self.sent_end_of_document {
594 return Ok(None);
595 }
596
597 // Pop remaining contexts
598 while self.state.stack.len() > 1 {
599 // Safe: loop condition guarantees stack has elements
600 let ctx = self.state.stack.pop().expect("stack has elements");
601 match ctx {
602 Context::List {
603 key,
604 type_name,
605 count,
606 ..
607 } => {
608 return Ok(Some(NodeEvent::ListEnd {
609 key,
610 type_name,
611 count,
612 }));
613 }
614 Context::Object { key, .. } => {
615 return Ok(Some(NodeEvent::ObjectEnd { key }));
616 }
617 Context::Root => {
618 // Root context should never be popped
619 }
620 }
621 }
622
623 // Mark that we've sent EndOfDocument, so subsequent calls return None
624 self.sent_end_of_document = true;
625 Ok(Some(NodeEvent::EndOfDocument))
626 }
}

629impl<R: Read> Iterator for StreamingParser<R> {
630 type Item = StreamResult<NodeEvent>;
631
632 fn next(&mut self) -> Option<Self::Item> {
633 match self.next_event() {
634 Ok(Some(NodeEvent::EndOfDocument)) => None,
635 Ok(Some(event)) => Some(Ok(event)),
636 Ok(None) => None,
637 Err(e) => {
638 // Stop iteration after an error to prevent inconsistent state
639 self.finished = true;
640 self.errored = true;
641 Some(Err(e))
642 }
643 }
644 }
645}
// File opening with compression support
#[cfg(feature = "compression")]
impl StreamingParser<crate::compression::CompressionReader<std::fs::File>> {
    /// Open a file with automatic compression detection.
    ///
    /// The compression format is inferred from the file extension (`.gz`,
    /// `.zst`, `.lz4`) and the content is decompressed transparently.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use hedl_stream::StreamingParser;
    ///
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// // Open a GZIP-compressed HEDL file
    /// let parser = StreamingParser::open("data.hedl.gz")?;
    ///
    /// for event in parser {
    ///     println!("{:?}", event?);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - `StreamError::Io`: File not found or cannot be opened
    /// - `StreamError::Compression`: Decompression initialization failed
    /// - `StreamError::MissingVersion`: Invalid HEDL header
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> StreamResult<Self> {
        // Same as open_with_config with default settings.
        Self::open_with_config(path, StreamingParserConfig::default())
    }

    /// Open a file with automatic compression detection and custom configuration.
    ///
    /// Combines extension-based compression detection with custom parser
    /// settings (limits, timeout, buffer sizes).
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use hedl_stream::{StreamingParser, StreamingParserConfig};
    /// use std::time::Duration;
    ///
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let config = StreamingParserConfig {
    ///     timeout: Some(Duration::from_secs(30)),
    ///     ..Default::default()
    /// };
    ///
    /// let parser = StreamingParser::open_with_config("data.hedl.zst", config)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn open_with_config<P: AsRef<std::path::Path>>(
        path: P,
        config: StreamingParserConfig,
    ) -> StreamResult<Self> {
        use crate::compression::{CompressionFormat, CompressionReader};

        let path_ref = path.as_ref();
        // Detect the format from the extension, then wrap the file in the
        // matching decompressor.
        let detected = CompressionFormat::from_path(path_ref);
        let file = std::fs::File::open(path_ref).map_err(StreamError::Io)?;
        let decompressor =
            CompressionReader::with_format(file, detected).map_err(StreamError::Io)?;

        Self::with_config(decompressor, config)
    }

    /// Open a file with an explicit compression format.
    ///
    /// Use this when the file extension does not match the actual compression
    /// format, or to force a specific decompression algorithm.
    ///
    /// # Examples
    ///
    /// ```rust,no_run
    /// use hedl_stream::StreamingParser;
    /// use hedl_stream::compression::CompressionFormat;
    ///
    /// # fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// // File has no extension but is GZIP compressed
    /// let parser = StreamingParser::open_with_compression(
    ///     "data.hedl",
    ///     CompressionFormat::Gzip,
    /// )?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn open_with_compression<P: AsRef<std::path::Path>>(
        path: P,
        format: crate::compression::CompressionFormat,
    ) -> StreamResult<Self> {
        use crate::compression::CompressionReader;

        // Caller-supplied format: skip detection entirely.
        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
        let decompressor =
            CompressionReader::with_format(file, format).map_err(StreamError::Io)?;

        Self::new(decompressor)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Parse `input` to completion, panicking on construction or parse errors.
    fn parse_all(input: &str) -> Vec<NodeEvent> {
        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
        parser.collect::<Result<Vec<_>, _>>().unwrap()
    }

    #[test]
    fn test_basic_parsing() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
"#;
        assert!(!parse_all(input).is_empty());
    }

    #[test]
    fn test_object_parsing() {
        let input = r#"
%VERSION: 1.0
---
config:
 debug: true
 timeout: 30
"#;
        assert!(!parse_all(input).is_empty());
    }

    #[test]
    fn test_nested_lists() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
%STRUCT: Post: [id, title]
%NEST: User > Post
---
users:@User
 | alice, Alice
 | post1, First Post
"#;
        assert!(!parse_all(input).is_empty());
    }

    #[test]
    fn test_inline_children() {
        let input = r#"
%VERSION: 2.0
%S:User:[id,name]
%S:Post:[id,title]
%N:User>Post
---
users:@User
 | alice, Alice
 @Post#2:|p1,First|p2,Second
"#;
        assert!(!parse_all(input).is_empty());
    }
}