hedl_stream/async_parser.rs

// Dweve HEDL - Hierarchical Entity Data Language
//
// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Async streaming parser implementation.
//!
//! This module provides an asynchronous streaming parser for HEDL documents that mirrors
//! the synchronous [`StreamingParser`](crate::StreamingParser) but uses tokio's async I/O.
//!
//! # When to Use Async
//!
//! **Choose Async (`AsyncStreamingParser`) when:**
//! - Parsing network streams or remote data sources
//! - High-concurrency scenarios (thousands of concurrent parsers)
//! - Integration with async web frameworks (axum, actix-web, etc.)
//! - Need to parse multiple streams concurrently
//! - Working in an async runtime context
//!
//! **Choose Sync (`StreamingParser`) when:**
//! - Parsing local files
//! - Single-threaded batch processing
//! - Simpler synchronous code is preferred
//! - Performance is critical and no I/O waiting occurs
//!
//! # Performance Characteristics
//!
//! - **Non-blocking I/O**: Yields to runtime when waiting for data
//! - **Same Memory Profile**: Identical to sync parser (~constant memory)
//! - **Concurrent Processing**: Can process many streams simultaneously
//! - **Zero-Copy**: Minimal allocations, same as sync version
//!
//! # Examples
//!
//! ## Basic Async Streaming
//!
//! ```rust,no_run
//! # #[cfg(feature = "async")]
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
//! use tokio::fs::File;
//!
//! let file = File::open("large-dataset.hedl").await?;
//! let mut parser = AsyncStreamingParser::new(file).await?;
//!
//! while let Some(event) = parser.next_event().await? {
//!     match event {
//!         NodeEvent::Node(node) => {
//!             println!("{}:{}", node.type_name, node.id);
//!         }
//!         NodeEvent::ListStart { type_name, .. } => {
//!             println!("List started: {}", type_name);
//!         }
//!         _ => {}
//!     }
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ## Concurrent Processing
//!
//! ```rust,no_run
//! # #[cfg(feature = "async")]
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
//! use tokio::fs::File;
//!
//! async fn process_file(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
//!     let file = File::open(path).await?;
//!     let mut parser = AsyncStreamingParser::new(file).await?;
//!
//!     let mut count = 0;
//!     while let Some(event) = parser.next_event().await? {
//!         if let NodeEvent::Node(_) = event {
//!             count += 1;
//!         }
//!     }
//!     Ok(count)
//! }
//!
//! // Process multiple files concurrently
//! let results = tokio::join!(
//!     process_file("file1.hedl"),
//!     process_file("file2.hedl"),
//!     process_file("file3.hedl"),
//! );
//! # Ok(())
//! # }
//! ```

use crate::async_reader::AsyncLineReader;
use crate::error::{StreamError, StreamResult};
use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
use crate::parser::{strip_comment, StreamingParserConfig};
use hedl_core::lex::{calculate_indent, is_valid_key_token, is_valid_type_name};
use hedl_core::Value;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context as TaskContext, Poll};
use std::time::Instant;
use tokio::io::AsyncRead;

/// Type alias for list context lookup result: (`type_name`, schema, optional `last_node` info)
type ListContextResult = (String, Vec<String>, Option<(String, String)>);

/// Async streaming HEDL parser.
///
/// Processes HEDL documents asynchronously, yielding `NodeEvent` items as they
/// are parsed without loading the entire document into memory. Uses tokio's
/// async I/O for non-blocking operation.
///
/// # Memory Characteristics
///
/// - **Header**: Parsed once at initialization and kept in memory
/// - **Per-Line**: Only current line and parsing context (stack depth proportional to nesting)
/// - **No Buffering**: Nodes are yielded immediately after parsing
/// - **Identical to Sync**: Same memory profile as synchronous parser
///
/// # Examples
///
/// ## Parse from Async File
///
/// ```rust,no_run
/// # #[cfg(feature = "async")]
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use hedl_stream::{AsyncStreamingParser, NodeEvent};
/// use tokio::fs::File;
///
/// let file = File::open("data.hedl").await?;
/// let mut parser = AsyncStreamingParser::new(file).await?;
///
/// while let Some(event) = parser.next_event().await? {
///     if let NodeEvent::Node(node) = event {
///         println!("Processing {}: {}", node.type_name, node.id);
///     }
/// }
/// # Ok(())
/// # }
/// ```
///
/// ## With Timeout Protection
///
/// ```rust,no_run
/// # #[cfg(feature = "async")]
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig, StreamError};
/// use std::time::Duration;
/// use std::io::Cursor;
///
/// let config = StreamingParserConfig {
///     timeout: Some(Duration::from_secs(10)),
///     ..Default::default()
/// };
///
/// let mut parser = AsyncStreamingParser::with_config(
///     Cursor::new("untrusted input"),
///     config
/// ).await?;
///
/// while let Some(event) = parser.next_event().await? {
///     // Process event
/// }
/// # Ok(())
/// # }
/// ```
pub struct AsyncStreamingParser<R: AsyncRead + Unpin> {
    reader: AsyncLineReader<R>,
    config: StreamingParserConfig,
    header: Option<HeaderInfo>,
    state: ParserState,
    finished: bool,
    errored: bool,              // Track if an error occurred to skip finalize
    sent_end_of_document: bool, // Track if EndOfDocument has been returned
    start_time: Instant,
    operations_count: usize,
}

#[derive(Debug)]
struct ParserState {
    /// Stack of active contexts.
    stack: Vec<Context>,
    /// Previous row values for ditto handling.
    prev_row: Option<Vec<Value>>,
}

#[derive(Debug, Clone)]
enum Context {
    Root,
    Object {
        #[allow(dead_code)]
        key: String,
        indent: usize,
    },
    List {
        key: String,
        type_name: String,
        schema: Vec<String>,
        row_indent: usize,
        count: usize,
        last_node: Option<(String, String)>, // (type, id)
    },
}

impl<R: AsyncRead + Unpin> AsyncStreamingParser<R> {
    /// Create a new async streaming parser with default configuration.
    ///
    /// The parser immediately reads and validates the HEDL header (version and
    /// schema directives). If the header is invalid, this function returns an error.
    ///
    /// # Parameters
    ///
    /// - `reader`: Any type implementing `AsyncRead + Unpin`
    ///
    /// # Returns
    ///
    /// - `Ok(parser)`: Parser ready to yield events
    /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
    ///
    /// # Examples
    ///
    /// ## From a File
    ///
    /// ```rust,no_run
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::AsyncStreamingParser;
    /// use tokio::fs::File;
    ///
    /// let file = File::open("data.hedl").await?;
    /// let parser = AsyncStreamingParser::new(file).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// ## From a String
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::AsyncStreamingParser;
    /// use std::io::Cursor;
    ///
    /// let data = r#"
    /// %VERSION: 1.0
    /// %STRUCT: User: [id, name]
    /// ---
    /// users: @User
    ///   | alice, Alice
    /// "#;
    ///
    /// let parser = AsyncStreamingParser::new(Cursor::new(data)).await?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Errors
    ///
    /// - `StreamError::MissingVersion`: No `%VERSION` directive found
    /// - `StreamError::InvalidVersion`: Invalid version format
    /// - `StreamError::Syntax`: Malformed header directive
    /// - `StreamError::Io`: I/O error reading input
    pub async fn new(reader: R) -> StreamResult<Self> {
        Self::with_config(reader, StreamingParserConfig::default()).await
    }

    /// Create an async streaming parser with custom configuration.
    ///
    /// Use this when you need to control memory limits, buffer sizes, or enable
    /// timeout protection for untrusted input.
    ///
    /// # Examples
    ///
    /// ## With Timeout
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
    /// use std::time::Duration;
    /// use std::io::Cursor;
    ///
    /// let config = StreamingParserConfig {
    ///     timeout: Some(Duration::from_secs(30)),
    ///     ..Default::default()
    /// };
    ///
    /// let parser = AsyncStreamingParser::with_config(
    ///     Cursor::new("untrusted input"),
    ///     config
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
        let mut parser = Self {
            reader: AsyncLineReader::with_capacity(reader, config.buffer_size),
            config,
            header: None,
            state: ParserState {
                stack: vec![Context::Root],
                prev_row: None,
            },
            finished: false,
            errored: false,
            sent_end_of_document: false,
            start_time: Instant::now(),
            operations_count: 0,
        };

        // Parse header immediately
        parser.parse_header().await?;

        Ok(parser)
    }

    /// Check if timeout has been exceeded.
    #[inline]
    fn check_timeout(&self) -> StreamResult<()> {
        if let Some(timeout) = self.config.timeout {
            let elapsed = self.start_time.elapsed();
            if elapsed > timeout {
                return Err(StreamError::Timeout {
                    elapsed,
                    limit: timeout,
                });
            }
        }
        Ok(())
    }

    /// Set the errored flag and return an error.
    ///
    /// This helper ensures that after any error is returned, subsequent calls
    /// to `next_event` will return `Ok(None)` without attempting further parsing.
    #[inline]
    fn return_error<T>(&mut self, e: StreamError) -> StreamResult<T> {
        self.finished = true;
        self.errored = true;
        Err(e)
    }

    /// Get the parsed header information.
    ///
    /// Returns header metadata including version, schema definitions, aliases,
    /// and nesting rules. This is available immediately after parser creation.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::AsyncStreamingParser;
    /// use std::io::Cursor;
    ///
    /// let input = r#"
    /// %VERSION: 1.0
    /// %STRUCT: User: [id, name, email]
    /// ---
    /// "#;
    ///
    /// let parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
    /// let header = parser.header().unwrap();
    ///
    /// assert_eq!(header.version, (1, 0));
    /// let user_schema = header.get_schema("User").unwrap();
    /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
    /// # Ok(())
    /// # }
    /// ```
    pub fn header(&self) -> Option<&HeaderInfo> {
        self.header.as_ref()
    }

    /// Parse the next event from the stream asynchronously.
    ///
    /// Returns `Ok(Some(event))` if an event was parsed, `Ok(None)` at end of document,
    /// or `Err` on parsing errors.
    ///
    /// # Performance
    ///
    /// This method is async and will yield to the tokio runtime when waiting for I/O,
    /// allowing other tasks to run. It does not block the thread.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::{AsyncStreamingParser, NodeEvent};
    /// use std::io::Cursor;
    ///
    /// let input = r#"
    /// %VERSION: 1.0
    /// %STRUCT: User: [id, name]
    /// ---
    /// users: @User
    ///   | alice, Alice
    /// "#;
    ///
    /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
    ///
    /// while let Some(event) = parser.next_event().await? {
    ///     match event {
    ///         NodeEvent::Node(node) => println!("Node: {}", node.id),
    ///         NodeEvent::ListStart { type_name, .. } => println!("List: {}", type_name),
    ///         _ => {}
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        // If errored, stop immediately without finalize
        if self.errored {
            return Ok(None);
        }
        // If finished, continue emitting remaining context ends until stack is empty
        if self.finished {
            return self.finalize();
        }

        loop {
            // Check timeout periodically (every 100 operations to minimize overhead)
            self.operations_count += 1;
            if self.operations_count % 100 == 0 {
                if let Err(e) = self.check_timeout() {
                    return self.return_error(e);
                }
            }

            let (line_num, line) = match self.reader.next_line().await {
                Ok(Some(l)) => l,
                Ok(None) => {
                    self.finished = true;
                    return self.finalize();
                }
                Err(e) => return self.return_error(e),
            };

            let trimmed = line.trim();

            // Skip blank lines and comments
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            // Calculate indentation
            let indent_info = match calculate_indent(&line, line_num as u32) {
                Ok(info) => info,
                Err(e) => return self.return_error(StreamError::syntax(line_num, e.to_string())),
            };

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            if indent > self.config.max_indent_depth {
                return self.return_error(StreamError::syntax(
                    line_num,
                    format!("indent depth {indent} exceeds limit"),
                ));
            }

            // Pop contexts as needed based on indentation
            let events = match self.pop_contexts(indent) {
                Ok(e) => e,
                Err(e) => return self.return_error(e),
            };
            if let Some(event) = events {
                // Push back the current line to process after emitting list end
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            // Parse line content
            return match self.parse_line(content, indent, line_num) {
                Ok(result) => Ok(result),
                Err(e) => self.return_error(e),
            };
        }
    }

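    /// Read and parse the header section (directives up to the `---` separator
    /// or the first non-directive line), storing the result in `self.header`.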
    async fn parse_header(&mut self) -> StreamResult<()> {
        let mut header = HeaderInfo::new();
        let mut found_version = false;
        let mut _found_separator = false;

        while let Some((line_num, line)) = self.reader.next_line().await? {
            self.check_timeout()?;

            let trimmed = line.trim();

            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            if trimmed == "---" {
                _found_separator = true;
                break;
            }

            if trimmed.starts_with('%') {
                self.parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
            } else {
                self.reader.push_back(line_num, line);
                break;
            }
        }

        if !found_version {
            return Err(StreamError::MissingVersion);
        }

        self.header = Some(header);
        Ok(())
    }

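    /// Dispatch a single `%`-prefixed directive line to the matching handler.
    /// Unknown directives are ignored.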
    fn parse_directive(
        &self,
        line: &str,
        line_num: usize,
        header: &mut HeaderInfo,
        found_version: &mut bool,
    ) -> StreamResult<()> {
        if line.starts_with("%VERSION") {
            self.parse_version_directive(line, header, found_version)
        } else if line.starts_with("%STRUCT") {
            self.parse_struct_directive(line, line_num, header)
        } else if line.starts_with("%ALIAS") {
            self.parse_alias_directive(line, line_num, header)
        } else if line.starts_with("%NEST") {
            self.parse_nest_directive(line, line_num, header)
        } else {
            Ok(())
        }
    }

    /// Strip inline comments from a directive line.
    ///
    /// Handles `#` characters outside of quoted strings and brackets.
    /// Returns the content before the first unquoted/unbracketed `#`.
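    ///
    /// For example, `%STRUCT: User: [id, name] # people` yields
    /// `%STRUCT: User: [id, name]`, while a `#` inside quotes or brackets is kept.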
    fn strip_inline_comment(text: &str) -> &str {
        let mut in_quotes = false;
        let mut in_brackets = 0;
        let mut quote_char = '"';

        for (i, c) in text.char_indices() {
            match c {
                '"' | '\'' if !in_quotes => {
                    in_quotes = true;
                    quote_char = c;
                }
                c if in_quotes && c == quote_char => {
                    in_quotes = false;
                }
                '[' if !in_quotes => in_brackets += 1,
                ']' if !in_quotes && in_brackets > 0 => in_brackets -= 1,
                '#' if !in_quotes && in_brackets == 0 => {
                    return text[..i].trim_end();
                }
                _ => {}
            }
        }
        text
    }

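    /// Parse a `%VERSION: major.minor` directive and record it in the header.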
    fn parse_version_directive(
        &self,
        line: &str,
        header: &mut HeaderInfo,
        found_version: &mut bool,
    ) -> StreamResult<()> {
        // Strip inline comments first
        let line = Self::strip_inline_comment(line);
        let rest = line.strip_prefix("%VERSION").expect("prefix exists").trim();
        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
        let parts: Vec<&str> = rest.split('.').collect();

        if parts.len() != 2 {
            return Err(StreamError::InvalidVersion(rest.to_string()));
        }

        let major: u32 = parts[0]
            .parse()
            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
        let minor: u32 = parts[1]
            .parse()
            .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;

        header.version = (major, minor);
        *found_version = true;
        Ok(())
    }

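    /// Parse a `%STRUCT: Type: [col1, col2, ...]` directive and register the schema.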
    fn parse_struct_directive(
        &self,
        line: &str,
        line_num: usize,
        header: &mut HeaderInfo,
    ) -> StreamResult<()> {
        // Strip inline comments first
        let line = Self::strip_inline_comment(line);
        let rest = line.strip_prefix("%STRUCT").expect("prefix exists").trim();
        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();

        let bracket_start = rest
            .find('[')
            .ok_or_else(|| StreamError::syntax(line_num, "missing '[' in %STRUCT"))?;
        let bracket_end = rest
            .find(']')
            .ok_or_else(|| StreamError::syntax(line_num, "missing ']' in %STRUCT"))?;

        let type_part = rest[..bracket_start].trim().trim_end_matches(':').trim();
        let type_name = if let Some(paren_pos) = type_part.find('(') {
            type_part[..paren_pos].trim()
        } else {
            type_part
        };
        if !is_valid_type_name(type_name) {
            return Err(StreamError::syntax(
                line_num,
                format!("invalid type name: {type_name}"),
            ));
        }

        let cols_str = &rest[bracket_start + 1..bracket_end];
        let columns: Vec<String> = cols_str
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect();

        if columns.is_empty() {
            return Err(StreamError::syntax(line_num, "empty schema"));
        }

        header.structs.insert(type_name.to_string(), columns);
        Ok(())
    }

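    /// Parse a `%ALIAS` directive of the form `name = "value"` (or `name: value`).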
    fn parse_alias_directive(
        &self,
        line: &str,
        line_num: usize,
        header: &mut HeaderInfo,
    ) -> StreamResult<()> {
        // Strip inline comments first
        let line = Self::strip_inline_comment(line);
        let rest = line.strip_prefix("%ALIAS").expect("prefix exists").trim();
        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();

        let sep_pos = rest
            .find('=')
            .or_else(|| rest.find(':'))
            .ok_or_else(|| StreamError::syntax(line_num, "missing '=' or ':' in %ALIAS"))?;

        let alias = rest[..sep_pos].trim();
        let value = rest[sep_pos + 1..].trim().trim_matches('"');

        header.aliases.insert(alias.to_string(), value.to_string());
        Ok(())
    }

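    /// Parse a `%NEST: Parent > Child` directive and record the nesting rule.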
    fn parse_nest_directive(
        &self,
        line: &str,
        line_num: usize,
        header: &mut HeaderInfo,
    ) -> StreamResult<()> {
        // Strip inline comments first
        let line = Self::strip_inline_comment(line);
        let rest = line.strip_prefix("%NEST").expect("prefix exists").trim();
        let rest = rest.strip_prefix(':').unwrap_or(rest).trim();

        let arrow_pos = rest
            .find('>')
            .ok_or_else(|| StreamError::syntax(line_num, "missing '>' in %NEST"))?;

        let parent = rest[..arrow_pos].trim();
        let child = rest[arrow_pos + 1..].trim();

        if !is_valid_type_name(parent) || !is_valid_type_name(child) {
            return Err(StreamError::syntax(line_num, "invalid type name in %NEST"));
        }

        header.nests.insert(parent.to_string(), child.to_string());
        Ok(())
    }

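    /// Pop contexts that the current indentation has exited, emitting at most one
    /// `ListEnd` or `ObjectEnd` event per call (the caller re-reads the line afterwards).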
    fn pop_contexts(&mut self, current_indent: usize) -> StreamResult<Option<NodeEvent>> {
        while self.state.stack.len() > 1 {
            let should_pop = match self.state.stack.last().expect("stack has elements") {
                Context::Root => false,
                Context::Object { indent, .. } => current_indent <= *indent,
                Context::List { row_indent, .. } => current_indent < *row_indent,
            };

            if should_pop {
                let ctx = self.state.stack.pop().expect("stack has elements");
                match ctx {
                    Context::List {
                        key,
                        type_name,
                        count,
                        ..
                    } => {
                        return Ok(Some(NodeEvent::ListEnd {
                            key,
                            type_name,
                            count,
                        }));
                    }
                    Context::Object { key, .. } => {
                        return Ok(Some(NodeEvent::ObjectEnd { key }));
                    }
                    Context::Root => {
                        // Root context should never be popped
                    }
                }
            } else {
                break;
            }
        }

        Ok(None)
    }

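    /// Parse a single content line: a `|` matrix row, an object start (`key:`),
    /// a list start (`key: @Type`), or a scalar (`key: value`).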
    fn parse_line(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let content = strip_comment(content);

        if let Some(row_content) = content.strip_prefix('|') {
            self.parse_matrix_row(row_content, indent, line_num)
        } else if let Some(colon_pos) = content.find(':') {
            let key = content[..colon_pos].trim();
            let after_colon = &content[colon_pos + 1..];

            if !is_valid_key_token(key) {
                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
            }

            let after_colon_trimmed = after_colon.trim();

            if after_colon_trimmed.is_empty() {
                self.state.stack.push(Context::Object {
                    key: key.to_string(),
                    indent,
                });
                Ok(Some(NodeEvent::ObjectStart {
                    key: key.to_string(),
                    line: line_num,
                }))
            } else if after_colon_trimmed.starts_with('@')
                && self.is_list_start(after_colon_trimmed)
            {
                let (type_name, schema) = self.parse_list_start(after_colon_trimmed, line_num)?;

                self.state.stack.push(Context::List {
                    key: key.to_string(),
                    type_name: type_name.clone(),
                    schema: schema.clone(),
                    row_indent: indent + 1,
                    count: 0,
                    last_node: None,
                });

                self.state.prev_row = None;

                Ok(Some(NodeEvent::ListStart {
                    key: key.to_string(),
                    type_name,
                    schema,
                    line: line_num,
                }))
            } else {
                let value = self.infer_value(after_colon.trim(), line_num)?;
                Ok(Some(NodeEvent::Scalar {
                    key: key.to_string(),
                    value,
                    line: line_num,
                }))
            }
        } else {
            Err(StreamError::syntax(line_num, "expected ':' in line"))
        }
    }

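    /// Check whether the text after a key's colon looks like a list start (`@Type` or `@Type[...]`).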
    #[inline]
    fn is_list_start(&self, s: &str) -> bool {
        let s = s.trim();
        if !s.starts_with('@') {
            return false;
        }
        let rest = &s[1..];
        let type_end = rest
            .find(|c: char| c == '[' || c.is_whitespace())
            .unwrap_or(rest.len());
        let type_name = &rest[..type_end];
        is_valid_type_name(type_name)
    }

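    /// Parse a list start, returning the type name and its schema (inline `[...]`
    /// columns, or the `%STRUCT` definition from the header).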
    fn parse_list_start(&self, s: &str, line_num: usize) -> StreamResult<(String, Vec<String>)> {
        let s = s.trim();
        let rest = &s[1..];

        if let Some(bracket_pos) = rest.find('[') {
            let type_name = &rest[..bracket_pos];
            if !is_valid_type_name(type_name) {
                return Err(StreamError::syntax(
                    line_num,
                    format!("invalid type name: {type_name}"),
                ));
            }

            let bracket_end = rest
                .find(']')
                .ok_or_else(|| StreamError::syntax(line_num, "missing ']'"))?;

            let cols_str = &rest[bracket_pos + 1..bracket_end];
            let columns: Vec<String> = cols_str
                .split(',')
                .map(|s| s.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect();

            Ok((type_name.to_string(), columns))
        } else {
            let type_name = rest.trim();
            if !is_valid_type_name(type_name) {
                return Err(StreamError::syntax(
                    line_num,
                    format!("invalid type name: {type_name}"),
                ));
            }

            let header = self
                .header
                .as_ref()
                .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;

            let schema = header.structs.get(type_name).ok_or_else(|| {
                StreamError::schema(line_num, format!("undefined type: {type_name}"))
            })?;

            Ok((type_name.to_string(), schema.clone()))
        }
    }

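    /// Parse a `|` matrix row into a `NodeEvent::Node`, applying ditto (`^`) values
    /// from the previous row and resolving the enclosing list context.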
    fn parse_matrix_row(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let content = strip_comment(content).trim();

        let (type_name, schema, parent_info) = self.find_list_context(indent, line_num)?;

        let fields = hedl_core::lex::parse_csv_row(content)
            .map_err(|e| StreamError::syntax(line_num, format!("row parse error: {e}")))?;

        if fields.len() != schema.len() {
            return Err(StreamError::ShapeMismatch {
                line: line_num,
                expected: schema.len(),
                got: fields.len(),
            });
        }

        let mut values = Vec::with_capacity(fields.len());
        for (col_idx, field) in fields.iter().enumerate() {
            let value = if field.value == "^" {
                self.state
                    .prev_row
                    .as_ref()
                    .and_then(|prev| prev.get(col_idx).cloned())
                    .unwrap_or(Value::Null)
            } else if field.is_quoted {
                Value::String(field.value.clone().into())
            } else {
                self.infer_value(&field.value, line_num)?
            };
            values.push(value);
        }

        let id = match &values[0] {
            Value::String(s) => s.clone(),
            _ => return Err(StreamError::syntax(line_num, "ID column must be a string")),
        };

        self.update_list_context(&type_name, &id);
        self.state.prev_row = Some(values.clone());

        // Calculate depth as number of list contexts minus 1 (0-indexed nesting level)
        let depth = self
            .state
            .stack
            .iter()
            .filter(|ctx| matches!(ctx, Context::List { .. }))
            .count()
            .saturating_sub(1);

        let mut node = NodeInfo::new(type_name.clone(), id.to_string(), values, depth, line_num);

        if let Some((parent_type, parent_id)) = parent_info {
            node = node.with_parent(parent_type, parent_id);
        }

        Ok(Some(NodeEvent::Node(node)))
    }

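    /// Find the list context for a row at the given indent, creating an implicit
    /// child list (via the `%NEST` rules) when the row is indented one level deeper than its parent.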
    fn find_list_context(
        &mut self,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<ListContextResult> {
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;

        for ctx in self.state.stack.iter().rev() {
            if let Context::List {
                type_name,
                schema,
                row_indent,
                last_node,
                ..
            } = ctx
            {
                if indent == *row_indent {
                    return Ok((type_name.clone(), schema.clone(), None));
                } else if indent == *row_indent + 1 {
                    let parent_info = last_node.clone().ok_or_else(|| {
                        StreamError::orphan_row(line_num, "child row has no parent")
                    })?;

                    let child_type = header.nests.get(type_name).ok_or_else(|| {
                        StreamError::orphan_row(
                            line_num,
                            format!("no NEST rule for parent type '{type_name}'"),
                        )
                    })?;

                    let child_schema = header.structs.get(child_type).ok_or_else(|| {
                        StreamError::schema(
                            line_num,
                            format!("child type '{child_type}' not defined"),
                        )
                    })?;

                    self.state.stack.push(Context::List {
                        key: child_type.clone(),
                        type_name: child_type.clone(),
                        schema: child_schema.clone(),
                        row_indent: indent,
                        count: 0,
                        last_node: None,
                    });

                    return Ok((child_type.clone(), child_schema.clone(), Some(parent_info)));
                }
            }
        }

        Err(StreamError::syntax(
            line_num,
            "matrix row outside of list context",
        ))
    }

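    /// Record the most recent node and bump the row count on the matching list context.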
    fn update_list_context(&mut self, type_name: &str, id: &str) {
        for ctx in self.state.stack.iter_mut().rev() {
            if let Context::List {
                type_name: ctx_type,
                last_node,
                count,
                ..
            } = ctx
            {
                if ctx_type == type_name {
                    *last_node = Some((type_name.to_string(), id.to_string()));
                    *count += 1;
                    break;
                }
            }
        }
    }

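    /// Infer a scalar `Value` from raw text: null (`~` or empty), booleans,
    /// references (`@Type:id` / `@id`), `$alias` expansion, integers, floats, or strings.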
    #[inline]
    fn infer_value(&self, s: &str, _line_num: usize) -> StreamResult<Value> {
        let s = s.trim();

        if s.is_empty() || s == "~" {
            return Ok(Value::Null);
        }

        if s == "true" {
            return Ok(Value::Bool(true));
        }
        if s == "false" {
            return Ok(Value::Bool(false));
        }

        if let Some(ref_part) = s.strip_prefix('@') {
            if let Some(colon_pos) = ref_part.find(':') {
                let type_name = &ref_part[..colon_pos];
                let id = &ref_part[colon_pos + 1..];
                return Ok(Value::Reference(hedl_core::Reference {
                    type_name: Some(type_name.to_string().into()),
                    id: id.to_string().into(),
                }));
            }
            return Ok(Value::Reference(hedl_core::Reference {
                type_name: None,
                id: ref_part.to_string().into(),
            }));
        }

        if let Some(alias) = s.strip_prefix('$') {
            if let Some(header) = &self.header {
                if let Some(value) = header.aliases.get(alias) {
                    return Ok(Value::String(value.clone().into()));
                }
            }
            return Ok(Value::String(s.to_string().into()));
        }

        if let Ok(i) = s.parse::<i64>() {
            return Ok(Value::Int(i));
        }
        if let Ok(f) = s.parse::<f64>() {
            return Ok(Value::Float(f));
        }

        Ok(Value::String(s.to_string().into()))
    }

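    /// Emit remaining `ListEnd`/`ObjectEnd` events for contexts still open at end of
    /// input, then a single `EndOfDocument` before the stream reports `None`.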
    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
        // If we already sent EndOfDocument, return None to signal true end of stream
        if self.sent_end_of_document {
            return Ok(None);
        }

        while self.state.stack.len() > 1 {
            let ctx = self.state.stack.pop().expect("stack has elements");
            match ctx {
                Context::List {
                    key,
                    type_name,
                    count,
                    ..
                } => {
                    return Ok(Some(NodeEvent::ListEnd {
                        key,
                        type_name,
                        count,
                    }));
                }
                Context::Object { key, .. } => {
                    return Ok(Some(NodeEvent::ObjectEnd { key }));
                }
                Context::Root => {
                    // Root context handled by the while condition
                }
            }
        }

        // Mark that we've sent EndOfDocument, so subsequent calls return None
        self.sent_end_of_document = true;
        Ok(Some(NodeEvent::EndOfDocument))
    }

    /// Read up to `n` events in a single async operation.
    ///
    /// Reduces await overhead for high-throughput scenarios by batching event reads.
    /// This can improve performance when processing many small events.
    ///
    /// # Parameters
    ///
    /// - `n`: Maximum number of events to read
    ///
    /// # Returns
    ///
    /// - `Ok(Vec<NodeEvent>)`: Vector of events (may be fewer than `n` if EOF reached)
    /// - `Err(e)`: Parsing error encountered
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::AsyncStreamingParser;
    /// use tokio::fs::File;
    ///
    /// let file = File::open("data.hedl").await?;
    /// let mut parser = AsyncStreamingParser::new(file).await?;
    ///
    /// // Read events in batches of 100
    /// loop {
    ///     let batch = parser.next_batch(100).await?;
    ///     if batch.is_empty() {
    ///         break;
    ///     }
    ///
    ///     // Process batch
    ///     for event in batch {
    ///         // ...
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>> {
        let mut batch = Vec::with_capacity(n.min(100)); // Cap initial allocation
        for _ in 0..n {
            match self.next_event().await? {
                Some(NodeEvent::EndOfDocument) => break,
                Some(event) => batch.push(event),
                None => break,
            }
        }
        Ok(batch)
    }

    /// Read events with cancellation support via tokio watch channel.
    ///
    /// Returns `Ok(None)` if cancelled, otherwise behaves like `next_event()`.
    ///
    /// # Examples
    ///
    /// ```rust
    /// # #[cfg(feature = "async")]
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// use hedl_stream::AsyncStreamingParser;
    /// use tokio::sync::watch;
    /// use std::io::Cursor;
    ///
    /// let input = r#"
    /// %VERSION: 1.0
    /// %STRUCT: User: [id, name]
    /// ---
    /// users: @User
    ///   | alice, Alice
    /// "#;
    ///
    /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
    /// let (cancel_tx, mut cancel_rx) = watch::channel(false);
    ///
    /// // Can cancel from another task
    /// tokio::spawn(async move {
    ///     tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    ///     let _ = cancel_tx.send(true);
    /// });
    ///
    /// while let Some(event) = parser.next_event_cancellable(&mut cancel_rx).await? {
    ///     // Process event
    ///     # break;
    /// }
    /// # Ok(())
    /// # }
    /// ```
    #[cfg(feature = "async")]
    pub async fn next_event_cancellable(
        &mut self,
        cancel_rx: &mut tokio::sync::watch::Receiver<bool>,
    ) -> StreamResult<Option<NodeEvent>> {
        // Check if cancelled
        if *cancel_rx.borrow() {
            return Ok(None);
        }

        tokio::select! {
            result = self.next_event() => result,
            _ = cancel_rx.changed() => {
                if *cancel_rx.borrow() {
                    Ok(None)
                } else {
                    // False alarm, continue
                    self.next_event().await
                }
            }
        }
    }
}

// Stream trait implementation for futures ecosystem integration
#[cfg(feature = "async")]
impl<R: AsyncRead + Unpin> futures_core::Stream for AsyncStreamingParser<R> {
    type Item = StreamResult<NodeEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        // Create a future from next_event and poll it
        let fut = self.next_event();
        tokio::pin!(fut);

        match fut.poll(cx) {
            Poll::Ready(Ok(Some(NodeEvent::EndOfDocument))) => Poll::Ready(None),
            Poll::Ready(Ok(Some(event))) => Poll::Ready(Some(Ok(event))),
            Poll::Ready(Ok(None)) => Poll::Ready(None),
            // Note: errored flag is set inside next_event before returning errors
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}

#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::time::Duration;

    #[tokio::test]
    async fn test_parse_header() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
%ALIAS active = "Active"
%NEST: User > Order
---
"#;
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
        let header = parser.header().unwrap();

        assert_eq!(header.version, (1, 0));
        assert!(header.structs.contains_key("User"));
        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
        assert_eq!(header.nests.get("User"), Some(&"Order".to_string()));
    }

    #[tokio::test]
    async fn test_streaming_nodes() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice Smith
  | bob, Bob Jones
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut events = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            events.push(event);
        }

        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_timeout() {
        // This test would need a custom reader that delays, simplified for now
        let config = StreamingParserConfig {
            timeout: Some(Duration::from_millis(1)),
            ..Default::default()
        };

        let input = r"
%VERSION: 1.0
---
";
        let parser = AsyncStreamingParser::with_config(Cursor::new(input), config).await;
        assert!(parser.is_ok()); // Header should parse within timeout
    }

    #[tokio::test]
    async fn test_inline_schema() {
        let input = r"
%VERSION: 1.0
---
items: @Item[id, name]
  | item1, First
  | item2, Second
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].type_name, "Item");
    }

    #[tokio::test]
    async fn test_error_handling() {
        let input = r"
%VERSION: 1.0
---
invalid line without colon
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let result = parser.next_event().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), StreamError::Syntax { .. }));
    }

    #[tokio::test]
    async fn test_unicode() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | 用户1, 张三
  | пользователь, Иван
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "用户1");
        assert_eq!(nodes[1].id, "пользователь");
    }

    // ============ STREAM TRAIT TESTS ============

    #[tokio::test]
    async fn test_stream_trait_basic() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let events: Vec<_> = parser.collect().await;
        assert!(events.iter().all(std::result::Result::is_ok));

        let nodes: Vec<_> = events
            .iter()
            .filter_map(|e| e.as_ref().ok())
            .filter_map(|e| e.as_node())
            .collect();

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_stream_trait_filter_map() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name, active]
---
users: @User
  | alice, Alice, true
  | bob, Bob, false
  | carol, Carol, true
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Only collect active users using stream combinators
        let active_nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .filter(|node| {
                let is_active = matches!(node.get_field(2), Some(Value::Bool(true)));
                async move { is_active }
            })
            .collect()
            .await;

        assert_eq!(active_nodes.len(), 2);
        assert_eq!(active_nodes[0].id, "alice");
        assert_eq!(active_nodes[1].id, "carol");
    }

    #[tokio::test]
    async fn test_stream_trait_take() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
  | dave, Dave
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Take only first 2 node events
        let nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .take(2)
            .collect()
            .await;

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_stream_trait_count() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let total = parser.count().await;
        // Should count all events: ListStart, 3 Nodes, ListEnd = 5 events
        assert_eq!(total, 5);
    }

    // ============ BATCH READING TESTS ============

    #[tokio::test]
    async fn test_next_batch_basic() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read all events in one batch
        let batch = parser.next_batch(10).await.unwrap();
        assert_eq!(batch.len(), 5); // ListStart, 3 Nodes, ListEnd

        // Next batch should be empty (EOF)
        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_incremental() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read in small batches
        let batch1 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch1.len(), 2); // ListStart, Node

        let batch2 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch2.len(), 2); // Node, Node

        let batch3 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch3.len(), 1); // ListEnd

        let batch4 = parser.next_batch(2).await.unwrap();
        assert!(batch4.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_empty_file() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_large() {
        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id, value]
---
data: @Data
",
        );
        for i in 0..500 {
            input.push_str(&format!("  | row{i}, value{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read in large batches
        let batch1 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch1.len(), 100); // ListStart + 99 Nodes

        let batch2 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch2.len(), 100); // 100 Nodes

        // Continue until we get all events
        let mut total = batch1.len() + batch2.len();
        loop {
            let batch = parser.next_batch(100).await.unwrap();
            if batch.is_empty() {
                break;
            }
            total += batch.len();
        }

        // Total: ListStart + 500 Nodes + ListEnd = 502
        assert_eq!(total, 502);
    }

    // ============ CANCELLATION TESTS ============

    #[tokio::test]
    async fn test_cancellation_basic() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Read first event normally
        let event1 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event1.is_some());

        // Cancel
        cancel_tx.send(true).unwrap();

        // Next read should return None (cancelled)
        let event2 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event2.is_none());
    }

    #[tokio::test]
    async fn test_cancellation_not_cancelled() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (_cancel_tx, mut cancel_rx) = watch::channel(false);

        // Read all events without cancellation
        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
        }

        // Should have read all events: ListStart, Node, ListEnd, EndOfDocument
        assert_eq!(count, 4);
    }

    #[tokio::test]
    async fn test_cancellation_during_processing() {
        use tokio::sync::watch;

        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id]
---
data: @Data
",
        );
        for i in 0..1000 {
            input.push_str(&format!("  | row{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Spawn a task that cancels after reading some events
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            cancel_tx.send(true).unwrap();
        });

        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
            // Small delay to allow cancellation to trigger
            tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
        }

        // Should have read some events but not all 1002 (ListStart + 1000 Nodes + ListEnd)
        assert!(count < 1002);
        assert!(count > 0);
    }

    // ============ CONCURRENT PROCESSING TESTS ============

    #[tokio::test]
    async fn test_concurrent_file_processing() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";

        // Process multiple identical streams concurrently
        let tasks: Vec<_> = (0..5)
            .map(|_| {
                let input_clone = input.to_string();
                tokio::spawn(async move {
                    let mut parser = AsyncStreamingParser::new(Cursor::new(input_clone))
                        .await
                        .unwrap();

                    let mut count = 0;
                    while let Some(_event) = parser.next_event().await.unwrap() {
                        count += 1;
                    }
                    count
                })
            })
            .collect();

        let results = futures::future::join_all(tasks).await;

        // All tasks should succeed and count the same number of events
        for result in results {
            assert_eq!(result.unwrap(), 5); // ListStart, 2 Nodes, ListEnd, EndOfDocument
        }
    }

    #[tokio::test]
    async fn test_concurrent_with_stream_trait() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: Data: [id]
---
data: @Data
  | row1
  | row2
  | row3
";

        // Process multiple streams concurrently using manual await
        // Note: futures::Stream combinators create !Send futures, so we can't use tokio::spawn
        let mut counts = Vec::new();

        for _ in 0..10 {
            let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

            // Count nodes using stream combinators
            let count = parser
                .filter_map(|result| async move {
                    result.ok().and_then(|event| {
                        if let NodeEvent::Node(_) = event {
                            Some(())
                        } else {
                            None
                        }
                    })
                })
                .count()
                .await;

            counts.push(count);
        }

        // All should count 3 nodes
        for count in counts {
            assert_eq!(count, 3);
        }
    }

    // ============ EDGE CASE AND INTEGRATION TESTS ============

    #[tokio::test]
    async fn test_stream_trait_with_errors() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob
  | carol, Carol
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let results: Vec<_> = parser.collect().await;

        // Should have error for malformed row (bob with only 1 field)
        let errors: Vec<_> = results.iter().filter(|r| r.is_err()).collect();
        assert!(!errors.is_empty());
    }

    #[tokio::test]
    async fn test_batch_with_mixed_events() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
%STRUCT: Product: [id, title]
---
users: @User
  | alice, Alice
products: @Product
  | prod1, Widget
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch = parser.next_batch(10).await.unwrap();

        // Should contain: ListStart(User), Node(alice), ListEnd(User), ListStart(Product), Node(prod1), ListEnd(Product)
        assert_eq!(batch.len(), 6);

        let list_starts: Vec<_> = batch
            .iter()
            .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
            .collect();
        assert_eq!(list_starts.len(), 2);
    }

    #[tokio::test]
    async fn test_stream_empty_after_cancellation() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Read one event
        let _event = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();

        // Cancel
        cancel_tx.send(true).unwrap();

        // Subsequent reads should return None
        assert!(parser
            .next_event_cancellable(&mut cancel_rx)
            .await
            .unwrap()
            .is_none());
        assert!(parser
            .next_event_cancellable(&mut cancel_rx)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_batch_reading_performance() {
        // Create a large dataset
        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id, value]
---
data: @Data
",
        );
        for i in 0..1000 {
            input.push_str(&format!("  | row{i}, value{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let start = std::time::Instant::now();

        // Read in batches
        let mut total = 0;
        loop {
            let batch = parser.next_batch(100).await.unwrap();
            if batch.is_empty() {
                break;
            }
            total += batch.len();
        }

        let elapsed = start.elapsed();

        // Should have read all events
        assert_eq!(total, 1002); // ListStart + 1000 Nodes + ListEnd

        // Should complete reasonably quickly (< 100ms for 1000 rows)
        assert!(elapsed.as_millis() < 100);
    }
}