pub struct AsyncStreamingParser<R: AsyncRead + Unpin> { /* private fields */ }
Expand description
Async streaming HEDL parser.
Processes HEDL documents asynchronously, yielding NodeEvent items as they
are parsed without loading the entire document into memory. Uses tokio’s
async I/O for non-blocking operation.
§Memory Characteristics
- Header: Parsed once at initialization and kept in memory
- Per-Line: Only current line and parsing context (stack depth proportional to nesting)
- No Buffering: Nodes are yielded immediately after parsing
- Identical to Sync: Same memory profile as synchronous parser
§Examples
§Parse from Async File
use hedl_stream::{AsyncStreamingParser, NodeEvent};
use tokio::fs::File;
let file = File::open("data.hedl").await?;
let mut parser = AsyncStreamingParser::new(file).await?;
while let Some(event) = parser.next_event().await? {
if let NodeEvent::Node(node) = event {
println!("Processing {}: {}", node.type_name, node.id);
}
}
§With Timeout Protection
use hedl_stream::{AsyncStreamingParser, StreamingParserConfig, StreamError};
use std::time::Duration;
use std::io::Cursor;
let config = StreamingParserConfig {
timeout: Some(Duration::from_secs(10)),
..Default::default()
};
let mut parser = AsyncStreamingParser::with_config(
Cursor::new("untrusted input"),
config
).await?;
while let Some(event) = parser.next_event().await? {
// Process event
}
Implementations§
Source§impl<R: AsyncRead + Unpin> AsyncStreamingParser<R>
impl<R: AsyncRead + Unpin> AsyncStreamingParser<R>
Source pub async fn new(reader: R) -> StreamResult<Self>
pub async fn new(reader: R) -> StreamResult<Self>
Create a new async streaming parser with default configuration.
The parser immediately reads and validates the HEDL header (version and schema directives). If the header is invalid, this function returns an error.
§Parameters
reader: Any type implementing AsyncRead + Unpin
§Returns
Ok(parser): Parser ready to yield events
Err(e): Header parsing failed (missing version, invalid schema, etc.)
§Examples
§From a File
use hedl_stream::AsyncStreamingParser;
use tokio::fs::File;
let file = File::open("data.hedl").await?;
let parser = AsyncStreamingParser::new(file).await?;
§From a String
use hedl_stream::AsyncStreamingParser;
use std::io::Cursor;
let data = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
| alice, Alice
"#;
let parser = AsyncStreamingParser::new(Cursor::new(data)).await?;
§Errors
StreamError::MissingVersion: No %VERSION directive found
StreamError::InvalidVersion: Invalid version format
StreamError::Syntax: Malformed header directive
StreamError::Io: I/O error reading input
Source pub async fn with_config(
reader: R,
config: StreamingParserConfig,
) -> StreamResult<Self>
pub async fn with_config( reader: R, config: StreamingParserConfig, ) -> StreamResult<Self>
Create an async streaming parser with custom configuration.
Use this when you need to control memory limits, buffer sizes, or enable timeout protection for untrusted input.
§Examples
§With Timeout
use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
use std::time::Duration;
use std::io::Cursor;
let config = StreamingParserConfig {
timeout: Some(Duration::from_secs(30)),
..Default::default()
};
let parser = AsyncStreamingParser::with_config(
Cursor::new("untrusted input"),
config
).await?;
Source pub fn header(&self) -> Option<&HeaderInfo>
pub fn header(&self) -> Option<&HeaderInfo>
Get the parsed header information.
Returns header metadata including version, schema definitions, aliases, and nesting rules. This is available immediately after parser creation.
§Examples
use hedl_stream::AsyncStreamingParser;
use std::io::Cursor;
let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
---
"#;
let parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
let header = parser.header().unwrap();
assert_eq!(header.version, (1, 0));
let user_schema = header.get_schema("User").unwrap();
assert_eq!(user_schema, &vec!["id", "name", "email"]);
Source pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>>
pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>>
Parse the next event from the stream asynchronously.
Returns Ok(Some(event)) if an event was parsed, Ok(None) at end of document,
or Err on parsing errors.
§Performance
This method is async and will yield to the tokio runtime when waiting for I/O, allowing other tasks to run. It does not block the thread.
§Examples
use hedl_stream::{AsyncStreamingParser, NodeEvent};
use std::io::Cursor;
let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
| alice, Alice
"#;
let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
while let Some(event) = parser.next_event().await? {
match event {
NodeEvent::Node(node) => println!("Node: {}", node.id),
NodeEvent::ListStart { type_name, .. } => println!("List: {}", type_name),
_ => {}
}
}
Source pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>>
pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>>
Read up to n events in a single async operation.
Reduces await overhead for high-throughput scenarios by batching event reads. This can improve performance when processing many small events.
§Parameters
n: Maximum number of events to read
§Returns
Ok(Vec<NodeEvent>): Vector of events (may be fewer than n if EOF reached)
Err(e): Parsing error encountered
§Examples
use hedl_stream::AsyncStreamingParser;
use tokio::fs::File;
let file = File::open("data.hedl").await?;
let mut parser = AsyncStreamingParser::new(file).await?;
// Read events in batches of 100
loop {
let batch = parser.next_batch(100).await?;
if batch.is_empty() {
break;
}
// Process batch
for event in batch {
// ...
}
}
Source pub async fn next_event_cancellable(
&mut self,
cancel_rx: &mut Receiver<bool>,
) -> StreamResult<Option<NodeEvent>>
pub async fn next_event_cancellable( &mut self, cancel_rx: &mut Receiver<bool>, ) -> StreamResult<Option<NodeEvent>>
Read events with cancellation support via tokio watch channel.
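Returns Ok(None) if cancelled, otherwise behaves like next_event(). Because next_event() also returns Ok(None) at end of document, the return value alone does not distinguish cancellation from end of input.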
§Examples
use hedl_stream::AsyncStreamingParser;
use tokio::sync::watch;
use std::io::Cursor;
let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
| alice, Alice
"#;
let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);
// Can cancel from another task
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
let _ = cancel_tx.send(true);
});
while let Some(event) = parser.next_event_cancellable(&mut cancel_rx).await? {
// Process event
}
Trait Implementations§
Source§impl<R: AsyncRead + Unpin> Stream for AsyncStreamingParser<R>
Available on crate feature async only.
impl<R: AsyncRead + Unpin> Stream for AsyncStreamingParser<R>
Available on crate feature async only.