Skip to main content

hedl_stream/async_parser/
mod.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async streaming parser implementation.
19//!
20//! This module provides an asynchronous streaming parser for HEDL documents that mirrors
21//! the synchronous [`StreamingParser`](crate::StreamingParser) but uses tokio's async I/O.
22//!
23//! # When to Use Async
24//!
25//! **Choose Async (`AsyncStreamingParser`) when:**
26//! - Parsing network streams or remote data sources
27//! - High-concurrency scenarios (thousands of concurrent parsers)
28//! - Integration with async web frameworks (axum, actix-web, etc.)
29//! - Need to parse multiple streams concurrently
30//! - Working in an async runtime context
31//!
32//! **Choose Sync (`StreamingParser`) when:**
33//! - Parsing local files
34//! - Single-threaded batch processing
35//! - Simpler synchronous code is preferred
36//! - Performance is critical and no I/O waiting occurs
37//!
38//! # Performance Characteristics
39//!
40//! - **Non-blocking I/O**: Yields to runtime when waiting for data
41//! - **Same Memory Profile**: Identical to sync parser (~constant memory)
42//! - **Concurrent Processing**: Can process many streams simultaneously
43//! - **Zero-Copy**: Minimal allocations, same as sync version
44//!
45//! # Examples
46//!
47//! ## Basic Async Streaming
48//!
49//! ```rust,no_run
50//! # #[cfg(feature = "async")]
51//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
52//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
53//! use tokio::fs::File;
54//!
55//! let file = File::open("large-dataset.hedl").await?;
56//! let mut parser = AsyncStreamingParser::new(file).await?;
57//!
58//! while let Some(event) = parser.next_event().await? {
59//!     match event {
60//!         NodeEvent::Node(node) => {
61//!             println!("{}:{}", node.type_name, node.id);
62//!         }
63//!         NodeEvent::ListStart { type_name, .. } => {
64//!             println!("List started: {}", type_name);
65//!         }
66//!         _ => {}
67//!     }
68//! }
69//! # Ok(())
70//! # }
71//! ```
72//!
73//! ## Concurrent Processing
74//!
75//! ```rust,no_run
76//! # #[cfg(feature = "async")]
77//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
78//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
79//! use tokio::fs::File;
80//!
81//! async fn process_file(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
82//!     let file = File::open(path).await?;
83//!     let mut parser = AsyncStreamingParser::new(file).await?;
84//!
85//!     let mut count = 0;
86//!     while let Some(event) = parser.next_event().await? {
87//!         if let NodeEvent::Node(_) = event {
88//!             count += 1;
89//!         }
90//!     }
91//!     Ok(count)
92//! }
93//!
94//! // Process multiple files concurrently
95//! let results = tokio::join!(
96//!     process_file("file1.hedl"),
97//!     process_file("file2.hedl"),
98//!     process_file("file3.hedl"),
99//! );
100//! # Ok(())
101//! # }
102//! ```
103
104mod header_parsing;
105mod line_parsing;
106
107use crate::async_reader::AsyncLineReader;
108use crate::error::{StreamError, StreamResult};
109use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
110use crate::parser::StreamingParserConfig;
111use hedl_core::Value;
112use std::future::Future;
113use std::pin::Pin;
114use std::task::{Context as TaskContext, Poll};
115use std::time::Instant;
116use tokio::io::AsyncRead;
117
/// Type alias for a list context lookup result, as a tuple of
/// (`type_name`, schema, optional `last_node` info).
///
/// The optional third element is a pair of strings; `Context::List::last_node`
/// stores such a pair as `(type, id)`.
type ListContextResult = (String, Vec<String>, Option<(String, String)>);
120
121/// Async streaming HEDL parser.
122///
123/// Processes HEDL documents asynchronously, yielding `NodeEvent` items as they
124/// are parsed without loading the entire document into memory. Uses tokio's
125/// async I/O for non-blocking operation.
126///
127/// # Memory Characteristics
128///
129/// - **Header**: Parsed once at initialization and kept in memory
130/// - **Per-Line**: Only current line and parsing context (stack depth proportional to nesting)
131/// - **No Buffering**: Nodes are yielded immediately after parsing
132/// - **Identical to Sync**: Same memory profile as synchronous parser
133///
134/// # Examples
135///
136/// ## Parse from Async File
137///
138/// ```rust,no_run
139/// # #[cfg(feature = "async")]
140/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141/// use hedl_stream::{AsyncStreamingParser, NodeEvent};
142/// use tokio::fs::File;
143///
144/// let file = File::open("data.hedl").await?;
145/// let mut parser = AsyncStreamingParser::new(file).await?;
146///
147/// while let Some(event) = parser.next_event().await? {
148///     if let NodeEvent::Node(node) = event {
149///         println!("Processing {}: {}", node.type_name, node.id);
150///     }
151/// }
152/// # Ok(())
153/// # }
154/// ```
155///
156/// ## With Timeout Protection
157///
158/// ```rust,no_run
159/// # #[cfg(feature = "async")]
160/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
161/// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig, StreamError};
162/// use std::time::Duration;
163/// use std::io::Cursor;
164///
165/// let config = StreamingParserConfig {
166///     timeout: Some(Duration::from_secs(10)),
167///     ..Default::default()
168/// };
169///
170/// let mut parser = AsyncStreamingParser::with_config(
171///     Cursor::new("untrusted input"),
172///     config
173/// ).await?;
174///
175/// while let Some(event) = parser.next_event().await? {
176///     // Process event
177/// }
178/// # Ok(())
179/// # }
180/// ```
pub struct AsyncStreamingParser<R: AsyncRead + Unpin> {
    /// Line reader wrapping the caller's input; built with `config.buffer_size`.
    reader: AsyncLineReader<R>,
    /// Configuration supplied at construction (timeout, indent limit, buffer size).
    config: StreamingParserConfig,
    /// Header information exposed through `header()`; starts as `None`.
    // NOTE(review): presumably filled in by `parse_header` — confirm in `header_parsing`.
    header: Option<HeaderInfo>,
    /// Context stack plus queued events.
    state: ParserState,
    /// Set when the input is exhausted or an error has been returned.
    finished: bool,
    errored: bool,              // Track if an error occurred to skip finalize
    sent_end_of_document: bool, // Track if EndOfDocument has been returned
    /// Moment the parser was created; the timeout is measured from here.
    start_time: Instant,
    /// Number of `next_event` loop iterations, used to check the timeout only
    /// on every 100th iteration.
    operations_count: usize,
}
192
/// Mutable parsing state carried between `next_event` calls.
#[derive(Debug)]
struct ParserState {
    /// Stack of active contexts; starts with a single `Context::Root` entry.
    stack: Vec<Context>,
    /// Previous row values for ditto handling (deprecated in v2.0+).
    prev_row: Option<Vec<Value>>,
    /// Pending events from inline children parsing; `next_event` hands them
    /// out front-first before reading more input.
    pending_events: Vec<NodeEvent>,
}
202
/// One entry of the parser's context stack.
#[derive(Debug, Clone)]
enum Context {
    /// Bottom of the stack; pushed at construction and never popped by `finalize`.
    Root,
    /// An open object; closing it yields `NodeEvent::ObjectEnd` with this `key`.
    Object {
        key: String,
        // NOTE(review): presumably the indentation level of the object's key line — confirm.
        indent: usize,
    },
    /// An open list; closing it yields `NodeEvent::ListEnd` with `key`,
    /// `type_name` and `count`.
    List {
        key: String,
        type_name: String,
        // NOTE(review): presumably the column names rows are matched against — confirm.
        schema: Vec<String>,
        // NOTE(review): presumably the indentation level expected for rows — confirm.
        row_indent: usize,
        // Reported as the `count` of the `ListEnd` event.
        count: usize,
        last_node: Option<(String, String)>, // (type, id)
    },
}
219
220impl<R: AsyncRead + Unpin> AsyncStreamingParser<R> {
221    /// Create a new async streaming parser with default configuration.
222    ///
223    /// The parser immediately reads and validates the HEDL header (version and
224    /// schema directives). If the header is invalid, this function returns an error.
225    ///
226    /// # Parameters
227    ///
228    /// - `reader`: Any type implementing `AsyncRead + Unpin`
229    ///
230    /// # Returns
231    ///
232    /// - `Ok(parser)`: Parser ready to yield events
233    /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
234    ///
235    /// # Examples
236    ///
237    /// ## From a File
238    ///
239    /// ```rust,no_run
240    /// # #[cfg(feature = "async")]
241    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
242    /// use hedl_stream::AsyncStreamingParser;
243    /// use tokio::fs::File;
244    ///
245    /// let file = File::open("data.hedl").await?;
246    /// let parser = AsyncStreamingParser::new(file).await?;
247    /// # Ok(())
248    /// # }
249    /// ```
250    ///
251    /// ## From a String
252    ///
253    /// ```rust
254    /// # #[cfg(feature = "async")]
255    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256    /// use hedl_stream::AsyncStreamingParser;
257    /// use std::io::Cursor;
258    ///
259    /// let data = r#"
260    /// %VERSION: 1.0
261    /// %STRUCT: User: [id, name]
262    /// ---
263    /// users:@User
264    ///  | alice, Alice
265    /// "#;
266    ///
267    /// let parser = AsyncStreamingParser::new(Cursor::new(data)).await?;
268    /// # Ok(())
269    /// # }
270    /// ```
271    ///
272    /// # Errors
273    ///
274    /// - `StreamError::MissingVersion`: No `%VERSION` directive found
275    /// - `StreamError::InvalidVersion`: Invalid version format
276    /// - `StreamError::Syntax`: Malformed header directive
277    /// - `StreamError::Io`: I/O error reading input
278    pub async fn new(reader: R) -> StreamResult<Self> {
279        Self::with_config(reader, StreamingParserConfig::default()).await
280    }
281
282    /// Create an async streaming parser with custom configuration.
283    ///
284    /// Use this when you need to control memory limits, buffer sizes, or enable
285    /// timeout protection for untrusted input.
286    ///
287    /// # Examples
288    ///
289    /// ## With Timeout
290    ///
291    /// ```rust
292    /// # #[cfg(feature = "async")]
293    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
294    /// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
295    /// use std::time::Duration;
296    /// use std::io::Cursor;
297    ///
298    /// let config = StreamingParserConfig {
299    ///     timeout: Some(Duration::from_secs(30)),
300    ///     ..Default::default()
301    /// };
302    ///
303    /// let parser = AsyncStreamingParser::with_config(
304    ///     Cursor::new("untrusted input"),
305    ///     config
306    /// ).await?;
307    /// # Ok(())
308    /// # }
309    /// ```
310    pub async fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
311        let mut parser = Self {
312            reader: AsyncLineReader::with_capacity(reader, config.buffer_size),
313            config,
314            header: None,
315            state: ParserState {
316                stack: vec![Context::Root],
317                prev_row: None,
318                pending_events: Vec::new(),
319            },
320            finished: false,
321            errored: false,
322            sent_end_of_document: false,
323            start_time: Instant::now(),
324            operations_count: 0,
325        };
326
327        // Parse header immediately
328        parser.parse_header().await?;
329
330        Ok(parser)
331    }
332
333    /// Check if timeout has been exceeded.
334    #[inline]
335    fn check_timeout(&self) -> StreamResult<()> {
336        if let Some(timeout) = self.config.timeout {
337            let elapsed = self.start_time.elapsed();
338            if elapsed > timeout {
339                return Err(StreamError::Timeout {
340                    elapsed,
341                    limit: timeout,
342                });
343            }
344        }
345        Ok(())
346    }
347
348    /// Set the errored flag and return an error.
349    ///
350    /// This helper ensures that after any error is returned, subsequent calls
351    /// to `next_event` will return `Ok(None)` without attempting further parsing.
352    #[inline]
353    fn return_error<T>(&mut self, e: StreamError) -> StreamResult<T> {
354        self.finished = true;
355        self.errored = true;
356        Err(e)
357    }
358
359    /// Get the parsed header information.
360    ///
361    /// Returns header metadata including version, schema definitions, aliases,
362    /// and nesting rules. This is available immediately after parser creation.
363    ///
364    /// # Examples
365    ///
366    /// ```rust
367    /// # #[cfg(feature = "async")]
368    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369    /// use hedl_stream::AsyncStreamingParser;
370    /// use std::io::Cursor;
371    ///
372    /// let input = r#"
373    /// %VERSION: 1.0
374    /// %STRUCT: User: [id, name, email]
375    /// ---
376    /// "#;
377    ///
378    /// let parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
379    /// let header = parser.header().unwrap();
380    ///
381    /// assert_eq!(header.version, (1, 0));
382    /// let user_schema = header.get_schema("User").unwrap();
383    /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
384    /// # Ok(())
385    /// # }
386    /// ```
387    pub fn header(&self) -> Option<&HeaderInfo> {
388        self.header.as_ref()
389    }
390
391    /// Parse the next event from the stream asynchronously.
392    ///
393    /// Returns `Ok(Some(event))` if an event was parsed, `Ok(None)` at end of document,
394    /// or `Err` on parsing errors.
395    ///
396    /// # Performance
397    ///
398    /// This method is async and will yield to the tokio runtime when waiting for I/O,
399    /// allowing other tasks to run. It does not block the thread.
400    ///
401    /// # Examples
402    ///
403    /// ```rust
404    /// # #[cfg(feature = "async")]
405    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
406    /// use hedl_stream::{AsyncStreamingParser, NodeEvent};
407    /// use std::io::Cursor;
408    ///
409    /// let input = r#"
410    /// %VERSION: 1.0
411    /// %STRUCT: User: [id, name]
412    /// ---
413    /// users:@User
414    ///  | alice, Alice
415    /// "#;
416    ///
417    /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
418    ///
419    /// while let Some(event) = parser.next_event().await? {
420    ///     match event {
421    ///         NodeEvent::Node(node) => println!("Node: {}", node.id),
422    ///         NodeEvent::ListStart { type_name, .. } => println!("List: {}", type_name),
423    ///         _ => {}
424    ///     }
425    /// }
426    /// # Ok(())
427    /// # }
428    /// ```
429    pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
430        // If errored, stop immediately without finalize
431        if self.errored {
432            return Ok(None);
433        }
434
435        // Drain pending events from inline children first
436        if !self.state.pending_events.is_empty() {
437            return Ok(Some(self.state.pending_events.remove(0)));
438        }
439
440        // If finished, continue emitting remaining context ends until stack is empty
441        if self.finished {
442            return self.finalize();
443        }
444
445        loop {
446            // Check timeout periodically (every 100 operations to minimize overhead)
447            self.operations_count += 1;
448            if self.operations_count % 100 == 0 {
449                if let Err(e) = self.check_timeout() {
450                    return self.return_error(e);
451                }
452            }
453
454            let (line_num, line) = match self.reader.next_line().await {
455                Ok(Some(l)) => l,
456                Ok(None) => {
457                    self.finished = true;
458                    return self.finalize();
459                }
460                Err(e) => return self.return_error(e),
461            };
462
463            let trimmed = line.trim();
464
465            // Skip blank lines and comments
466            if trimmed.is_empty() || trimmed.starts_with('#') {
467                continue;
468            }
469
470            // Calculate indentation
471            let indent_info = match hedl_core::lex::calculate_indent(&line, line_num as u32) {
472                Ok(info) => info,
473                Err(e) => return self.return_error(StreamError::syntax(line_num, e.to_string())),
474            };
475
476            let (indent, content) = match indent_info {
477                Some(info) => (info.level, &line[info.spaces..]),
478                None => continue,
479            };
480
481            if indent > self.config.max_indent_depth {
482                return self.return_error(StreamError::syntax(
483                    line_num,
484                    format!("indent depth {indent} exceeds limit"),
485                ));
486            }
487
488            // Pop contexts as needed based on indentation
489            let events = match self.pop_contexts(indent) {
490                Ok(e) => e,
491                Err(e) => return self.return_error(e),
492            };
493            if let Some(event) = events {
494                // Push back the current line to process after emitting list end
495                self.reader.push_back(line_num, line);
496                return Ok(Some(event));
497            }
498
499            // Parse line content
500            return match self.parse_line(content, indent, line_num) {
501                Ok(result) => Ok(result),
502                Err(e) => self.return_error(e),
503            };
504        }
505    }
506
507    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
508        // If we already sent EndOfDocument, return None to signal true end of stream
509        if self.sent_end_of_document {
510            return Ok(None);
511        }
512
513        while self.state.stack.len() > 1 {
514            let ctx = self.state.stack.pop().expect("stack has elements");
515            match ctx {
516                Context::List {
517                    key,
518                    type_name,
519                    count,
520                    ..
521                } => {
522                    return Ok(Some(NodeEvent::ListEnd {
523                        key,
524                        type_name,
525                        count,
526                    }));
527                }
528                Context::Object { key, .. } => {
529                    return Ok(Some(NodeEvent::ObjectEnd { key }));
530                }
531                Context::Root => {
532                    // Root context handled by the while condition
533                }
534            }
535        }
536
537        // Mark that we've sent EndOfDocument, so subsequent calls return None
538        self.sent_end_of_document = true;
539        Ok(Some(NodeEvent::EndOfDocument))
540    }
541
542    /// Read up to `n` events in a single async operation.
543    ///
544    /// Reduces await overhead for high-throughput scenarios by batching event reads.
545    /// This can improve performance when processing many small events.
546    ///
547    /// # Parameters
548    ///
549    /// - `n`: Maximum number of events to read
550    ///
551    /// # Returns
552    ///
553    /// - `Ok(Vec<NodeEvent>)`: Vector of events (may be fewer than `n` if EOF reached)
554    /// - `Err(e)`: Parsing error encountered
555    ///
556    /// # Examples
557    ///
558    /// ```rust
559    /// # #[cfg(feature = "async")]
560    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
561    /// use hedl_stream::AsyncStreamingParser;
562    /// use tokio::fs::File;
563    ///
564    /// let file = File::open("data.hedl").await?;
565    /// let mut parser = AsyncStreamingParser::new(file).await?;
566    ///
567    /// // Read events in batches of 100
568    /// loop {
569    ///     let batch = parser.next_batch(100).await?;
570    ///     if batch.is_empty() {
571    ///         break;
572    ///     }
573    ///
574    ///     // Process batch
575    ///     for event in batch {
576    ///         // ...
577    ///     }
578    /// }
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>> {
583        let mut batch = Vec::with_capacity(n.min(100)); // Cap initial allocation
584        for _ in 0..n {
585            match self.next_event().await? {
586                Some(NodeEvent::EndOfDocument) => break,
587                Some(event) => batch.push(event),
588                None => break,
589            }
590        }
591        Ok(batch)
592    }
593
594    /// Read events with cancellation support via tokio watch channel.
595    ///
596    /// Returns `Ok(None)` if cancelled, otherwise behaves like `next_event()`.
597    ///
598    /// # Examples
599    ///
600    /// ```rust
601    /// # #[cfg(feature = "async")]
602    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
603    /// use hedl_stream::AsyncStreamingParser;
604    /// use tokio::sync::watch;
605    /// use std::io::Cursor;
606    ///
607    /// let input = r#"
608    /// %VERSION: 1.0
609    /// %STRUCT: User: [id, name]
610    /// ---
611    /// users:@User
612    ///  | alice, Alice
613    /// "#;
614    ///
615    /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
616    /// let (cancel_tx, mut cancel_rx) = watch::channel(false);
617    ///
618    /// // Can cancel from another task
619    /// tokio::spawn(async move {
620    ///     tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
621    ///     let _ = cancel_tx.send(true);
622    /// });
623    ///
624    /// while let Some(event) = parser.next_event_cancellable(&mut cancel_rx).await? {
625    ///     // Process event
626    ///     # break;
627    /// }
628    /// # Ok(())
629    /// # }
630    /// ```
631    #[cfg(feature = "async")]
632    pub async fn next_event_cancellable(
633        &mut self,
634        cancel_rx: &mut tokio::sync::watch::Receiver<bool>,
635    ) -> StreamResult<Option<NodeEvent>> {
636        // Check if cancelled
637        if *cancel_rx.borrow() {
638            return Ok(None);
639        }
640
641        tokio::select! {
642            result = self.next_event() => result,
643            _ = cancel_rx.changed() => {
644                if *cancel_rx.borrow() {
645                    Ok(None)
646                } else {
647                    // False alarm, continue
648                    self.next_event().await
649                }
650            }
651        }
652    }
653}
654
// `futures_core::Stream` adapter so the parser can be used with stream combinators.
#[cfg(feature = "async")]
impl<R: AsyncRead + Unpin> futures_core::Stream for AsyncStreamingParser<R> {
    type Item = StreamResult<NodeEvent>;

    /// Run one `next_event` call and translate its result into stream terms.
    ///
    /// Both `EndOfDocument` and `Ok(None)` end the stream; errors are yielded
    /// as items (the errored flag is set inside `next_event` before an error
    /// is returned).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        // NOTE(review): a fresh future is created on every poll and dropped
        // when it returns `Pending`. This assumes `next_event` loses no data
        // when dropped mid-read — confirm against `AsyncLineReader`.
        let event_future = self.next_event();
        tokio::pin!(event_future);

        event_future
            .poll(cx)
            .map(|outcome| match outcome.transpose() {
                Some(Ok(NodeEvent::EndOfDocument)) | None => None,
                item => item,
            })
    }
}
675
676#[cfg(all(test, feature = "async"))]
677mod tests {
678    use super::*;
679    use std::io::Cursor;
680    use std::time::Duration;
681
682    #[tokio::test]
683    async fn test_parse_header() {
684        let input = r#"
685%VERSION: 1.0
686%STRUCT: User: [id, name, email]
687%ALIAS active = "Active"
688%NEST: User > Order
689---
690"#;
691        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
692        let header = parser.header().unwrap();
693
694        assert_eq!(header.version, (1, 0));
695        assert!(header.structs.contains_key("User"));
696        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
697        assert_eq!(header.nests.get("User"), Some(&vec!["Order".to_string()]));
698    }
699
700    #[tokio::test]
701    async fn test_streaming_nodes() {
702        let input = r"
703%VERSION: 1.0
704%STRUCT: User: [id, name]
705---
706users:@User
707 | alice, Alice Smith
708 | bob, Bob Jones
709";
710        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
711
712        let mut events = Vec::new();
713        while let Some(event) = parser.next_event().await.unwrap() {
714            events.push(event);
715        }
716
717        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
718        assert_eq!(nodes.len(), 2);
719        assert_eq!(nodes[0].id, "alice");
720        assert_eq!(nodes[1].id, "bob");
721    }
722
723    #[tokio::test]
724    async fn test_timeout() {
725        // Test that parsing completes successfully with a reasonable timeout.
726        // Using 100ms to avoid flakiness on slow systems while still testing timeout config.
727        let config = StreamingParserConfig {
728            timeout: Some(Duration::from_millis(100)),
729            ..Default::default()
730        };
731
732        let input = r"
733%VERSION: 1.0
734---
735";
736        let parser = AsyncStreamingParser::with_config(Cursor::new(input), config).await;
737        assert!(parser.is_ok()); // Header should parse within timeout
738    }
739
740    #[tokio::test]
741    async fn test_inline_schema() {
742        let input = r"
743%VERSION: 1.0
744---
745items:@Item[id, name]
746 | item1, First
747 | item2, Second
748";
749        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
750
751        let mut nodes = Vec::new();
752        while let Some(event) = parser.next_event().await.unwrap() {
753            if let NodeEvent::Node(node) = event {
754                nodes.push(node);
755            }
756        }
757
758        assert_eq!(nodes.len(), 2);
759        assert_eq!(nodes[0].type_name, "Item");
760    }
761
762    #[tokio::test]
763    async fn test_error_handling() {
764        let input = r"
765%VERSION: 1.0
766---
767invalid line without colon
768";
769        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
770
771        let result = parser.next_event().await;
772        assert!(result.is_err());
773        assert!(matches!(result.unwrap_err(), StreamError::Syntax { .. }));
774    }
775
776    #[tokio::test]
777    async fn test_unicode() {
778        let input = r"
779%VERSION: 1.0
780%STRUCT: User: [id, name]
781---
782users:@User
783 | 用户1, 张三
784 | пользователь, Иван
785";
786        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
787
788        let mut nodes = Vec::new();
789        while let Some(event) = parser.next_event().await.unwrap() {
790            if let NodeEvent::Node(node) = event {
791                nodes.push(node);
792            }
793        }
794
795        assert_eq!(nodes.len(), 2);
796        assert_eq!(nodes[0].id, "用户1");
797        assert_eq!(nodes[1].id, "пользователь");
798    }
799
800    // ============ STREAM TRAIT TESTS ============
801
802    #[tokio::test]
803    async fn test_stream_trait_basic() {
804        use futures::StreamExt;
805
806        let input = r"
807%VERSION: 1.0
808%STRUCT: User: [id, name]
809---
810users:@User
811 | alice, Alice
812 | bob, Bob
813";
814        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
815
816        let events: Vec<_> = parser.collect().await;
817        assert!(events.iter().all(std::result::Result::is_ok));
818
819        let nodes: Vec<_> = events
820            .iter()
821            .filter_map(|e| e.as_ref().ok())
822            .filter_map(|e| e.as_node())
823            .collect();
824
825        assert_eq!(nodes.len(), 2);
826        assert_eq!(nodes[0].id, "alice");
827        assert_eq!(nodes[1].id, "bob");
828    }
829
830    #[tokio::test]
831    async fn test_stream_trait_filter_map() {
832        use futures::StreamExt;
833
834        let input = r"
835%VERSION: 1.0
836%STRUCT: User: [id, name, active]
837---
838users:@User
839 | alice, Alice, true
840 | bob, Bob, false
841 | carol, Carol, true
842";
843        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
844
845        // Only collect active users using stream combinators
846        let active_nodes: Vec<_> = parser
847            .filter_map(|result| async move {
848                result.ok().and_then(|event| {
849                    if let NodeEvent::Node(node) = event {
850                        Some(node)
851                    } else {
852                        None
853                    }
854                })
855            })
856            .filter(|node| {
857                let is_active = matches!(node.get_field(2), Some(Value::Bool(true)));
858                async move { is_active }
859            })
860            .collect()
861            .await;
862
863        assert_eq!(active_nodes.len(), 2);
864        assert_eq!(active_nodes[0].id, "alice");
865        assert_eq!(active_nodes[1].id, "carol");
866    }
867
868    #[tokio::test]
869    async fn test_stream_trait_take() {
870        use futures::StreamExt;
871
872        let input = r"
873%VERSION: 1.0
874%STRUCT: User: [id, name]
875---
876users:@User
877 | alice, Alice
878 | bob, Bob
879 | carol, Carol
880 | dave, Dave
881";
882        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
883
884        // Take only first 2 node events
885        let nodes: Vec<_> = parser
886            .filter_map(|result| async move {
887                result.ok().and_then(|event| {
888                    if let NodeEvent::Node(node) = event {
889                        Some(node)
890                    } else {
891                        None
892                    }
893                })
894            })
895            .take(2)
896            .collect()
897            .await;
898
899        assert_eq!(nodes.len(), 2);
900        assert_eq!(nodes[0].id, "alice");
901        assert_eq!(nodes[1].id, "bob");
902    }
903
904    #[tokio::test]
905    async fn test_stream_trait_count() {
906        use futures::StreamExt;
907
908        let input = r"
909%VERSION: 1.0
910%STRUCT: User: [id, name]
911---
912users:@User
913 | alice, Alice
914 | bob, Bob
915 | carol, Carol
916";
917        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
918
919        let total = parser.count().await;
920        // Should count all events: ListStart, 3 Nodes, ListEnd = 5 events
921        assert_eq!(total, 5);
922    }
923
924    // ============ BATCH READING TESTS ============
925
926    #[tokio::test]
927    async fn test_next_batch_basic() {
928        let input = r"
929%VERSION: 1.0
930%STRUCT: User: [id, name]
931---
932users:@User
933 | alice, Alice
934 | bob, Bob
935 | carol, Carol
936";
937        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
938
939        // Read all events in one batch
940        let batch = parser.next_batch(10).await.unwrap();
941        assert_eq!(batch.len(), 5); // ListStart, 3 Nodes, ListEnd
942
943        // Next batch should be empty (EOF)
944        let batch = parser.next_batch(10).await.unwrap();
945        assert!(batch.is_empty());
946    }
947
948    #[tokio::test]
949    async fn test_next_batch_incremental() {
950        let input = r"
951%VERSION: 1.0
952%STRUCT: User: [id, name]
953---
954users:@User
955 | alice, Alice
956 | bob, Bob
957 | carol, Carol
958";
959        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
960
961        // Read in small batches
962        let batch1 = parser.next_batch(2).await.unwrap();
963        assert_eq!(batch1.len(), 2); // ListStart, Node
964
965        let batch2 = parser.next_batch(2).await.unwrap();
966        assert_eq!(batch2.len(), 2); // Node, Node
967
968        let batch3 = parser.next_batch(2).await.unwrap();
969        assert_eq!(batch3.len(), 1); // ListEnd
970
971        let batch4 = parser.next_batch(2).await.unwrap();
972        assert!(batch4.is_empty());
973    }
974
975    #[tokio::test]
976    async fn test_next_batch_empty_file() {
977        let input = r"
978%VERSION: 1.0
979%STRUCT: User: [id, name]
980---
981";
982        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
983
984        let batch = parser.next_batch(10).await.unwrap();
985        assert!(batch.is_empty());
986    }
987
988    #[tokio::test]
989    async fn test_next_batch_large() {
990        let mut input = String::from(
991            r"
992%VERSION: 1.0
993%STRUCT: Data: [id, value]
994---
995data:@Data
996",
997        );
998        for i in 0..500 {
999            input.push_str(&format!(" | row{i}, value{i}\n"));
1000        }
1001
1002        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1003
1004        // Read in large batches
1005        let batch1 = parser.next_batch(100).await.unwrap();
1006        assert_eq!(batch1.len(), 100); // ListStart + 99 Nodes
1007
1008        let batch2 = parser.next_batch(100).await.unwrap();
1009        assert_eq!(batch2.len(), 100); // 100 Nodes
1010
1011        // Continue until we get all events
1012        let mut total = batch1.len() + batch2.len();
1013        loop {
1014            let batch = parser.next_batch(100).await.unwrap();
1015            if batch.is_empty() {
1016                break;
1017            }
1018            total += batch.len();
1019        }
1020
1021        // Total: ListStart + 500 Nodes + ListEnd = 502
1022        assert_eq!(total, 502);
1023    }
1024
1025    // ============ CANCELLATION TESTS ============
1026
1027    #[tokio::test]
1028    async fn test_cancellation_basic() {
1029        use tokio::sync::watch;
1030
1031        let input = r"
1032%VERSION: 1.0
1033%STRUCT: User: [id, name]
1034---
1035users:@User
1036 | alice, Alice
1037 | bob, Bob
1038";
1039        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1040
1041        let (cancel_tx, mut cancel_rx) = watch::channel(false);
1042
1043        // Read first event normally
1044        let event1 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
1045        assert!(event1.is_some());
1046
1047        // Cancel
1048        cancel_tx.send(true).unwrap();
1049
1050        // Next read should return None (cancelled)
1051        let event2 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
1052        assert!(event2.is_none());
1053    }
1054
1055    #[tokio::test]
1056    async fn test_cancellation_not_cancelled() {
1057        use tokio::sync::watch;
1058
1059        let input = r"
1060%VERSION: 1.0
1061%STRUCT: User: [id, name]
1062---
1063users:@User
1064 | alice, Alice
1065";
1066        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1067
1068        let (_cancel_tx, mut cancel_rx) = watch::channel(false);
1069
1070        // Read all events without cancellation
1071        let mut count = 0;
1072        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
1073            count += 1;
1074        }
1075
1076        // Should have read all events: ListStart, Node, ListEnd, EndOfDocument
1077        assert_eq!(count, 4);
1078    }
1079
1080    #[tokio::test]
1081    async fn test_cancellation_during_processing() {
1082        use tokio::sync::watch;
1083
1084        let mut input = String::from(
1085            r"
1086%VERSION: 1.0
1087%STRUCT: Data: [id]
1088---
1089data:@Data
1090",
1091        );
1092        for i in 0..1000 {
1093            input.push_str(&format!(" | row{i}\n"));
1094        }
1095
1096        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1097
1098        let (cancel_tx, mut cancel_rx) = watch::channel(false);
1099
1100        // Spawn a task that cancels after reading some events
1101        tokio::spawn(async move {
1102            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
1103            cancel_tx.send(true).unwrap();
1104        });
1105
1106        let mut count = 0;
1107        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
1108            count += 1;
1109            // Small delay to allow cancellation to trigger
1110            tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
1111        }
1112
1113        // Should have read some events but not all 1002 (ListStart + 1000 Nodes + ListEnd)
1114        assert!(count < 1002);
1115        assert!(count > 0);
1116    }
1117
1118    // ============ CONCURRENT PROCESSING TESTS ============
1119
1120    #[tokio::test]
1121    async fn test_concurrent_file_processing() {
1122        let input = r"
1123%VERSION: 1.0
1124%STRUCT: User: [id, name]
1125---
1126users:@User
1127 | alice, Alice
1128 | bob, Bob
1129";
1130
1131        // Process multiple identical streams concurrently
1132        let tasks: Vec<_> = (0..5)
1133            .map(|_| {
1134                let input_clone = input.to_string();
1135                tokio::spawn(async move {
1136                    let mut parser = AsyncStreamingParser::new(Cursor::new(input_clone))
1137                        .await
1138                        .unwrap();
1139
1140                    let mut count = 0;
1141                    while let Some(_event) = parser.next_event().await.unwrap() {
1142                        count += 1;
1143                    }
1144                    count
1145                })
1146            })
1147            .collect();
1148
1149        let results = futures::future::join_all(tasks).await;
1150
1151        // All tasks should succeed and count the same number of events
1152        for result in results {
1153            assert_eq!(result.unwrap(), 5); // ListStart, 2 Nodes, ListEnd, EndOfDocument
1154        }
1155    }
1156
1157    #[tokio::test]
1158    async fn test_concurrent_with_stream_trait() {
1159        use futures::StreamExt;
1160
1161        let input = r"
1162%VERSION: 1.0
1163%STRUCT: Data: [id]
1164---
1165data:@Data
1166 | row1
1167 | row2
1168 | row3
1169";
1170
1171        // Process multiple streams concurrently using manual await
1172        // Note: futures::Stream combinators create !Send futures, so we can't use tokio::spawn
1173        let mut counts = Vec::new();
1174
1175        for _ in 0..10 {
1176            let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1177
1178            // Count nodes using stream combinators
1179            let count = parser
1180                .filter_map(|result| async move {
1181                    result.ok().and_then(|event| {
1182                        if let NodeEvent::Node(_) = event {
1183                            Some(())
1184                        } else {
1185                            None
1186                        }
1187                    })
1188                })
1189                .count()
1190                .await;
1191
1192            counts.push(count);
1193        }
1194
1195        // All should count 3 nodes
1196        for count in counts {
1197            assert_eq!(count, 3);
1198        }
1199    }
1200
1201    // ============ EDGE CASE AND INTEGRATION TESTS ============
1202
1203    #[tokio::test]
1204    async fn test_stream_trait_with_errors() {
1205        use futures::StreamExt;
1206
1207        let input = r"
1208%VERSION: 1.0
1209%STRUCT: User: [id, name]
1210---
1211users:@User
1212 | alice, Alice
1213 | bob
1214 | carol, Carol
1215";
1216        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1217
1218        let results: Vec<_> = parser.collect().await;
1219
1220        // Should have error for malformed row (bob with only 1 field)
1221        let errors: Vec<_> = results.iter().filter(|r| r.is_err()).collect();
1222        assert!(!errors.is_empty());
1223    }
1224
1225    #[tokio::test]
1226    async fn test_batch_with_mixed_events() {
1227        let input = r"
1228%VERSION: 1.0
1229%STRUCT: User: [id, name]
1230%STRUCT: Product: [id, title]
1231---
1232users:@User
1233 | alice, Alice
1234products:@Product
1235 | prod1, Widget
1236";
1237        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1238
1239        let batch = parser.next_batch(10).await.unwrap();
1240
1241        // Should contain: ListStart(User), Node(alice), ListEnd(User), ListStart(Product), Node(prod1), ListEnd(Product)
1242        assert_eq!(batch.len(), 6);
1243
1244        let list_starts: Vec<_> = batch
1245            .iter()
1246            .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
1247            .collect();
1248        assert_eq!(list_starts.len(), 2);
1249    }
1250
1251    #[tokio::test]
1252    async fn test_stream_empty_after_cancellation() {
1253        use tokio::sync::watch;
1254
1255        let input = r"
1256%VERSION: 1.0
1257%STRUCT: User: [id, name]
1258---
1259users:@User
1260 | alice, Alice
1261 | bob, Bob
1262";
1263        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1264
1265        let (cancel_tx, mut cancel_rx) = watch::channel(false);
1266
1267        // Read one event
1268        let _event = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
1269
1270        // Cancel
1271        cancel_tx.send(true).unwrap();
1272
1273        // Subsequent reads should return None
1274        assert!(parser
1275            .next_event_cancellable(&mut cancel_rx)
1276            .await
1277            .unwrap()
1278            .is_none());
1279        assert!(parser
1280            .next_event_cancellable(&mut cancel_rx)
1281            .await
1282            .unwrap()
1283            .is_none());
1284    }
1285
1286    #[tokio::test]
1287    async fn test_batch_reading_performance() {
1288        // Create a large dataset
1289        let mut input = String::from(
1290            r"
1291%VERSION: 1.0
1292%STRUCT: Data: [id, value]
1293---
1294data:@Data
1295",
1296        );
1297        for i in 0..1000 {
1298            input.push_str(&format!(" | row{i}, value{i}\n"));
1299        }
1300
1301        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1302
1303        let start = std::time::Instant::now();
1304
1305        // Read in batches
1306        let mut total = 0;
1307        loop {
1308            let batch = parser.next_batch(100).await.unwrap();
1309            if batch.is_empty() {
1310                break;
1311            }
1312            total += batch.len();
1313        }
1314
1315        let elapsed = start.elapsed();
1316
1317        // Should have read all events
1318        assert_eq!(total, 1002); // ListStart + 1000 Nodes + ListEnd
1319
1320        // Should complete reasonably quickly (< 100ms for 1000 rows)
1321        assert!(elapsed.as_millis() < 100);
1322    }
1323}