#[cfg(feature = "async")]
use futures::stream::Stream;
#[cfg(feature = "async")]
use std::pin::Pin;
#[cfg(feature = "async")]
use std::task::{Context, Poll};
#[cfg(feature = "async")]
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
use crate::{
parser::{Event, EventType},
Limits, Position, Result,
};
use std::collections::VecDeque;
/// Incremental YAML event producer that reads line by line from an async buffered reader.
///
/// Lines are appended to an internal text buffer and fed through a small state machine that
/// queues [`Event`]s. Events can be pulled by calling [`parse_next`](Self::parse_next) and
/// draining [`next_event`](Self::next_event), or by polling the parser as a [`Stream`].
///
/// This is a simplified parser: inside a document, each `key: value` line is reported as two
/// plain scalars and every other line is ignored.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser<R: AsyncBufRead + Unpin> {
    /// Source of input bytes.
    reader: R,
    /// Text that has been read but not yet consumed by the state machine.
    buffer: String,
    /// Bytes of the line currently being read. Kept on the parser (not in a local) so that a
    /// read interrupted by cancellation or `Poll::Pending` resumes without losing data.
    line_buf: Vec<u8>,
    /// Events produced but not yet handed to the caller.
    events: VecDeque<Event>,
    /// Position attached to every emitted event.
    // NOTE(review): never advanced, so all events carry `Position::new()`.
    position: Position,
    /// Current state of the document state machine.
    state: AsyncParseState,
    /// Parsing limits supplied by the caller.
    // NOTE(review): stored but not enforced anywhere in this parser.
    limits: Limits,
    /// Running counters.
    stats: AsyncStreamStats,
}

/// Where the parser is within the YAML stream.
#[cfg(feature = "async")]
#[derive(Debug, Clone, PartialEq)]
enum AsyncParseState {
    /// Nothing processed yet; `StreamStart` has not been emitted.
    Initial,
    /// Inside a document opened by a `---` marker.
    InDocument,
    /// Outside any document, waiting for a `---` marker.
    BetweenDocuments,
    /// Input exhausted; no further events will be produced.
    Complete,
}

/// Counters accumulated by an [`AsyncStreamingParser`].
#[cfg(feature = "async")]
#[derive(Debug, Clone, Default)]
pub struct AsyncStreamStats {
    /// Total bytes of input read from the reader.
    pub bytes_read: usize,
    /// Total events queued.
    pub events_generated: usize,
    /// Number of `---` document starts seen.
    pub documents_parsed: usize,
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> AsyncStreamingParser<R> {
    /// Creates a parser that pulls input from `reader`.
    pub fn new(reader: R, limits: Limits) -> Self {
        Self {
            reader,
            buffer: String::with_capacity(4096),
            line_buf: Vec::new(),
            events: VecDeque::with_capacity(100),
            position: Position::new(),
            state: AsyncParseState::Initial,
            limits,
            stats: AsyncStreamStats::default(),
        }
    }

    /// Reads one more line of input and advances the state machine over it.
    ///
    /// Returns `Ok(true)` when events are queued afterwards. At end of input the remaining
    /// buffered text (including a final line without a trailing newline) is parsed, the parser
    /// is marked complete, and the result reports whether events are queued. Once complete,
    /// the reader is not read again.
    ///
    /// Cancel safety: dropping the returned future before it finishes keeps any bytes already
    /// read for the current line; the next call continues from them.
    ///
    /// # Errors
    ///
    /// Returns I/O errors from the reader, and an `InvalidData` I/O error (converted into the
    /// crate error type) when a line is not valid UTF-8.
    pub async fn parse_next(&mut self) -> Result<bool> {
        if self.state == AsyncParseState::Complete {
            return Ok(!self.events.is_empty());
        }
        match self.read_raw_line().await? {
            Some(line) => self.feed_line(&line),
            None => self.finish_input(),
        }
    }

    /// Reads the next line, including its `\n` if present, or `Ok(None)` at end of input.
    ///
    /// `read_until` appends partially read bytes to `self.line_buf` even if its future is
    /// dropped, so this method can be interrupted and resumed without data loss.
    async fn read_raw_line(&mut self) -> Result<Option<String>> {
        self.reader.read_until(b'\n', &mut self.line_buf).await?;
        if self.line_buf.is_empty() {
            return Ok(None);
        }
        let bytes = std::mem::take(&mut self.line_buf);
        let line = String::from_utf8(bytes)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        Ok(Some(line))
    }

    /// Appends a freshly read line to the text buffer and runs the state machine.
    fn feed_line(&mut self, line: &str) -> Result<bool> {
        self.stats.bytes_read += line.len();
        self.buffer.push_str(line);
        self.parse_buffer()?;
        Ok(!self.events.is_empty())
    }

    /// Handles end of input: parses whatever is still buffered, then marks the parser complete.
    fn finish_input(&mut self) -> Result<bool> {
        if !self.buffer.is_empty() {
            // The last line may lack a terminator; add one so it is not left unparsed.
            if !self.buffer.ends_with('\n') {
                self.buffer.push('\n');
            }
            self.parse_buffer()?;
        }
        // Whatever remains (e.g. text outside any document) can never be parsed; drop it so the
        // parser can finish instead of waiting forever for more input.
        self.buffer.clear();
        self.state = AsyncParseState::Complete;
        Ok(!self.events.is_empty())
    }

    /// Runs the state machine until it makes no further progress on the buffered text.
    ///
    /// Each pass either changes the state or shrinks the buffer; a pass that does neither ends
    /// the loop.
    fn parse_buffer(&mut self) -> Result<()> {
        loop {
            let state_before = self.state.clone();
            let len_before = self.buffer.len();
            match self.state {
                AsyncParseState::Initial => {
                    self.emit_event(EventType::StreamStart)?;
                    self.state = AsyncParseState::BetweenDocuments;
                }
                AsyncParseState::BetweenDocuments => {
                    // Only a line that *is* the marker starts a document, not `---` inside text.
                    if self.buffer.lines().any(|l| Self::is_marker_line(l, "---")) {
                        self.emit_event(EventType::DocumentStart {
                            version: None,
                            tags: Vec::new(),
                            // An explicit `---` marker was seen, so the start is not implicit.
                            implicit: false,
                        })?;
                        self.state = AsyncParseState::InDocument;
                        self.stats.documents_parsed += 1;
                    }
                }
                AsyncParseState::InDocument => {
                    self.parse_document_content()?;
                }
                AsyncParseState::Complete => {}
            }
            if self.state == state_before && self.buffer.len() == len_before {
                return Ok(());
            }
        }
    }

    /// Returns `true` if `line` is the document marker `marker`, alone or followed by whitespace.
    fn is_marker_line(line: &str, marker: &str) -> bool {
        line.strip_prefix(marker)
            .map_or(false, |rest| rest.is_empty() || rest.starts_with(char::is_whitespace))
    }

    /// Consumes complete lines of the current document until a `...` end marker or until only
    /// a partial (newline-less) line remains.
    fn parse_document_content(&mut self) -> Result<()> {
        while !self.buffer.is_empty() {
            let first_line = self.buffer.lines().next().unwrap_or("");
            if Self::is_marker_line(first_line, "...") {
                self.emit_event(EventType::DocumentEnd { implicit: false })?;
                self.state = AsyncParseState::BetweenDocuments;
                self.buffer.drain(..3);
                break;
            }
            if let Some(newline_pos) = self.buffer.find('\n') {
                let line: String = self.buffer.drain(..=newline_pos).collect();
                self.parse_line(&line)?;
            } else {
                // Wait for the rest of the line.
                break;
            }
        }
        Ok(())
    }

    /// Emits a key scalar and a value scalar for a `key: value` line; blank lines, comments
    /// and lines without a colon produce nothing.
    fn parse_line(&mut self, line: &str) -> Result<()> {
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            return Ok(());
        }
        if let Some((key, value)) = trimmed.split_once(':') {
            self.emit_plain_scalar(key.trim())?;
            self.emit_plain_scalar(value.trim())?;
        }
        Ok(())
    }

    /// Queues a plain, implicitly tagged scalar event holding `value`.
    fn emit_plain_scalar(&mut self, value: &str) -> Result<()> {
        self.emit_event(EventType::Scalar {
            value: value.to_string(),
            anchor: None,
            tag: None,
            style: crate::parser::ScalarStyle::Plain,
            plain_implicit: true,
            quoted_implicit: true,
        })
    }

    /// Queues an event at the current position and bumps the event counter.
    fn emit_event(&mut self, event_type: EventType) -> Result<()> {
        self.events.push_back(Event {
            event_type,
            position: self.position,
        });
        self.stats.events_generated += 1;
        Ok(())
    }

    /// Removes and returns the oldest queued event, if any.
    pub fn next_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }

    /// Returns `true` once input is exhausted and every queued event has been taken.
    pub fn is_complete(&self) -> bool {
        self.state == AsyncParseState::Complete && self.events.is_empty()
    }

    /// Returns the counters accumulated so far.
    pub fn stats(&self) -> &AsyncStreamStats {
        &self.stats
    }
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> Stream for AsyncStreamingParser<R> {
    type Item = Result<Event>;

    /// Yields queued events in order and reads more input as needed. The stream ends after
    /// input is exhausted and all events have been yielded; an error is yielded as an item.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        loop {
            if let Some(event) = this.next_event() {
                return Poll::Ready(Some(Ok(event)));
            }
            if this.is_complete() {
                return Poll::Ready(None);
            }
            // Poll a fresh `parse_next` future once with the caller's context instead of
            // blocking the executor. `parse_next` is cancel-safe, so dropping it on `Pending`
            // keeps any partially read line for the next poll.
            let step = {
                let fut = this.parse_next();
                futures::pin_mut!(fut);
                std::future::Future::poll(fut, cx)
            };
            match step {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(Ok(_)) => {
                    // A line that produced no events is not the end of the stream. Yield
                    // cooperatively and ask to be polled again.
                    if this.events.is_empty() && this.state != AsyncParseState::Complete {
                        cx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}
/// Convenience constructors and drivers for [`AsyncStreamingParser`].
#[cfg(feature = "async")]
pub mod helpers {
    use super::*;
    use std::path::Path;
    use tokio::fs::File;

    /// Opens the file at `path` and wraps it in a buffered streaming parser.
    ///
    /// # Errors
    ///
    /// Propagates the error from opening the file.
    pub async fn stream_from_file_async<P: AsRef<Path>>(
        path: P,
        limits: Limits,
    ) -> Result<AsyncStreamingParser<BufReader<File>>> {
        let buffered = BufReader::new(File::open(path).await?);
        Ok(stream_from_async_reader(buffered, limits))
    }

    /// Wraps an existing async buffered reader in a streaming parser.
    pub fn stream_from_async_reader<R: AsyncBufRead + Unpin>(
        reader: R,
        limits: Limits,
    ) -> AsyncStreamingParser<R> {
        AsyncStreamingParser::new(reader, limits)
    }

    /// Drives `parser` until it reports completion, awaiting `callback` on every event in order.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from either the parser or the callback.
    pub async fn process_yaml_stream<R, F, Fut>(
        mut parser: AsyncStreamingParser<R>,
        mut callback: F,
    ) -> Result<()>
    where
        R: AsyncBufRead + Unpin,
        F: FnMut(Event) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        loop {
            if parser.is_complete() {
                return Ok(());
            }
            let has_events = parser.parse_next().await?;
            if !has_events {
                continue;
            }
            while let Some(event) = parser.next_event() {
                callback(event).await?;
            }
        }
    }
}
/// Memory-mapped, read-only access to YAML files (not available on `wasm32`).
#[cfg(not(target_arch = "wasm32"))]
pub mod mmap {
    use crate::Result;
    use memmap2::{Mmap, MmapOptions};
    use std::fs::File;
    use std::path::Path;

    /// Read-only memory map of a file plus a cursor for chunked reads.
    pub struct MmapYamlReader {
        mmap: Mmap,
        /// Byte offset where the next `read_chunk` starts.
        position: usize,
    }

    impl MmapYamlReader {
        /// Opens `path` and maps its entire contents read-only.
        ///
        /// # Errors
        ///
        /// Propagates errors from opening or mapping the file.
        pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
            let file = File::open(path)?;
            // SAFETY: memmap2 maps are only sound while the underlying file is not modified or
            // truncated (in or out of process) for the lifetime of the map. This reader assumes
            // the file stays unchanged while it is in use.
            // NOTE(review): confirm callers uphold this.
            #[allow(unsafe_code)]
            let mmap = unsafe { MmapOptions::new().map(&file)? };
            Ok(Self { mmap, position: 0 })
        }

        /// Returns the whole mapped file as a string slice.
        ///
        /// # Errors
        ///
        /// Returns a construction error if the file is not valid UTF-8.
        pub fn as_str(&self) -> Result<&str> {
            std::str::from_utf8(&self.mmap).map_err(|e| {
                crate::Error::construction(
                    crate::Position::new(),
                    format!("UTF-8 conversion failed: {}", e),
                )
            })
        }

        /// Returns the next chunk of at most `size` bytes and advances the cursor past it.
        ///
        /// The chunk end is moved back so a multi-byte UTF-8 character is never split. If
        /// `size` is smaller than the first character, that whole character is returned
        /// instead, so the cursor always makes progress when `size > 0`. Returns `None` at end
        /// of data, or when the chunk is not valid UTF-8 (the cursor still advances past it).
        pub fn read_chunk(&mut self, size: usize) -> Option<&str> {
            let bytes: &[u8] = &self.mmap;
            let start = self.position;
            if start >= bytes.len() {
                return None;
            }
            let is_continuation = |i: usize| i < bytes.len() && bytes[i] & 0xC0 == 0x80;
            // `saturating_add` avoids an overflow panic for very large `size`.
            let mut end = start.saturating_add(size).min(bytes.len());
            while end > start && is_continuation(end) {
                end -= 1;
            }
            if end == start && size > 0 {
                end += 1;
                while is_continuation(end) {
                    end += 1;
                }
            }
            self.position = end;
            std::str::from_utf8(&bytes[start..end]).ok()
        }

        /// Rewinds the chunk cursor to the start of the file.
        pub fn reset(&mut self) {
            self.position = 0;
        }

        /// Number of bytes not yet returned by `read_chunk`.
        pub fn remaining(&self) -> usize {
            self.mmap.len().saturating_sub(self.position)
        }
    }
}
#[cfg(all(test, feature = "async"))]
mod async_tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    /// Drives `parse_next` by hand over a one-document stream and checks the first event.
    #[tokio::test]
    async fn test_async_streaming() {
        // Caps the number of parse steps so a parser that never completes fails instead of hanging.
        const MAX_ITERATIONS: usize = 100;
        let input = "---\nkey: value\n...\n";
        let mut parser = AsyncStreamingParser::new(
            BufReader::new(Cursor::new(input.as_bytes().to_vec())),
            Limits::default(),
        );
        let mut collected = Vec::new();
        for _ in 0..MAX_ITERATIONS {
            if parser.is_complete() {
                break;
            }
            match parser.parse_next().await {
                Err(_) => break,
                Ok(true) => collected.extend(std::iter::from_fn(|| parser.next_event())),
                Ok(false) if parser.state == AsyncParseState::Complete => break,
                Ok(false) => {}
            }
        }
        assert!(!collected.is_empty());
        assert!(matches!(
            collected.first().map(|e| &e.event_type),
            Some(EventType::StreamStart)
        ));
    }

    /// Consumes the parser through the `Stream` trait under a timeout.
    #[tokio::test]
    async fn test_stream_trait() {
        use tokio::time::{timeout, Duration};
        let reader = BufReader::new(Cursor::new(b"key: value\n".to_vec()));
        let parser = AsyncStreamingParser::new(reader, Limits::default());
        let events = timeout(Duration::from_secs(5), parser.take(5).collect::<Vec<_>>())
            .await
            .expect("Test timed out after 5 seconds");
        assert!(!events.is_empty());
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod mmap_tests {
    use super::mmap::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Maps a small temporary YAML file, checks the full text, then reads a 10-byte chunk.
    #[test]
    fn test_mmap_reader() {
        let mut tmp = NamedTempFile::new().unwrap();
        for line in ["key: value", "list:", " - item1", " - item2"].iter() {
            writeln!(tmp, "{}", line).unwrap();
        }
        tmp.flush().unwrap();

        let mut reader = MmapYamlReader::new(tmp.path()).unwrap();
        assert!(reader.as_str().unwrap().contains("key: value"));

        reader.reset();
        assert_eq!(reader.read_chunk(10).unwrap(), "key: value");
    }
}