Struct AsyncStreamingParser 

pub struct AsyncStreamingParser<R: AsyncRead + Unpin> { /* private fields */ }

Async streaming HEDL parser.

Processes HEDL documents asynchronously, yielding NodeEvent items as they are parsed without loading the entire document into memory. Uses tokio’s async I/O for non-blocking operation.

§Memory Characteristics

  • Header: Parsed once at initialization and kept in memory
  • Per-Line: Only current line and parsing context (stack depth proportional to nesting)
  • No Buffering: Nodes are yielded immediately after parsing
  • Identical to Sync: Same memory profile as the synchronous parser
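
The per-line memory model described above can be illustrated with a small synchronous sketch. It is not the crate's implementation: `max_nesting_depth` is a hypothetical helper, and the indentation rule it uses is a generic stand-in rather than the HEDL grammar. The point is only that one reusable line buffer plus a stack of open levels is enough state, so memory grows with nesting depth and not with document size.

```rust
use std::io::{self, BufRead};

/// Hypothetical helper: reads `input` one line at a time and returns the
/// deepest nesting level seen. The indentation rule is illustrative only.
fn max_nesting_depth<R: BufRead>(mut input: R) -> io::Result<usize> {
    let mut line = String::new(); // reused buffer: holds only the current line
    let mut stack: Vec<usize> = Vec::new(); // indent widths of the open levels
    let mut max_depth = 0;

    loop {
        line.clear();
        if input.read_line(&mut line)? == 0 {
            break; // end of input
        }
        let content = line.trim_end();
        if content.trim_start().is_empty() {
            continue; // blank lines do not change nesting
        }
        let indent = content.len() - content.trim_start().len();
        // Close every level that is at the same or a deeper indentation.
        while stack.last().map_or(false, |&top| top >= indent) {
            stack.pop();
        }
        stack.push(indent);
        max_depth = max_depth.max(stack.len());
    }
    Ok(max_depth)
}
```

Calling `max_nesting_depth("a\n  b\n    c\n".as_bytes())` walks three nested levels while never holding more than one line in memory.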

§Examples

§Parse from Async File

use hedl_stream::{AsyncStreamingParser, NodeEvent};
use tokio::fs::File;

let file = File::open("data.hedl").await?;
let mut parser = AsyncStreamingParser::new(file).await?;

while let Some(event) = parser.next_event().await? {
    if let NodeEvent::Node(node) = event {
        println!("Processing {}: {}", node.type_name, node.id);
    }
}

§With Timeout Protection

use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
use std::time::Duration;
use std::io::Cursor;

let config = StreamingParserConfig {
    timeout: Some(Duration::from_secs(10)),
    ..Default::default()
};

let mut parser = AsyncStreamingParser::with_config(
    Cursor::new("untrusted input"),
    config
).await?;

while let Some(event) = parser.next_event().await? {
    // Process event
}

Implementations§

impl<R: AsyncRead + Unpin> AsyncStreamingParser<R>

pub async fn new(reader: R) -> StreamResult<Self>

Create a new async streaming parser with default configuration.

The parser immediately reads and validates the HEDL header (version and schema directives). If the header is invalid, this function returns an error.

§Parameters
  • reader: Any type implementing AsyncRead + Unpin
§Returns
  • Ok(parser): Parser ready to yield events
  • Err(e): Header parsing failed (missing version, invalid schema, etc.)
§Examples
§From a File
use hedl_stream::AsyncStreamingParser;
use tokio::fs::File;

let file = File::open("data.hedl").await?;
let parser = AsyncStreamingParser::new(file).await?;
§From a String
use hedl_stream::AsyncStreamingParser;
use std::io::Cursor;

let data = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
"#;

let parser = AsyncStreamingParser::new(Cursor::new(data)).await?;
§Errors
  • StreamError::MissingVersion: No %VERSION directive found
  • StreamError::InvalidVersion: Invalid version format
  • StreamError::Syntax: Malformed header directive
  • StreamError::Io: I/O error reading input
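
The version-related errors listed above can be pictured with a std-only sketch. Everything here is an assumption made for illustration: `HeaderError` and `parse_version` are hypothetical names, the `---` line is taken to end the header as in the examples on this page, and the real parser may accept or reject inputs differently.

```rust
/// Hypothetical error type mirroring two of the documented variants.
#[derive(Debug, PartialEq)]
enum HeaderError {
    MissingVersion,
    InvalidVersion(String),
}

/// Scan header lines up to the `---` separator and extract the
/// `%VERSION: major.minor` directive as a `(major, minor)` pair.
fn parse_version(input: &str) -> Result<(u32, u32), HeaderError> {
    for line in input.lines() {
        let line = line.trim();
        if line == "---" {
            break; // assumed end of the header
        }
        if let Some(rest) = line.strip_prefix("%VERSION:") {
            let text = rest.trim();
            let invalid = || HeaderError::InvalidVersion(text.to_string());
            let (major, minor) = text.split_once('.').ok_or_else(invalid)?;
            let major = major.parse::<u32>().map_err(|_| invalid())?;
            let minor = minor.parse::<u32>().map_err(|_| invalid())?;
            return Ok((major, minor));
        }
    }
    // No directive was found before the header ended.
    Err(HeaderError::MissingVersion)
}
```

With this sketch, a header containing `%VERSION: 1.0` yields `(1, 0)`, which matches the shape of `header.version` used elsewhere on this page.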

pub async fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self>

Create an async streaming parser with custom configuration.

Use this when you need to control memory limits or buffer sizes, or to enable timeout protection for untrusted input.

§Examples
§With Timeout
use hedl_stream::{AsyncStreamingParser, StreamingParserConfig};
use std::time::Duration;
use std::io::Cursor;

let config = StreamingParserConfig {
    timeout: Some(Duration::from_secs(30)),
    ..Default::default()
};

let parser = AsyncStreamingParser::with_config(
    Cursor::new("untrusted input"),
    config
).await?;
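
As a rough illustration of how optional limits in a configuration struct can be enforced, here is a std-only sketch. It is not `StreamingParserConfig`: `SketchConfig`, `LimitError`, `check_limits`, and the `max_line_length` field are hypothetical, and only the `timeout: Option<Duration>` field and the `..Default::default()` pattern are taken from the example above. How the crate actually measures or applies its timeout is not specified here.

```rust
use std::time::Duration;

/// Hypothetical stand-in for a parser configuration. `None` means "no limit".
#[derive(Debug, Clone, Default)]
struct SketchConfig {
    timeout: Option<Duration>,
    max_line_length: Option<usize>, // assumed field, for illustration only
}

#[derive(Debug, PartialEq)]
enum LimitError {
    Timeout,
    LineTooLong,
}

/// Check the elapsed time and current line length against the configured limits.
fn check_limits(
    config: &SketchConfig,
    elapsed: Duration,
    line_length: usize,
) -> Result<(), LimitError> {
    if config.timeout.map_or(false, |limit| elapsed > limit) {
        return Err(LimitError::Timeout);
    }
    if config.max_line_length.map_or(false, |max| line_length > max) {
        return Err(LimitError::LineTooLong);
    }
    Ok(())
}
```

Using `Option` for each limit keeps `Default::default()` permissive, so callers opt in to protection one field at a time through struct update syntax.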

pub fn header(&self) -> Option<&HeaderInfo>

Get the parsed header information.

Returns header metadata including version, schema definitions, aliases, and nesting rules. This is available immediately after parser creation.

§Examples
use hedl_stream::AsyncStreamingParser;
use std::io::Cursor;

let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
---
"#;

let parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
let header = parser.header().unwrap();

assert_eq!(header.version, (1, 0));
let user_schema = header.get_schema("User").unwrap();
assert_eq!(user_schema, &vec!["id", "name", "email"]);

pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>>

Parse the next event from the stream asynchronously.

Returns Ok(Some(event)) if an event was parsed, Ok(None) at end of document, or Err on parsing errors.

§Performance

This method is async and will yield to the tokio runtime when waiting for I/O, allowing other tasks to run. It does not block the thread.

§Examples
use hedl_stream::{AsyncStreamingParser, NodeEvent};
use std::io::Cursor;

let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
"#;

let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;

while let Some(event) = parser.next_event().await? {
    match event {
        NodeEvent::Node(node) => println!("Node: {}", node.id),
        NodeEvent::ListStart { type_name, .. } => println!("List: {}", type_name),
        _ => {}
    }
}

pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>>

Read up to n events in a single async operation.

Reduces await overhead for high-throughput scenarios by batching event reads. This can improve performance when processing many small events.

§Parameters
  • n: Maximum number of events to read
§Returns
  • Ok(Vec<NodeEvent>): Vector of events (may be fewer than n if EOF reached)
  • Err(e): Parsing error encountered
§Examples
use hedl_stream::AsyncStreamingParser;
use tokio::fs::File;

let file = File::open("data.hedl").await?;
let mut parser = AsyncStreamingParser::new(file).await?;

// Read events in batches of 100
loop {
    let batch = parser.next_batch(100).await?;
    if batch.is_empty() {
        break;
    }

    // Process batch
    for event in batch {
        // ...
    }
}
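
The batching contract described above (at most n events, fewer at end of input, errors propagated) can be sketched without any async machinery. `collect_batch` is a hypothetical helper written against a generic pull function that returns `Result<Option<T>, E>`, the same shape as `next_event()`; it is an assumption about the semantics, not the crate's code.

```rust
/// Pull up to `n` items from a fallible source.
/// Stops early when the source reports end of input with `Ok(None)`.
fn collect_batch<T, E>(
    mut next: impl FnMut() -> Result<Option<T>, E>,
    n: usize,
) -> Result<Vec<T>, E> {
    // Cap the initial allocation so a huge `n` does not reserve memory up front.
    let mut batch = Vec::with_capacity(n.min(1024));
    while batch.len() < n {
        match next()? {
            Some(item) => batch.push(item),
            None => break, // end of input: return what we have
        }
    }
    Ok(batch)
}
```

An empty vector therefore means the source was already exhausted (or n was 0), which is why the loop above stops on `batch.is_empty()`.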

pub async fn next_event_cancellable(&mut self, cancel_rx: &mut Receiver<bool>) -> StreamResult<Option<NodeEvent>>

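Read the next event, with cancellation support via a tokio watch channel.

Returns Ok(None) if cancelled; otherwise behaves like next_event(). Because next_event() also returns Ok(None) at end of document, the return value alone does not distinguish cancellation from completion. Check the channel's current value (for example, *cancel_rx.borrow()) if you need to tell the two apart.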

§Examples
use hedl_stream::AsyncStreamingParser;
use tokio::sync::watch;
use std::io::Cursor;

let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users:@User
 | alice, Alice
"#;

let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await?;
let (cancel_tx, mut cancel_rx) = watch::channel(false);

// Can cancel from another task
tokio::spawn(async move {
    tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    let _ = cancel_tx.send(true);
});

while let Some(event) = parser.next_event_cancellable(&mut cancel_rx).await? {
    // Process event
}
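
Since Ok(None) is returned both on cancellation and at end of document, a caller may want to keep the two outcomes separate. The following std-only sketch shows one way to model that. It swaps the tokio watch channel for a plain `AtomicBool` and uses a hypothetical `Step` enum and `next_or_cancel` function, none of which are part of `hedl_stream`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical three-way outcome that keeps "cancelled" and "finished" apart.
#[derive(Debug, PartialEq)]
enum Step<T> {
    Event(T),
    Finished,
    Cancelled,
}

/// Check the cancellation flag first, then pull the next item from the source.
fn next_or_cancel<T, E>(
    cancelled: &AtomicBool,
    mut next: impl FnMut() -> Result<Option<T>, E>,
) -> Result<Step<T>, E> {
    if cancelled.load(Ordering::Relaxed) {
        return Ok(Step::Cancelled); // the source is left untouched
    }
    Ok(match next()? {
        Some(event) => Step::Event(event),
        None => Step::Finished,
    })
}
```

With the documented method, a comparable check after the loop ends is to read the watch receiver's current value.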

Trait Implementations§

impl<R: AsyncRead + Unpin> Stream for AsyncStreamingParser<R>

Available on crate feature async only.

type Item = Result<NodeEvent, StreamError>

Values yielded by the stream.

fn poll_next(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>>

Attempt to pull out the next value of this stream, registering the current task for wakeup if the value is not yet available, and returning None if the stream is exhausted.

fn size_hint(&self) -> (usize, Option<usize>)

Returns the bounds on the remaining length of the stream.
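
The Stream item type is Result<NodeEvent, StreamError>, whereas next_event() returns a Result wrapping an Option. The relationship between the two shapes can be shown with a synchronous analogue: the sketch below turns a pull function into an `Iterator` using `Result::transpose`. The `events` function is hypothetical, and its choice to stop after the first error is a design assumption of this sketch, not documented behavior of the Stream implementation.

```rust
/// Adapt a pull function returning `Result<Option<T>, E>` into an iterator
/// of `Result<T, E>`, the synchronous counterpart of a fallible stream.
fn events<T, E>(
    mut next: impl FnMut() -> Result<Option<T>, E>,
) -> impl Iterator<Item = Result<T, E>> {
    let mut done = false;
    std::iter::from_fn(move || {
        if done {
            return None;
        }
        // Ok(Some(x)) -> Some(Ok(x)), Ok(None) -> None, Err(e) -> Some(Err(e))
        let item = next().transpose();
        if !matches!(item, Some(Ok(_))) {
            done = true; // stop after end of input or the first error
        }
        item
    })
}
```

Because the items are `Result` values, the usual combinators apply, for example collecting into `Result<Vec<_>, _>` to fail fast on the first error.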

Auto Trait Implementations§

impl<R> Freeze for AsyncStreamingParser<R>
where R: Freeze,

impl<R> RefUnwindSafe for AsyncStreamingParser<R>
where R: RefUnwindSafe,

impl<R> Send for AsyncStreamingParser<R>
where R: Send,

impl<R> Sync for AsyncStreamingParser<R>
where R: Sync,

impl<R> Unpin for AsyncStreamingParser<R>

impl<R> UnwindSafe for AsyncStreamingParser<R>
where R: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<S, T, E> TryStream for S
where S: Stream<Item = Result<T, E>> + ?Sized,

type Ok = T

The type of successful values yielded by this future

type Error = E

The type of failures yielded by this future

fn try_poll_next(self: Pin<&mut S>, cx: &mut Context<'_>) -> Poll<Option<Result<<S as TryStream>::Ok, <S as TryStream>::Error>>>

Poll this TryStream as if it were a Stream.