use crate::{
parser::{Event, EventType},
Error, Limits, Position, ResourceTracker, Result,
};
use std::collections::VecDeque;
use std::io::{BufRead, BufReader};
use std::path::Path;
/// Configuration knobs for the streaming YAML parser.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Capacity used for the `BufReader` when streaming from a file.
    pub buffer_size: usize,
    /// Intended cap on queued events (not enforced in the visible code — TODO confirm).
    pub max_event_buffer: usize,
    /// Incremental-parsing flag (not read anywhere in this file — TODO confirm usage).
    pub incremental: bool,
    /// Resource limits applied while parsing.
    pub limits: Limits,
    /// Number of bytes requested from the reader per `read_chunk` call.
    pub chunk_size: usize,
}
impl Default for StreamConfig {
    /// Balanced defaults: 64 KiB read buffer, 8 KiB chunks, default limits.
    fn default() -> Self {
        Self {
            buffer_size: 64 * 1024,
            chunk_size: 8 * 1024,
            max_event_buffer: 1000,
            incremental: true,
            limits: Limits::default(),
        }
    }
}
impl StreamConfig {
    /// Preset tuned for large inputs: big buffers and permissive limits.
    pub fn large_file() -> Self {
        Self {
            buffer_size: 1024 * 1024,
            chunk_size: 64 * 1024,
            max_event_buffer: 10000,
            incremental: true,
            limits: Limits::permissive(),
        }
    }

    /// Preset tuned for constrained environments: small buffers and strict limits.
    pub fn low_memory() -> Self {
        Self {
            buffer_size: 8 * 1024,
            chunk_size: 1024,
            max_event_buffer: 100,
            incremental: true,
            limits: Limits::strict(),
        }
    }
}
/// Internal state machine driving `parse_buffer`.
#[derive(Debug, Clone, PartialEq)]
enum StreamState {
    /// No input processed yet; `StreamStart` not emitted.
    Initial,
    /// Currently inside a document body.
    InDocument,
    /// Waiting for the next `---` marker or implicit document start.
    BetweenDocuments,
    /// Reader exhausted and carry-over buffer drained.
    EndOfStream,
    /// A recorded parse error; the message is re-returned on later calls.
    Error(String),
}
/// Incremental, event-based YAML parser over any buffered reader.
pub struct StreamingYamlParser<R: BufRead> {
    /// Source of raw bytes.
    reader: R,
    /// Tuning knobs (chunk size, limits, …).
    config: StreamConfig,
    /// Current state-machine position.
    state: StreamState,
    /// Unparsed text carried across chunk reads.
    buffer: String,
    /// Parsed events awaiting consumption via `next_event`.
    events: VecDeque<Event>,
    /// Line/column of the parse cursor.
    position: Position,
    /// Resource accounting (never consulted in this file — TODO confirm usage).
    resource_tracker: ResourceTracker,
    /// Per-document context (open collections, anchors, block scalars).
    context: ParseContext,
    /// Running statistics exposed through `stats()`.
    stats: StreamStats,
}
/// Mutable context tracked while parsing a single document.
#[derive(Debug, Clone)]
struct ParseContext {
    /// Open collections: `true` marks a mapping, `false` a sequence.
    collection_stack: Vec<bool>,
    /// Indentation depth (never updated in the visible code — TODO confirm).
    indent_level: usize,
    /// Anchor waiting to be attached to the next emitted node.
    pending_anchor: Option<String>,
    /// Tag waiting to be attached to the next emitted node.
    pending_tag: Option<String>,
    /// True while accumulating a `|` / `>` block scalar.
    in_block_scalar: bool,
    /// Indent of the active block scalar (never set in the visible code — TODO confirm).
    block_scalar_indent: Option<usize>,
}
impl ParseContext {
    /// Fresh context: no open collections, no pending node properties.
    fn new() -> Self {
        Self {
            collection_stack: Vec::new(),
            indent_level: 0,
            pending_anchor: None,
            pending_tag: None,
            in_block_scalar: false,
            block_scalar_indent: None,
        }
    }

    /// Restore the initial state (called between documents).
    fn reset(&mut self) {
        *self = Self::new();
    }
}
/// Counters describing parser activity; exposed via `StreamingYamlParser::stats`.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Total bytes pulled from the underlying reader.
    pub bytes_read: usize,
    /// Events pushed onto the queue so far.
    pub events_generated: usize,
    /// Documents started (incremented when DocumentStart is emitted).
    pub documents_parsed: usize,
    /// Error counter (never incremented in the visible code — TODO confirm).
    pub errors_encountered: usize,
    /// High-water mark of the carry-over text buffer, in bytes.
    pub max_buffer_size: usize,
    /// Cumulative wall-clock time spent in `parse_next`, in milliseconds.
    pub parse_time_ms: u64,
}
impl<R: BufRead> StreamingYamlParser<R> {
/// Build a parser over `reader` using the supplied configuration.
pub fn new(reader: R, config: StreamConfig) -> Self {
    let buffer = String::with_capacity(4096);
    let events = VecDeque::with_capacity(100);
    Self {
        reader,
        config,
        buffer,
        events,
        state: StreamState::Initial,
        position: Position::new(),
        resource_tracker: ResourceTracker::new(),
        context: ParseContext::new(),
        stats: StreamStats::default(),
    }
}
/// Pull one chunk from the reader and parse as much of the buffer as
/// possible. Returns `Ok(true)` when at least one event is queued.
pub fn parse_next(&mut self) -> Result<bool> {
    let started = std::time::Instant::now();
    let chunk_bytes = self.read_chunk()?;
    if chunk_bytes == 0 && self.buffer.is_empty() {
        // Reader exhausted and nothing left to parse.
        self.state = StreamState::EndOfStream;
        return Ok(false);
    }
    self.stats.bytes_read += chunk_bytes;
    self.parse_buffer()?;
    self.stats.parse_time_ms += started.elapsed().as_millis() as u64;
    // Track the buffer high-water mark for diagnostics.
    if self.buffer.len() > self.stats.max_buffer_size {
        self.stats.max_buffer_size = self.buffer.len();
    }
    Ok(!self.events.is_empty())
}
/// Read up to `chunk_size` bytes from the reader and append them to the
/// text buffer, returning the number of bytes consumed.
///
/// Fix: the original ran `from_utf8_lossy` over each fixed-size chunk, so a
/// multi-byte UTF-8 character split across a chunk boundary was replaced
/// with U+FFFD and the input silently corrupted. We now detect an
/// incomplete trailing sequence and read the few extra bytes needed to
/// complete it; genuinely invalid bytes still fall back to lossy
/// replacement, preserving the original tolerance for bad input.
fn read_chunk(&mut self) -> Result<usize> {
    let mut temp = vec![0u8; self.config.chunk_size];
    let mut total = self.reader.read(&mut temp)?;
    if total == 0 {
        return Ok(0);
    }
    temp.truncate(total);
    loop {
        match std::str::from_utf8(&temp) {
            Ok(valid) => {
                self.buffer.push_str(valid);
                break;
            }
            // `error_len() == None` means the buffer ends mid-character:
            // pull one more byte to finish the sequence (at most 3 needed).
            Err(e) if e.error_len().is_none() => {
                let mut byte = [0u8; 1];
                if self.reader.read(&mut byte)? == 0 {
                    // Stream ended mid-character: degrade to replacement.
                    self.buffer.push_str(&String::from_utf8_lossy(&temp));
                    break;
                }
                temp.push(byte[0]);
                total += 1;
            }
            // Truly invalid bytes: keep the original lossy behavior.
            Err(_) => {
                self.buffer.push_str(&String::from_utf8_lossy(&temp));
                break;
            }
        }
    }
    Ok(total)
}
/// Advance the state machine one step over the current buffer contents.
fn parse_buffer(&mut self) -> Result<()> {
    match self.state {
        StreamState::Initial => {
            // First call: open the stream before any document content.
            self.emit_stream_start()?;
            self.state = StreamState::BetweenDocuments;
        }
        StreamState::BetweenDocuments => {
            self.parse_document_start()?;
        }
        StreamState::InDocument => {
            self.parse_document_content()?;
        }
        StreamState::EndOfStream => {
            return Ok(());
        }
        StreamState::Error(ref msg) => {
            // A previously recorded error is surfaced on every call.
            return Err(Error::parse(self.position, msg.clone()));
        }
    }
    Ok(())
}
/// Handle the gap between documents: consume an explicit `---` marker, or
/// start an implicit document when other content is buffered.
fn parse_document_start(&mut self) -> Result<()> {
    self.skip_whitespace();
    if self.buffer.starts_with("---") {
        self.buffer.drain(..3);
        self.position.column += 3;
        self.emit_document_start()?;
        self.state = StreamState::InDocument;
        return Ok(());
    }
    if self.buffer.is_empty() {
        // Nothing to do until more data arrives.
        return Ok(());
    }
    // Data without a marker: treat it as an implicit document start.
    self.emit_document_start()?;
    self.state = StreamState::InDocument;
    self.parse_document_content()
}
/// Parse buffered content while inside a document.
///
/// Stops when the buffer is exhausted, a `...` document-end marker is
/// consumed, or `needs_more_data` asks to wait for another chunk.
fn parse_document_content(&mut self) -> Result<()> {
    while !self.buffer.is_empty() {
        if self.buffer.starts_with("...") {
            // Explicit document terminator: close the document and reset
            // the per-document context.
            self.buffer.drain(..3);
            self.position.column += 3;
            self.emit_document_end()?;
            self.state = StreamState::BetweenDocuments;
            self.context.reset();
            break;
        }
        if self.context.in_block_scalar {
            // Block scalars accumulate until their terminator is buffered.
            self.parse_block_scalar_content()?;
        } else {
            self.parse_yaml_content()?;
        }
        if self.needs_more_data() {
            break;
        }
    }
    Ok(())
}
/// Dispatch on the first buffered character to the matching parse routine.
fn parse_yaml_content(&mut self) -> Result<()> {
    self.skip_whitespace();
    if self.buffer.is_empty() {
        return Ok(());
    }
    // Safe: the buffer was just checked to be non-empty.
    let first_char = self.buffer.chars().next().unwrap();
    match first_char {
        // `- ` introduces a block-sequence item; a bare `-` falls through
        // to the scalar arms below.
        '-' if self.is_sequence_item() => {
            self.parse_sequence_item()?;
        }
        '[' => {
            self.parse_flow_sequence()?;
        }
        '{' => {
            self.parse_flow_mapping()?;
        }
        '|' | '>' => {
            self.parse_block_scalar_start(first_char)?;
        }
        '&' => {
            self.parse_anchor()?;
        }
        '*' => {
            self.parse_alias()?;
        }
        '"' | '\'' => {
            self.parse_quoted_scalar(first_char)?;
        }
        '#' => {
            self.skip_comment();
        }
        '\n' => {
            // Consume the newline and update line/column bookkeeping.
            self.buffer.remove(0);
            self.position.line += 1;
            self.position.column = 0;
        }
        // A `:` later in the buffer marks a `key: value` entry.
        _ if self.is_mapping_key() => {
            self.parse_mapping_entry()?;
        }
        _ => {
            self.parse_plain_scalar()?;
        }
    }
    Ok(())
}
/// Heuristic: pause parsing and wait for another chunk when the buffered
/// text is too short (or incomplete) to contain a whole construct.
fn needs_more_data(&self) -> bool {
    let short_and_unterminated = self.buffer.len() < 100 && !self.buffer.contains('\n');
    let block_scalar_pending =
        self.context.in_block_scalar && !self.has_complete_block_scalar();
    short_and_unterminated || block_scalar_pending
}
/// True when the buffer already contains a block-scalar terminator
/// (a blank line or a `...` document-end marker).
fn has_complete_block_scalar(&self) -> bool {
    ["\n\n", "\n..."].iter().any(|marker| self.buffer.contains(marker))
}
/// Consume a `- ` item indicator, opening a sequence if none is active
/// yet (`false` entries on the collection stack mark sequences).
fn parse_sequence_item(&mut self) -> Result<()> {
    self.buffer.remove(0);
    self.position.column += 1;
    let sequence_already_open = self
        .context
        .collection_stack
        .iter()
        .any(|&is_mapping| !is_mapping);
    if !sequence_already_open {
        self.emit_sequence_start()?;
        self.context.collection_stack.push(false);
    }
    self.skip_whitespace();
    Ok(())
}
/// Consume `key:` and emit the key scalar, opening a mapping if none is
/// active yet (`true` entries on the collection stack mark mappings).
fn parse_mapping_entry(&mut self) -> Result<()> {
    let end = match self.find_mapping_key_end() {
        Some(end) => end,
        // Key not fully buffered yet; retry once more data arrives.
        None => return Ok(()),
    };
    let key: String = self.buffer.drain(..end).collect();
    self.position.column += key.len();
    if self.buffer.starts_with(':') {
        self.buffer.remove(0);
        self.position.column += 1;
    }
    let mapping_already_open = self
        .context
        .collection_stack
        .iter()
        .any(|&is_mapping| is_mapping);
    if !mapping_already_open {
        self.emit_mapping_start()?;
        self.context.collection_stack.push(true);
    }
    self.emit_scalar(key.trim().to_string())
}
/// Consume leading spaces and tabs, advancing the column.
///
/// Fix: the original removed one character at a time with
/// `String::remove(0)`, which shifts the whole buffer per character —
/// O(n²) over a run of whitespace. Count the prefix first and remove it
/// with a single `drain`.
fn skip_whitespace(&mut self) {
    let ws_len = self
        .buffer
        .chars()
        .take_while(|&ch| ch == ' ' || ch == '\t')
        .count();
    if ws_len > 0 {
        // Spaces and tabs are single-byte, so char count == byte count.
        self.buffer.drain(..ws_len);
        self.position.column += ws_len;
    }
}
/// Discard a `#` comment up to (but not including) the newline.
///
/// Fix: the original reset the column to 0 even though the newline is
/// still in the buffer — the `'\n'` arm of `parse_yaml_content` is the
/// one responsible for bumping the line and zeroing the column. The
/// column is now advanced over the discarded characters instead, keeping
/// positions consistent until the newline is actually consumed.
fn skip_comment(&mut self) {
    if let Some(newline_pos) = self.buffer.find('\n') {
        let removed = self.buffer.drain(..newline_pos).count();
        self.position.column += removed;
    } else {
        // Comment runs to the end of the buffered data.
        self.position.column += self.buffer.chars().count();
        self.buffer.clear();
    }
}
/// A `- ` prefix (dash followed by a space) marks a block-sequence item.
fn is_sequence_item(&self) -> bool {
    matches!(self.buffer.as_bytes(), [b'-', b' ', ..])
}
/// Heuristic: buffered text containing a `:` that is not the first
/// character is treated as a mapping entry.
fn is_mapping_key(&self) -> bool {
    self.buffer.find(':').map_or(false, |pos| pos > 0)
}
/// Byte offset of the first `:` in the buffer — the key ends there.
fn find_mapping_key_end(&self) -> Option<usize> {
    self.buffer.find(':')
}
/// Consume a flow sequence `[...]` from the buffer.
///
/// NOTE(review): only Start/End events are emitted — the elements between
/// the brackets are discarded and nested brackets are not handled, so this
/// looks like a placeholder implementation; confirm intent. If no `]` is
/// buffered yet, nothing is consumed and parsing retries after more data.
fn parse_flow_sequence(&mut self) -> Result<()> {
    if let Some(end) = self.buffer.find(']') {
        let content = self.buffer.drain(..=end).collect::<String>();
        self.emit_sequence_start()?;
        self.emit_sequence_end()?;
        self.position.column += content.len();
    }
    Ok(())
}
/// Consume a flow mapping `{...}` from the buffer.
///
/// NOTE(review): mirrors `parse_flow_sequence` — the entries between the
/// braces are discarded and only Start/End events are emitted; confirm
/// this placeholder behavior is intended.
fn parse_flow_mapping(&mut self) -> Result<()> {
    if let Some(end) = self.buffer.find('}') {
        let content = self.buffer.drain(..=end).collect::<String>();
        self.emit_mapping_start()?;
        self.emit_mapping_end()?;
        self.position.column += content.len();
    }
    Ok(())
}
/// Enter block-scalar mode after a `|` or `>` indicator.
/// The indicator is consumed; its literal/folded distinction is ignored.
fn parse_block_scalar_start(&mut self, _indicator: char) -> Result<()> {
    self.context.in_block_scalar = true;
    self.buffer.remove(0);
    Ok(())
}
/// Emit the body of a block scalar once its terminator is buffered.
///
/// If no terminator is found yet, the content stays buffered and
/// `needs_more_data` keeps the parser waiting for another chunk.
fn parse_block_scalar_content(&mut self) -> Result<()> {
    if let Some(end) = self.find_block_scalar_end() {
        let content = self.buffer.drain(..end).collect::<String>();
        self.emit_scalar(content)?;
        self.context.in_block_scalar = false;
    }
    Ok(())
}
/// Byte offset where the current block scalar ends: the first blank line
/// (`"\n\n"`) or document-end marker (`"\n..."`), whichever comes first.
///
/// Fix: the original used `.find("\n\n").or(self.buffer.find("\n..."))`,
/// which (a) evaluates the second search eagerly and (b) always prefers a
/// `"\n\n"` position even when a `"\n..."` occurs earlier in the buffer,
/// wrongly swallowing the document-end marker into the scalar. Take the
/// minimum of the two candidate positions instead.
fn find_block_scalar_end(&self) -> Option<usize> {
    let blank_line = self.buffer.find("\n\n");
    let doc_end = self.buffer.find("\n...");
    match (blank_line, doc_end) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (only, none) => only.or(none),
    }
}
/// Consume `&name` and stash it as the pending anchor for the next node.
///
/// NOTE(review): the `&` is removed before we know the identifier's end is
/// buffered; if the name runs to the end of the buffer
/// (`find_identifier_end` returns `None`), the indicator is lost and the
/// name is later re-parsed as a plain scalar. Anchors that straddle chunk
/// boundaries are therefore unreliable — confirm with streaming callers.
fn parse_anchor(&mut self) -> Result<()> {
    self.buffer.remove(0);
    let end = self.find_identifier_end();
    if let Some(end) = end {
        let anchor = self.buffer.drain(..end).collect::<String>();
        self.context.pending_anchor = Some(anchor);
        // +1 accounts for the consumed `&` indicator.
        self.position.column += end + 1;
    }
    Ok(())
}
/// Consume `*name` and emit an alias event.
///
/// NOTE(review): same caveat as `parse_anchor` — the `*` is consumed
/// before the identifier's end is known, so an alias split across chunks
/// loses its indicator; confirm with streaming callers.
fn parse_alias(&mut self) -> Result<()> {
    self.buffer.remove(0);
    let end = self.find_identifier_end();
    if let Some(end) = end {
        let alias = self.buffer.drain(..end).collect::<String>();
        self.emit_alias(alias)?;
        // +1 accounts for the consumed `*` indicator.
        self.position.column += end + 1;
    }
    Ok(())
}
/// Consume a single- or double-quoted scalar.
///
/// NOTE(review): the opening quote is removed before the closing quote is
/// located, so a quoted scalar split across chunks loses its opening quote
/// and is later re-parsed as plain text; escape sequences are not handled
/// either. Fixing this needs care — refusing to consume anything here can
/// stall the `parse_document_content` loop — so it is documented instead.
fn parse_quoted_scalar(&mut self, quote: char) -> Result<()> {
    self.buffer.remove(0);
    if let Some(end) = self.buffer.find(quote) {
        let content = self.buffer.drain(..end).collect::<String>();
        // Drop the closing quote.
        self.buffer.remove(0);
        let content_len = content.len();
        self.emit_scalar(content)?;
        // +2 accounts for the two quote characters.
        self.position.column += content_len + 2;
    }
    Ok(())
}
/// Consume an unquoted scalar up to a newline, `:`, or `#`, and emit its
/// trimmed value.
fn parse_plain_scalar(&mut self) -> Result<()> {
    // `find_plain_scalar_end` falls back to the buffer length, so this
    // branch is effectively always taken.
    if let Some(len) = self.find_plain_scalar_end() {
        let raw: String = self.buffer.drain(..len).collect();
        self.emit_scalar(raw.trim().to_string())?;
        self.position.column += len;
    }
    Ok(())
}
/// Byte offset of the first character that cannot be part of an
/// anchor/alias name, or `None` when the name runs to the buffer's end.
fn find_identifier_end(&self) -> Option<usize> {
    self.buffer
        .char_indices()
        .find(|&(_, ch)| !(ch.is_alphanumeric() || ch == '_' || ch == '-'))
        .map(|(idx, _)| idx)
}
/// Byte offset where a plain scalar ends: the first newline, `:`, or `#`,
/// or the whole buffer when none of these is present.
fn find_plain_scalar_end(&self) -> Option<usize> {
    let end = self
        .buffer
        .find(|ch: char| matches!(ch, '\n' | ':' | '#'))
        .unwrap_or(self.buffer.len());
    Some(end)
}
/// Queue the StreamStart event that opens every event stream.
fn emit_stream_start(&mut self) -> Result<()> {
    let event = Event {
        event_type: EventType::StreamStart,
        position: self.position,
    };
    self.events.push_back(event);
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a StreamEnd event.
///
/// NOTE(review): nothing in this file calls this method, so consumers
/// never see a StreamEnd event — confirm whether one is expected at EOF.
fn emit_stream_end(&mut self) -> Result<()> {
    self.events.push_back(Event {
        event_type: EventType::StreamEnd,
        position: self.position,
    });
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a DocumentStart event and bump the document counter.
///
/// NOTE(review): `implicit` is always reported as false, even for
/// documents started without a `---` marker — confirm this is intended.
fn emit_document_start(&mut self) -> Result<()> {
    self.events.push_back(Event {
        event_type: EventType::DocumentStart {
            version: None,
            tags: Vec::new(),
            implicit: false,
        },
        position: self.position,
    });
    self.stats.events_generated += 1;
    // Documents are counted when they start, not when they end.
    self.stats.documents_parsed += 1;
    Ok(())
}
/// Queue a DocumentEnd event (always reported as explicit).
fn emit_document_end(&mut self) -> Result<()> {
    let event = Event {
        event_type: EventType::DocumentEnd { implicit: false },
        position: self.position,
    };
    self.events.push_back(event);
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a SequenceStart event, consuming any pending anchor/tag.
///
/// NOTE(review): `flow_style` is always false, including when called from
/// `parse_flow_sequence` — confirm consumers don't rely on it.
fn emit_sequence_start(&mut self) -> Result<()> {
    let anchor = self.context.pending_anchor.take();
    let tag = self.context.pending_tag.take();
    self.events.push_back(Event {
        event_type: EventType::SequenceStart {
            anchor,
            tag,
            flow_style: false,
        },
        position: self.position,
    });
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a SequenceEnd event.
fn emit_sequence_end(&mut self) -> Result<()> {
    let event = Event {
        event_type: EventType::SequenceEnd,
        position: self.position,
    };
    self.events.push_back(event);
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a MappingStart event (block style), consuming any pending
/// anchor/tag so they attach to this node only.
fn emit_mapping_start(&mut self) -> Result<()> {
    let event_type = EventType::MappingStart {
        anchor: self.context.pending_anchor.take(),
        tag: self.context.pending_tag.take(),
        flow_style: false,
    };
    self.events.push_back(Event {
        event_type,
        position: self.position,
    });
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a MappingEnd event.
fn emit_mapping_end(&mut self) -> Result<()> {
    let event = Event {
        event_type: EventType::MappingEnd,
        position: self.position,
    };
    self.events.push_back(event);
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue a Scalar event, consuming any pending anchor/tag.
///
/// NOTE(review): the style is always reported as `Plain`, even for values
/// coming from the quoted/block scalar paths — confirm consumers don't
/// rely on the style field.
fn emit_scalar(&mut self, value: String) -> Result<()> {
    let anchor = self.context.pending_anchor.take();
    let tag = self.context.pending_tag.take();
    self.events.push_back(Event {
        event_type: EventType::Scalar {
            value,
            anchor,
            tag,
            style: crate::parser::ScalarStyle::Plain,
            plain_implicit: true,
            quoted_implicit: true,
        },
        position: self.position,
    });
    self.stats.events_generated += 1;
    Ok(())
}
/// Queue an Alias event referring to `anchor`.
fn emit_alias(&mut self, anchor: String) -> Result<()> {
    let event = Event {
        event_type: EventType::Alias { anchor },
        position: self.position,
    };
    self.events.push_back(event);
    self.stats.events_generated += 1;
    Ok(())
}
/// Pop the oldest queued event, if any.
pub fn next_event(&mut self) -> Option<Event> {
    self.events.pop_front()
}
/// True while at least one parsed event is queued.
pub fn has_events(&self) -> bool {
    self.events.front().is_some()
}
/// Accumulated parsing statistics.
pub fn stats(&self) -> &StreamStats {
    &self.stats
}
/// Bytes currently held in the carry-over text buffer.
pub fn buffer_size(&self) -> usize {
    self.buffer.len()
}
}
/// Pull-based event iteration: drain already-queued events first,
/// otherwise parse another chunk on demand.
impl<R: BufRead> Iterator for StreamingYamlParser<R> {
    type Item = Result<Event>;
    fn next(&mut self) -> Option<Self::Item> {
        // Serve parsed events before touching the reader again.
        if let Some(event) = self.next_event() {
            return Some(Ok(event));
        }
        match self.parse_next() {
            Ok(true) => self.next_event().map(Ok),
            Ok(false) if self.state == StreamState::EndOfStream => {
                // Drain anything queued before the stream ended.
                if !self.events.is_empty() {
                    self.next_event().map(Ok)
                } else {
                    None
                }
            }
            // NOTE(review): `Ok(false)` outside EndOfStream ends iteration
            // even though unparsed input may remain buffered — confirm this
            // early stop is acceptable to callers.
            Ok(false) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
/// Open `path` and wrap it in a streaming parser whose reader buffer is
/// sized by `config.buffer_size`.
pub fn stream_from_file<P: AsRef<Path>>(
    path: P,
    config: StreamConfig,
) -> Result<StreamingYamlParser<BufReader<std::fs::File>>> {
    let file = std::fs::File::open(path)?;
    Ok(StreamingYamlParser::new(
        BufReader::with_capacity(config.buffer_size, file),
        config,
    ))
}
/// Build a streaming parser over an in-memory string.
pub fn stream_from_string(
    input: String,
    config: StreamConfig,
) -> StreamingYamlParser<BufReader<std::io::Cursor<String>>> {
    StreamingYamlParser::new(BufReader::new(std::io::Cursor::new(input)), config)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two explicit documents should stream into events beginning with StreamStart.
    #[test]
    fn test_basic_streaming() {
        let yaml = "---\nkey: value\n...\n---\nother: data\n...";
        let reader = BufReader::new(Cursor::new(yaml.to_string()));
        let parser = StreamingYamlParser::new(reader, StreamConfig::default());
        // Idiom fix: collect via the Iterator impl instead of a manual
        // `while let Some(..) = parser.next()` loop (clippy: while_let_on_iterator).
        let events: Vec<Event> = parser.map(|event| event.unwrap()).collect();
        // Idiom fix: `!is_empty()` over `len() > 0` (clippy: len_zero).
        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
    }

    /// Driving `parse_next`/`next_event` by hand should still produce events.
    #[test]
    fn test_incremental_parsing() {
        let yaml = "key: value\nlist:\n - item1\n - item2";
        let mut parser = stream_from_string(yaml.to_string(), StreamConfig::default());
        let mut event_count = 0;
        while parser.parse_next().unwrap() {
            // Idiom fix: `is_some()` over binding an unused `_event`.
            while parser.next_event().is_some() {
                event_count += 1;
            }
        }
        assert!(event_count > 0);
    }

    /// A large input under the large-file preset should yield events cleanly.
    #[test]
    fn test_large_buffer_handling() {
        // Idiom fix: build the fixture with an iterator instead of a
        // mutate-in-loop `push_str`.
        let yaml: String = (0..1000)
            .map(|i| format!("item{}: value{}\n", i, i))
            .collect();
        let parser = stream_from_string(yaml, StreamConfig::large_file());
        let events: Vec<Event> = parser.take(100).map(|event| event.unwrap()).collect();
        assert!(!events.is_empty());
    }
}