hedl_stream/async_parser/mod.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async streaming parser implementation.
19//!
20//! This module provides an asynchronous streaming parser for HEDL documents that mirrors
21//! the synchronous [`StreamingParser`](crate::StreamingParser) but uses tokio's async I/O.
22//!
23//! # When to Use Async
24//!
25//! **Choose Async (`AsyncStreamingParser`) when:**
26//! - Parsing network streams or remote data sources
27//! - High-concurrency scenarios (thousands of concurrent parsers)
28//! - Integration with async web frameworks (axum, actix-web, etc.)
29//! - Need to parse multiple streams concurrently
30//! - Working in an async runtime context
31//!
32//! **Choose Sync (`StreamingParser`) when:**
33//! - Parsing local files
34//! - Single-threaded batch processing
35//! - Simpler synchronous code is preferred
36//! - Performance is critical and no I/O waiting occurs
37//!
38//! # Performance Characteristics
39//!
40//! - **Non-blocking I/O**: Yields to runtime when waiting for data
41//! - **Same Memory Profile**: Identical to sync parser (~constant memory)
42//! - **Concurrent Processing**: Can process many streams simultaneously
43//! - **Zero-Copy**: Minimal allocations, same as sync version
44//!
45//! # Examples
46//!
47//! ## Basic Async Streaming
48//!
49//! ```rust,no_run
50//! # #[cfg(feature = "async")]
51//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
52//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
53//! use tokio::fs::File;
54//!
55//! let file = File::open("large-dataset.hedl").await?;
56//! let mut parser = AsyncStreamingParser::new(file).await?;
57//!
58//! while let Some(event) = parser.next_event().await? {
59//! match event {
60//! NodeEvent::Node(node) => {
61//! println!("{}:{}", node.type_name, node.id);
62//! }
63//! NodeEvent::ListStart { type_name, .. } => {
64//! println!("List started: {}", type_name);
65//! }
66//! _ => {}
67//! }
68//! }
69//! # Ok(())
70//! # }
71//! ```
72//!
73//! ## Concurrent Processing
74//!
75//! ```rust,no_run
76//! # #[cfg(feature = "async")]
77//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
78//! use hedl_stream::{AsyncStreamingParser, NodeEvent};
79//! use tokio::fs::File;
80//!
81//! async fn process_file(path: &str) -> Result<usize, Box<dyn std::error::Error>> {
82//! let file = File::open(path).await?;
83//! let mut parser = AsyncStreamingParser::new(file).await?;
84//!
85//! let mut count = 0;
86//! while let Some(event) = parser.next_event().await? {
87//! if let NodeEvent::Node(_) = event {
88//! count += 1;
89//! }
90//! }
91//! Ok(count)
92//! }
93//!
94//! // Process multiple files concurrently
95//! let results = tokio::join!(
96//! process_file("file1.hedl"),
97//! process_file("file2.hedl"),
98//! process_file("file3.hedl"),
99//! );
100//! # Ok(())
101//! # }
102//! ```
103
104mod header_parsing;
105mod line_parsing;
106
107use crate::async_reader::AsyncLineReader;
108use crate::error::{StreamError, StreamResult};
109use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
110use crate::parser::StreamingParserConfig;
111use hedl_core::Value;
112use std::future::Future;
113use std::pin::Pin;
114use std::task::{Context as TaskContext, Poll};
115use std::time::Instant;
116use tokio::io::AsyncRead;
117
/// Type alias for list context lookup result: (`type_name`, schema, optional `last_node` info)
///
/// The optional pair is the `(type, id)` of the most recently emitted row in
/// the list, mirroring `Context::List::last_node`.
type ListContextResult = (String, Vec<String>, Option<(String, String)>);
120
121/// Async streaming HEDL parser.
122///
123/// Processes HEDL documents asynchronously, yielding `NodeEvent` items as they
124/// are parsed without loading the entire document into memory. Uses tokio's
125/// async I/O for non-blocking operation.
126///
127/// # Memory Characteristics
128///
129/// - **Header**: Parsed once at initialization and kept in memory
130/// - **Per-Line**: Only current line and parsing context (stack depth proportional to nesting)
131/// - **No Buffering**: Nodes are yielded immediately after parsing
132/// - **Identical to Sync**: Same memory profile as synchronous parser
133///
134/// # Examples
135///
136/// ## Parse from Async File
137///
138/// ```rust,no_run
139/// # #[cfg(feature = "async")]
140/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
141/// use hedl_stream::{AsyncStreamingParser, NodeEvent};
142/// use tokio::fs::File;
143///
144/// let file = File::open("data.hedl").await?;
145/// let mut parser = AsyncStreamingParser::new(file).await?;
146///
147/// while let Some(event) = parser.next_event().await? {
148/// if let NodeEvent::Node(node) = event {
149/// println!("Processing {}: {}", node.type_name, node.id);
150/// }
151/// }
152/// # Ok(())
153/// # }
154/// ```
155///
156/// ## With Timeout Protection
157///
158/// ```rust,no_run
159/// # #[cfg(feature = "async")]
160/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
161/// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig, StreamError};
162/// use std::time::Duration;
163/// use std::io::Cursor;
164///
165/// let config = StreamingParserConfig {
166/// timeout: Some(Duration::from_secs(10)),
167/// ..Default::default()
168/// };
169///
170/// let mut parser = AsyncStreamingParser::with_config(
171/// Cursor::new("untrusted input"),
172/// config
173/// ).await?;
174///
175/// while let Some(event) = parser.next_event().await? {
176/// // Process event
177/// }
178/// # Ok(())
179/// # }
180/// ```
pub struct AsyncStreamingParser<R: AsyncRead + Unpin> {
    /// Buffered async line reader (supports push-back of one line).
    reader: AsyncLineReader<R>,
    /// Limits and timeout settings applied during parsing.
    config: StreamingParserConfig,
    /// Header metadata; populated during construction by `parse_header`.
    header: Option<HeaderInfo>,
    /// Mutable parsing state (context stack, pending events, previous row).
    state: ParserState,
    /// True once the underlying reader has reached EOF.
    finished: bool,
    errored: bool, // Track if an error occurred to skip finalize
    sent_end_of_document: bool, // Track if EndOfDocument has been returned
    /// Construction time; basis for timeout checks.
    start_time: Instant,
    /// Parse-loop iteration counter; timeout is checked every 100 operations.
    operations_count: usize,
}
192
/// Internal mutable state shared across `next_event` calls.
#[derive(Debug)]
struct ParserState {
    /// Stack of active contexts. Always contains at least `Context::Root`.
    stack: Vec<Context>,
    /// Previous row values for ditto handling (deprecated in v2.0+).
    prev_row: Option<Vec<Value>>,
    /// Pending events from inline children parsing; drained FIFO before
    /// any new input is consumed.
    pending_events: Vec<NodeEvent>,
}
202
/// A single entry on the parser's context stack.
#[derive(Debug, Clone)]
enum Context {
    /// Document root; always the bottom of the stack.
    Root,
    /// A nested object opened by a `key:` line.
    Object {
        key: String,
        /// Indent level at which the object was introduced.
        indent: usize,
    },
    /// A typed list opened by a `key:@Type` line.
    List {
        key: String,
        type_name: String,
        /// Column names applied to each row of the list.
        schema: Vec<String>,
        /// Indent level expected for this list's rows.
        row_indent: usize,
        /// Number of rows emitted so far (reported in `ListEnd`).
        count: usize,
        last_node: Option<(String, String)>, // (type, id)
    },
}
219
220impl<R: AsyncRead + Unpin> AsyncStreamingParser<R> {
221 /// Create a new async streaming parser with default configuration.
222 ///
223 /// The parser immediately reads and validates the HEDL header (version and
224 /// schema directives). If the header is invalid, this function returns an error.
225 ///
226 /// # Parameters
227 ///
228 /// - `reader`: Any type implementing `AsyncRead + Unpin`
229 ///
230 /// # Returns
231 ///
232 /// - `Ok(parser)`: Parser ready to yield events
233 /// - `Err(e)`: Header parsing failed (missing version, invalid schema, etc.)
234 ///
235 /// # Examples
236 ///
237 /// ## From a File
238 ///
239 /// ```rust,no_run
240 /// # #[cfg(feature = "async")]
241 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
242 /// use hedl_stream::AsyncStreamingParser;
243 /// use tokio::fs::File;
244 ///
245 /// let file = File::open("data.hedl").await?;
246 /// let parser = AsyncStreamingParser::new(file).await?;
247 /// # Ok(())
248 /// # }
249 /// ```
250 ///
251 /// ## From a String
252 ///
253 /// ```rust
254 /// # #[cfg(feature = "async")]
255 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
256 /// use hedl_stream::AsyncStreamingParser;
257 /// use std::io::Cursor;
258 ///
259 /// let data = r#"
260 /// %VERSION: 1.0
261 /// %STRUCT: User: [id, name]
262 /// ---
263 /// users:@User
264 /// | alice, Alice
265 /// "#;
266 ///
267 /// let parser = AsyncStreamingParser::new(Cursor::new(data)).await?;
268 /// # Ok(())
269 /// # }
270 /// ```
271 ///
272 /// # Errors
273 ///
274 /// - `StreamError::MissingVersion`: No `%VERSION` directive found
275 /// - `StreamError::InvalidVersion`: Invalid version format
276 /// - `StreamError::Syntax`: Malformed header directive
277 /// - `StreamError::Io`: I/O error reading input
278 pub async fn new(reader: R) -> StreamResult<Self> {
279 Self::with_config(reader, StreamingParserConfig::default()).await
280 }
281
282 /// Create an async streaming parser with custom configuration.
283 ///
284 /// Use this when you need to control memory limits, buffer sizes, or enable
285 /// timeout protection for untrusted input.
286 ///
287 /// # Examples
288 ///
289 /// ## With Timeout
290 ///
291 /// ```rust
292 /// # #[cfg(feature = "async")]
293 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
294 /// use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
295 /// use std::time::Duration;
296 /// use std::io::Cursor;
297 ///
298 /// let config = StreamingParserConfig {
299 /// timeout: Some(Duration::from_secs(30)),
300 /// ..Default::default()
301 /// };
302 ///
303 /// let parser = AsyncStreamingParser::with_config(
304 /// Cursor::new("untrusted input"),
305 /// config
306 /// ).await?;
307 /// # Ok(())
308 /// # }
309 /// ```
310 pub async fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
311 let mut parser = Self {
312 reader: AsyncLineReader::with_capacity(reader, config.buffer_size),
313 config,
314 header: None,
315 state: ParserState {
316 stack: vec![Context::Root],
317 prev_row: None,
318 pending_events: Vec::new(),
319 },
320 finished: false,
321 errored: false,
322 sent_end_of_document: false,
323 start_time: Instant::now(),
324 operations_count: 0,
325 };
326
327 // Parse header immediately
328 parser.parse_header().await?;
329
330 Ok(parser)
331 }
332
333 /// Check if timeout has been exceeded.
334 #[inline]
335 fn check_timeout(&self) -> StreamResult<()> {
336 if let Some(timeout) = self.config.timeout {
337 let elapsed = self.start_time.elapsed();
338 if elapsed > timeout {
339 return Err(StreamError::Timeout {
340 elapsed,
341 limit: timeout,
342 });
343 }
344 }
345 Ok(())
346 }
347
348 /// Set the errored flag and return an error.
349 ///
350 /// This helper ensures that after any error is returned, subsequent calls
351 /// to `next_event` will return `Ok(None)` without attempting further parsing.
352 #[inline]
353 fn return_error<T>(&mut self, e: StreamError) -> StreamResult<T> {
354 self.finished = true;
355 self.errored = true;
356 Err(e)
357 }
358
359 /// Get the parsed header information.
360 ///
361 /// Returns header metadata including version, schema definitions, aliases,
362 /// and nesting rules. This is available immediately after parser creation.
363 ///
364 /// # Examples
365 ///
366 /// ```rust
367 /// # #[cfg(feature = "async")]
368 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
369 /// use hedl_stream::AsyncStreamingParser;
370 /// use std::io::Cursor;
371 ///
372 /// let input = r#"
373 /// %VERSION: 1.0
374 /// %STRUCT: User: [id, name, email]
375 /// ---
376 /// "#;
377 ///
378 /// let parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
379 /// let header = parser.header().unwrap();
380 ///
381 /// assert_eq!(header.version, (1, 0));
382 /// let user_schema = header.get_schema("User").unwrap();
383 /// assert_eq!(user_schema, &vec!["id", "name", "email"]);
384 /// # Ok(())
385 /// # }
386 /// ```
    pub fn header(&self) -> Option<&HeaderInfo> {
        // Cheap borrow; the header was populated during construction by
        // `parse_header`, so no I/O happens here.
        self.header.as_ref()
    }
390
391 /// Parse the next event from the stream asynchronously.
392 ///
393 /// Returns `Ok(Some(event))` if an event was parsed, `Ok(None)` at end of document,
394 /// or `Err` on parsing errors.
395 ///
396 /// # Performance
397 ///
398 /// This method is async and will yield to the tokio runtime when waiting for I/O,
399 /// allowing other tasks to run. It does not block the thread.
400 ///
401 /// # Examples
402 ///
403 /// ```rust
404 /// # #[cfg(feature = "async")]
405 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
406 /// use hedl_stream::{AsyncStreamingParser, NodeEvent};
407 /// use std::io::Cursor;
408 ///
409 /// let input = r#"
410 /// %VERSION: 1.0
411 /// %STRUCT: User: [id, name]
412 /// ---
413 /// users:@User
414 /// | alice, Alice
415 /// "#;
416 ///
417 /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
418 ///
419 /// while let Some(event) = parser.next_event().await? {
420 /// match event {
421 /// NodeEvent::Node(node) => println!("Node: {}", node.id),
422 /// NodeEvent::ListStart { type_name, .. } => println!("List: {}", type_name),
423 /// _ => {}
424 /// }
425 /// }
426 /// # Ok(())
427 /// # }
428 /// ```
    pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        // If errored, stop immediately without finalize
        if self.errored {
            return Ok(None);
        }

        // Drain pending events from inline children first.
        // NOTE(review): remove(0) is O(len); pending queues are presumably
        // small (one line's inline children) — confirm before optimizing.
        if !self.state.pending_events.is_empty() {
            return Ok(Some(self.state.pending_events.remove(0)));
        }

        // If finished, continue emitting remaining context ends until stack is empty
        if self.finished {
            return self.finalize();
        }

        loop {
            // Check timeout periodically (every 100 operations to minimize overhead)
            self.operations_count += 1;
            if self.operations_count % 100 == 0 {
                if let Err(e) = self.check_timeout() {
                    return self.return_error(e);
                }
            }

            let (line_num, line) = match self.reader.next_line().await {
                Ok(Some(l)) => l,
                Ok(None) => {
                    // EOF: switch to finalize mode to unwind open contexts.
                    self.finished = true;
                    return self.finalize();
                }
                Err(e) => return self.return_error(e),
            };

            let trimmed = line.trim();

            // Skip blank lines and comments
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            // Calculate indentation
            let indent_info = match hedl_core::lex::calculate_indent(&line, line_num as u32) {
                Ok(info) => info,
                Err(e) => return self.return_error(StreamError::syntax(line_num, e.to_string())),
            };

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            // Enforce the configured nesting bound (guards against
            // pathologically deep documents).
            if indent > self.config.max_indent_depth {
                return self.return_error(StreamError::syntax(
                    line_num,
                    format!("indent depth {indent} exceeds limit"),
                ));
            }

            // Pop contexts as needed based on indentation. At most one close
            // event is returned per call; the current line is re-queued so it
            // is parsed on the next call, preserving event order.
            let events = match self.pop_contexts(indent) {
                Ok(e) => e,
                Err(e) => return self.return_error(e),
            };
            if let Some(event) = events {
                // Push back the current line to process after emitting list end
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            // Parse line content
            return match self.parse_line(content, indent, line_num) {
                Ok(result) => Ok(result),
                Err(e) => self.return_error(e),
            };
        }
    }
506
507 fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
508 // If we already sent EndOfDocument, return None to signal true end of stream
509 if self.sent_end_of_document {
510 return Ok(None);
511 }
512
513 while self.state.stack.len() > 1 {
514 let ctx = self.state.stack.pop().expect("stack has elements");
515 match ctx {
516 Context::List {
517 key,
518 type_name,
519 count,
520 ..
521 } => {
522 return Ok(Some(NodeEvent::ListEnd {
523 key,
524 type_name,
525 count,
526 }));
527 }
528 Context::Object { key, .. } => {
529 return Ok(Some(NodeEvent::ObjectEnd { key }));
530 }
531 Context::Root => {
532 // Root context handled by the while condition
533 }
534 }
535 }
536
537 // Mark that we've sent EndOfDocument, so subsequent calls return None
538 self.sent_end_of_document = true;
539 Ok(Some(NodeEvent::EndOfDocument))
540 }
541
542 /// Read up to `n` events in a single async operation.
543 ///
544 /// Reduces await overhead for high-throughput scenarios by batching event reads.
545 /// This can improve performance when processing many small events.
546 ///
547 /// # Parameters
548 ///
549 /// - `n`: Maximum number of events to read
550 ///
551 /// # Returns
552 ///
553 /// - `Ok(Vec<NodeEvent>)`: Vector of events (may be fewer than `n` if EOF reached)
554 /// - `Err(e)`: Parsing error encountered
555 ///
556 /// # Examples
557 ///
558 /// ```rust
559 /// # #[cfg(feature = "async")]
560 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
561 /// use hedl_stream::AsyncStreamingParser;
562 /// use tokio::fs::File;
563 ///
564 /// let file = File::open("data.hedl").await?;
565 /// let mut parser = AsyncStreamingParser::new(file).await?;
566 ///
567 /// // Read events in batches of 100
568 /// loop {
569 /// let batch = parser.next_batch(100).await?;
570 /// if batch.is_empty() {
571 /// break;
572 /// }
573 ///
574 /// // Process batch
575 /// for event in batch {
576 /// // ...
577 /// }
578 /// }
579 /// # Ok(())
580 /// # }
581 /// ```
582 pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>> {
583 let mut batch = Vec::with_capacity(n.min(100)); // Cap initial allocation
584 for _ in 0..n {
585 match self.next_event().await? {
586 Some(NodeEvent::EndOfDocument) => break,
587 Some(event) => batch.push(event),
588 None => break,
589 }
590 }
591 Ok(batch)
592 }
593
594 /// Read events with cancellation support via tokio watch channel.
595 ///
596 /// Returns `Ok(None)` if cancelled, otherwise behaves like `next_event()`.
597 ///
598 /// # Examples
599 ///
600 /// ```rust
601 /// # #[cfg(feature = "async")]
602 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
603 /// use hedl_stream::AsyncStreamingParser;
604 /// use tokio::sync::watch;
605 /// use std::io::Cursor;
606 ///
607 /// let input = r#"
608 /// %VERSION: 1.0
609 /// %STRUCT: User: [id, name]
610 /// ---
611 /// users:@User
612 /// | alice, Alice
613 /// "#;
614 ///
615 /// let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
616 /// let (cancel_tx, mut cancel_rx) = watch::channel(false);
617 ///
618 /// // Can cancel from another task
619 /// tokio::spawn(async move {
620 /// tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
621 /// let _ = cancel_tx.send(true);
622 /// });
623 ///
624 /// while let Some(event) = parser.next_event_cancellable(&mut cancel_rx).await? {
625 /// // Process event
626 /// # break;
627 /// }
628 /// # Ok(())
629 /// # }
630 /// ```
    #[cfg(feature = "async")]
    pub async fn next_event_cancellable(
        &mut self,
        cancel_rx: &mut tokio::sync::watch::Receiver<bool>,
    ) -> StreamResult<Option<NodeEvent>> {
        // Check if cancelled before doing any work.
        if *cancel_rx.borrow() {
            return Ok(None);
        }

        // Race the parser against the cancellation signal. If `changed()`
        // wins, the in-flight `next_event` future is dropped.
        // NOTE(review): this assumes `next_event` is cancel-safe (all
        // progress lives in `self`) — confirm against AsyncLineReader.
        tokio::select! {
            result = self.next_event() => result,
            _ = cancel_rx.changed() => {
                if *cancel_rx.borrow() {
                    Ok(None)
                } else {
                    // False alarm (value changed but not to `true`): retry once.
                    self.next_event().await
                }
            }
        }
    }
653}
654
// Stream trait implementation for futures ecosystem integration.
// Terminates (yields `None`) on `EndOfDocument`, so collected streams do not
// include the terminal marker.
#[cfg(feature = "async")]
impl<R: AsyncRead + Unpin> futures_core::Stream for AsyncStreamingParser<R> {
    type Item = StreamResult<NodeEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        // Create a future from next_event and poll it.
        // NOTE(review): a fresh future is built on every poll; when it
        // returns Pending it is dropped and rebuilt on the next poll. This
        // relies on `next_event` keeping all progress in `self` (i.e. being
        // cancel-safe) — confirm against AsyncLineReader's buffering.
        let fut = self.next_event();
        tokio::pin!(fut);

        match fut.poll(cx) {
            // Map the terminal marker to stream termination, not an item.
            Poll::Ready(Ok(Some(NodeEvent::EndOfDocument))) => Poll::Ready(None),
            Poll::Ready(Ok(Some(event))) => Poll::Ready(Some(Ok(event))),
            Poll::Ready(Ok(None)) => Poll::Ready(None),
            // Note: errored flag is set inside next_event before returning errors
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}
675
676#[cfg(all(test, feature = "async"))]
677mod tests {
678 use super::*;
679 use std::io::Cursor;
680 use std::time::Duration;
681
    // All four header directives must round-trip into HeaderInfo.
    #[tokio::test]
    async fn test_parse_header() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
%ALIAS active = "Active"
%NEST: User > Order
---
"#;
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
        let header = parser.header().unwrap();

        assert_eq!(header.version, (1, 0));
        assert!(header.structs.contains_key("User"));
        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
        assert_eq!(header.nests.get("User"), Some(&vec!["Order".to_string()]));
    }
699
    // Basic happy path: two list rows become two Node events in order.
    #[tokio::test]
    async fn test_streaming_nodes() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice Smith
 | bob, Bob Jones
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut events = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            events.push(event);
        }

        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }
722
    // Construction with a timeout configured must still succeed for tiny input.
    #[tokio::test]
    async fn test_timeout() {
        // Test that parsing completes successfully with a reasonable timeout.
        // Using 100ms to avoid flakiness on slow systems while still testing timeout config.
        let config = StreamingParserConfig {
            timeout: Some(Duration::from_millis(100)),
            ..Default::default()
        };

        let input = r"
%VERSION: 1.0
---
";
        let parser = AsyncStreamingParser::with_config(Cursor::new(input), config).await;
        assert!(parser.is_ok()); // Header should parse within timeout
    }
739
    // A list may declare its schema inline (`@Item[id, name]`) without %STRUCT.
    #[tokio::test]
    async fn test_inline_schema() {
        let input = r"
%VERSION: 1.0
---
items:@Item[id, name]
 | item1, First
 | item2, Second
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].type_name, "Item");
    }
761
    // A malformed body line surfaces as StreamError::Syntax from next_event.
    #[tokio::test]
    async fn test_error_handling() {
        let input = r"
%VERSION: 1.0
---
invalid line without colon
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let result = parser.next_event().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), StreamError::Syntax { .. }));
    }
775
    // Non-ASCII (CJK and Cyrillic) IDs and values must pass through intact.
    #[tokio::test]
    async fn test_unicode() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | 用户1, 张三
 | пользователь, Иван
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "用户1");
        assert_eq!(nodes[1].id, "пользователь");
    }
799
800 // ============ STREAM TRAIT TESTS ============
801
    // The futures Stream impl yields all events (minus EndOfDocument) via collect.
    #[tokio::test]
    async fn test_stream_trait_basic() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let events: Vec<_> = parser.collect().await;
        assert!(events.iter().all(std::result::Result::is_ok));

        let nodes: Vec<_> = events
            .iter()
            .filter_map(|e| e.as_ref().ok())
            .filter_map(|e| e.as_node())
            .collect();

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }
829
    // Stream combinators (filter_map + filter) can select nodes by field value.
    #[tokio::test]
    async fn test_stream_trait_filter_map() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name, active]
---
users:@User
 | alice, Alice, true
 | bob, Bob, false
 | carol, Carol, true
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Only collect active users using stream combinators
        let active_nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .filter(|node| {
                // Field index 2 is the `active` column from the schema above.
                let is_active = matches!(node.get_field(2), Some(Value::Bool(true)));
                async move { is_active }
            })
            .collect()
            .await;

        assert_eq!(active_nodes.len(), 2);
        assert_eq!(active_nodes[0].id, "alice");
        assert_eq!(active_nodes[1].id, "carol");
    }
867
    // `take(2)` short-circuits the stream after two node events.
    #[tokio::test]
    async fn test_stream_trait_take() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
 | carol, Carol
 | dave, Dave
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Take only first 2 node events
        let nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .take(2)
            .collect()
            .await;

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }
903
    // Stream count excludes EndOfDocument (poll_next maps it to None).
    #[tokio::test]
    async fn test_stream_trait_count() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
 | carol, Carol
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let total = parser.count().await;
        // Should count all events: ListStart, 3 Nodes, ListEnd = 5 events
        assert_eq!(total, 5);
    }
923
924 // ============ BATCH READING TESTS ============
925
    // One oversized batch drains everything; the following batch is empty.
    #[tokio::test]
    async fn test_next_batch_basic() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
 | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read all events in one batch
        let batch = parser.next_batch(10).await.unwrap();
        assert_eq!(batch.len(), 5); // ListStart, 3 Nodes, ListEnd

        // Next batch should be empty (EOF)
        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }
947
    // Small successive batches partition the event sequence without loss.
    #[tokio::test]
    async fn test_next_batch_incremental() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
 | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read in small batches
        let batch1 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch1.len(), 2); // ListStart, Node

        let batch2 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch2.len(), 2); // Node, Node

        let batch3 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch3.len(), 1); // ListEnd

        let batch4 = parser.next_batch(2).await.unwrap();
        assert!(batch4.is_empty());
    }
974
    // A header-only document yields an empty first batch (EndOfDocument is
    // consumed by next_batch, never returned).
    #[tokio::test]
    async fn test_next_batch_empty_file() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }
987
    // 500 generated rows read in batches of 100 must total 502 events.
    #[tokio::test]
    async fn test_next_batch_large() {
        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id, value]
---
data:@Data
",
        );
        for i in 0..500 {
            input.push_str(&format!(" | row{i}, value{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // Read in large batches
        let batch1 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch1.len(), 100); // ListStart + 99 Nodes

        let batch2 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch2.len(), 100); // 100 Nodes

        // Continue until we get all events
        let mut total = batch1.len() + batch2.len();
        loop {
            let batch = parser.next_batch(100).await.unwrap();
            if batch.is_empty() {
                break;
            }
            total += batch.len();
        }

        // Total: ListStart + 500 Nodes + ListEnd = 502
        assert_eq!(total, 502);
    }
1024
1025 // ============ CANCELLATION TESTS ============
1026
    // After the watch flips to true, next_event_cancellable returns Ok(None).
    #[tokio::test]
    async fn test_cancellation_basic() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
 | bob, Bob
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Read first event normally
        let event1 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event1.is_some());

        // Cancel
        cancel_tx.send(true).unwrap();

        // Next read should return None (cancelled)
        let event2 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event2.is_none());
    }
1054
    // With the watch left at false, the cancellable variant behaves like
    // next_event and drains the full event sequence.
    #[tokio::test]
    async fn test_cancellation_not_cancelled() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (_cancel_tx, mut cancel_rx) = watch::channel(false);

        // Read all events without cancellation
        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
        }

        // Should have read all events: ListStart, Node, ListEnd, EndOfDocument
        assert_eq!(count, 4);
    }
1079
    // Mid-stream cancellation stops iteration before all 1002 events arrive.
    // Timing-based: bounds are deliberately loose (0 < count < 1002).
    #[tokio::test]
    async fn test_cancellation_during_processing() {
        use tokio::sync::watch;

        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id]
---
data:@Data
",
        );
        for i in 0..1000 {
            input.push_str(&format!(" | row{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Spawn a task that cancels after reading some events
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            cancel_tx.send(true).unwrap();
        });

        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
            // Small delay to allow cancellation to trigger
            tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
        }

        // Should have read some events but not all 1002 (ListStart + 1000 Nodes + ListEnd)
        assert!(count < 1002);
        assert!(count > 0);
    }
1117
1118 // ============ CONCURRENT PROCESSING TESTS ============
1119
1120 #[tokio::test]
1121 async fn test_concurrent_file_processing() {
1122 let input = r"
1123%VERSION: 1.0
1124%STRUCT: User: [id, name]
1125---
1126users:@User
1127 | alice, Alice
1128 | bob, Bob
1129";
1130
1131 // Process multiple identical streams concurrently
1132 let tasks: Vec<_> = (0..5)
1133 .map(|_| {
1134 let input_clone = input.to_string();
1135 tokio::spawn(async move {
1136 let mut parser = AsyncStreamingParser::new(Cursor::new(input_clone))
1137 .await
1138 .unwrap();
1139
1140 let mut count = 0;
1141 while let Some(_event) = parser.next_event().await.unwrap() {
1142 count += 1;
1143 }
1144 count
1145 })
1146 })
1147 .collect();
1148
1149 let results = futures::future::join_all(tasks).await;
1150
1151 // All tasks should succeed and count the same number of events
1152 for result in results {
1153 assert_eq!(result.unwrap(), 5); // ListStart, 2 Nodes, ListEnd, EndOfDocument
1154 }
1155 }
1156
1157 #[tokio::test]
1158 async fn test_concurrent_with_stream_trait() {
1159 use futures::StreamExt;
1160
1161 let input = r"
1162%VERSION: 1.0
1163%STRUCT: Data: [id]
1164---
1165data:@Data
1166 | row1
1167 | row2
1168 | row3
1169";
1170
1171 // Process multiple streams concurrently using manual await
1172 // Note: futures::Stream combinators create !Send futures, so we can't use tokio::spawn
1173 let mut counts = Vec::new();
1174
1175 for _ in 0..10 {
1176 let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1177
1178 // Count nodes using stream combinators
1179 let count = parser
1180 .filter_map(|result| async move {
1181 result.ok().and_then(|event| {
1182 if let NodeEvent::Node(_) = event {
1183 Some(())
1184 } else {
1185 None
1186 }
1187 })
1188 })
1189 .count()
1190 .await;
1191
1192 counts.push(count);
1193 }
1194
1195 // All should count 3 nodes
1196 for count in counts {
1197 assert_eq!(count, 3);
1198 }
1199 }
1200
1201 // ============ EDGE CASE AND INTEGRATION TESTS ============
1202
1203 #[tokio::test]
1204 async fn test_stream_trait_with_errors() {
1205 use futures::StreamExt;
1206
1207 let input = r"
1208%VERSION: 1.0
1209%STRUCT: User: [id, name]
1210---
1211users:@User
1212 | alice, Alice
1213 | bob
1214 | carol, Carol
1215";
1216 let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1217
1218 let results: Vec<_> = parser.collect().await;
1219
1220 // Should have error for malformed row (bob with only 1 field)
1221 let errors: Vec<_> = results.iter().filter(|r| r.is_err()).collect();
1222 assert!(!errors.is_empty());
1223 }
1224
1225 #[tokio::test]
1226 async fn test_batch_with_mixed_events() {
1227 let input = r"
1228%VERSION: 1.0
1229%STRUCT: User: [id, name]
1230%STRUCT: Product: [id, title]
1231---
1232users:@User
1233 | alice, Alice
1234products:@Product
1235 | prod1, Widget
1236";
1237 let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1238
1239 let batch = parser.next_batch(10).await.unwrap();
1240
1241 // Should contain: ListStart(User), Node(alice), ListEnd(User), ListStart(Product), Node(prod1), ListEnd(Product)
1242 assert_eq!(batch.len(), 6);
1243
1244 let list_starts: Vec<_> = batch
1245 .iter()
1246 .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
1247 .collect();
1248 assert_eq!(list_starts.len(), 2);
1249 }
1250
1251 #[tokio::test]
1252 async fn test_stream_empty_after_cancellation() {
1253 use tokio::sync::watch;
1254
1255 let input = r"
1256%VERSION: 1.0
1257%STRUCT: User: [id, name]
1258---
1259users:@User
1260 | alice, Alice
1261 | bob, Bob
1262";
1263 let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1264
1265 let (cancel_tx, mut cancel_rx) = watch::channel(false);
1266
1267 // Read one event
1268 let _event = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
1269
1270 // Cancel
1271 cancel_tx.send(true).unwrap();
1272
1273 // Subsequent reads should return None
1274 assert!(parser
1275 .next_event_cancellable(&mut cancel_rx)
1276 .await
1277 .unwrap()
1278 .is_none());
1279 assert!(parser
1280 .next_event_cancellable(&mut cancel_rx)
1281 .await
1282 .unwrap()
1283 .is_none());
1284 }
1285
1286 #[tokio::test]
1287 async fn test_batch_reading_performance() {
1288 // Create a large dataset
1289 let mut input = String::from(
1290 r"
1291%VERSION: 1.0
1292%STRUCT: Data: [id, value]
1293---
1294data:@Data
1295",
1296 );
1297 for i in 0..1000 {
1298 input.push_str(&format!(" | row{i}, value{i}\n"));
1299 }
1300
1301 let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
1302
1303 let start = std::time::Instant::now();
1304
1305 // Read in batches
1306 let mut total = 0;
1307 loop {
1308 let batch = parser.next_batch(100).await.unwrap();
1309 if batch.is_empty() {
1310 break;
1311 }
1312 total += batch.len();
1313 }
1314
1315 let elapsed = start.elapsed();
1316
1317 // Should have read all events
1318 assert_eq!(total, 1002); // ListStart + 1000 Nodes + ListEnd
1319
1320 // Should complete reasonably quickly (< 100ms for 1000 rows)
1321 assert!(elapsed.as_millis() < 100);
1322 }
1323}