use crate::{
parser::{Event, Parser, ParserState},
zerocopy::ScannerStats,
BasicScanner, Error, Position, Result, Scanner, Token, TokenType, ZeroScanner, ZeroToken,
ZeroTokenType,
};
use std::collections::VecDeque;
/// Tunables controlling how a [`StreamingParser`] buffers and parses input.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
/// Capacity hint for the internal event buffer; also bounds batch size.
pub max_buffer_size: usize,
/// Prefer the zero-copy scanner path when a zero-copy scanner is present.
pub use_zero_copy: bool,
/// Maximum nesting depth before parsing aborts with an error.
pub max_depth: usize,
/// When true, gather `StreamingStats` while parsing.
pub collect_stats: bool,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
max_buffer_size: 64,
use_zero_copy: true,
max_depth: 256,
collect_stats: false,
}
}
}
/// Counters gathered during a streaming parse when
/// `StreamingConfig::collect_stats` is enabled.
#[derive(Debug, Clone)]
pub struct StreamingStats {
/// Number of events handed out to the caller.
pub events_processed: usize,
/// Number of tokens consumed from the underlying scanner.
pub tokens_processed: usize,
/// High-water mark of the internal event buffer length.
pub max_buffer_size: usize,
/// Deepest nesting level reached while parsing.
pub max_depth: usize,
/// Scanner-level stats, populated from the zero-copy scanner when present.
pub scanner_stats: Option<ScannerStats>,
/// Wall-clock time since the parser was created/reset, in nanoseconds.
pub parse_time_ns: u64,
}
/// Incremental YAML-event parser. Exactly one of `scanner` /
/// `zero_scanner` is `Some`, chosen by the constructor used; the
/// lifetime `'a` ties a zero-copy parser to its borrowed input.
pub struct StreamingParser<'a> {
/// Owning scanner used by the traditional path (`new`).
scanner: Option<BasicScanner>,
/// Borrowing scanner used by the zero-copy path (`new_zero_copy`).
zero_scanner: Option<ZeroScanner<'a>>,
config: StreamingConfig,
/// Events produced but not yet handed to the caller.
event_buffer: VecDeque<Event>,
/// Current parser state; previous states live on `state_stack`.
state: ParserState,
state_stack: Vec<ParserState>,
/// Last position synchronized from the active scanner.
position: Position,
/// Current nesting depth (tracks `state_stack` pushes/pops).
depth: usize,
/// Anchor ("&name") waiting to be attached to the next node.
pending_anchor: Option<String>,
/// Tag waiting to be attached to the next node.
pending_tag: Option<String>,
/// Present only when `config.collect_stats` is true.
stats: Option<StreamingStats>,
start_time: std::time::Instant,
/// Set once a stream-end event has been emitted.
stream_ended: bool,
}
impl<'a> StreamingParser<'a> {
pub fn new(input: String, config: StreamingConfig) -> StreamingParser<'static> {
let scanner = BasicScanner::new(input);
let position = scanner.position();
StreamingParser {
scanner: Some(scanner),
zero_scanner: None,
config: config.clone(),
event_buffer: VecDeque::with_capacity(config.max_buffer_size),
state: ParserState::StreamStart,
state_stack: Vec::with_capacity(config.max_depth),
position,
depth: 0,
pending_anchor: None,
pending_tag: None,
stats: if config.collect_stats {
Some(StreamingStats {
events_processed: 0,
tokens_processed: 0,
max_buffer_size: 0,
max_depth: 0,
scanner_stats: None,
parse_time_ns: 0,
})
} else {
None
},
start_time: std::time::Instant::now(),
stream_ended: false,
}
}
pub fn new_zero_copy(input: &'a str, config: StreamingConfig) -> Self {
let zero_scanner = ZeroScanner::new(input);
let position = zero_scanner.position;
Self {
scanner: None,
zero_scanner: Some(zero_scanner),
config: config.clone(),
event_buffer: VecDeque::with_capacity(config.max_buffer_size),
state: ParserState::StreamStart,
state_stack: Vec::with_capacity(config.max_depth),
position,
depth: 0,
pending_anchor: None,
pending_tag: None,
stats: if config.collect_stats {
Some(StreamingStats {
events_processed: 0,
tokens_processed: 0,
max_buffer_size: 0,
max_depth: 0,
scanner_stats: None,
parse_time_ns: 0,
})
} else {
None
},
start_time: std::time::Instant::now(),
stream_ended: false,
}
}
pub fn next_batch(&mut self) -> Result<Vec<Event>> {
if self.stream_ended {
return Ok(Vec::new());
}
let mut events = Vec::new();
let target_size = std::cmp::min(self.config.max_buffer_size / 2, 8);
while events.len() < target_size && !self.stream_ended {
if let Some(event) = self.next_event_internal()? {
events.push(event);
} else {
break;
}
}
Ok(events)
}
fn next_event_internal(&mut self) -> Result<Option<Event>> {
if let Some(event) = self.event_buffer.pop_front() {
self.update_stats_for_event(&event);
return Ok(Some(event));
}
self.generate_events()?;
if let Some(event) = self.event_buffer.pop_front() {
self.update_stats_for_event(&event);
Ok(Some(event))
} else {
Ok(None)
}
}
fn generate_events(&mut self) -> Result<()> {
if self.stream_ended {
return Ok(());
}
if self.depth > self.config.max_depth {
return Err(Error::parse(
self.position,
format!("Maximum nesting depth exceeded: {}", self.config.max_depth),
));
}
if self.config.use_zero_copy && self.zero_scanner.is_some() {
self.generate_events_zero_copy()
} else {
self.generate_events_traditional()
}
}
fn generate_events_zero_copy(&mut self) -> Result<()> {
let batch_size = 16;
let mut processed = 0;
while processed < batch_size {
let current_char = if let Some(scanner) = &self.zero_scanner {
scanner.current_char()
} else {
None
};
if current_char.is_none() {
if !matches!(self.state, ParserState::StreamEnd) {
self.event_buffer
.push_back(Event::stream_end(self.position));
self.stream_ended = true;
}
break;
}
if let Some(scanner) = &mut self.zero_scanner {
scanner.skip_whitespace();
}
let ch = current_char.unwrap();
match ch {
'-' if self.is_document_start_candidate_simple() => {
self.handle_document_start();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
scanner.advance();
scanner.advance();
}
}
'.' if self.is_document_end_candidate_simple() => {
self.handle_document_end();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
scanner.advance();
scanner.advance();
}
}
'[' => {
self.handle_flow_sequence_start();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
']' => {
self.handle_flow_sequence_end();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
'{' => {
self.handle_flow_mapping_start();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
'}' => {
self.handle_flow_mapping_end();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
':' if self.is_value_indicator_simple() => {
self.handle_value_indicator();
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
',' => {
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
'#' => {
self.skip_comment_simple();
}
ch if ch.is_alphabetic() || ch.is_numeric() => {
let scalar_token = if let Some(scanner) = &mut self.zero_scanner {
scanner.scan_plain_scalar_zero_copy()?
} else {
return Err(Error::parse(
self.position,
"No scanner available".to_string(),
));
};
self.handle_zero_copy_scalar(scalar_token)?;
}
'&' => {
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance(); let anchor = scanner.scan_identifier_zero_copy()?;
self.pending_anchor = Some(anchor.as_str().to_string());
}
}
'*' => {
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance(); let alias = scanner.scan_identifier_zero_copy()?;
self.event_buffer
.push_back(Event::alias(self.position, alias.as_str().to_string()));
}
}
_ => {
if let Some(scanner) = &mut self.zero_scanner {
scanner.advance();
}
}
}
processed += 1;
if let Some(scanner) = &self.zero_scanner {
self.position = scanner.position;
}
if let Some(ref mut stats) = self.stats {
stats.tokens_processed += 1;
}
}
Ok(())
}
fn generate_events_traditional(&mut self) -> Result<()> {
for _ in 0..4 {
let has_token = if let Some(scanner) = &self.scanner {
scanner.check_token()
} else {
false
};
if !has_token {
if !matches!(self.state, ParserState::StreamEnd) {
self.event_buffer
.push_back(Event::stream_end(self.position));
self.stream_ended = true;
}
break;
}
let token = if let Some(scanner) = &mut self.scanner {
scanner.get_token()?
} else {
None
};
if let Some(token) = token {
self.process_token(token)?;
if let Some(ref mut stats) = self.stats {
stats.tokens_processed += 1;
}
}
}
Ok(())
}
fn is_document_start_candidate_simple(&self) -> bool {
if let Some(scanner) = &self.zero_scanner {
scanner.current_char() == Some('-')
&& scanner.peek_char(1) == Some('-')
&& scanner.peek_char(2) == Some('-')
&& scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
} else {
false
}
}
fn is_document_end_candidate_simple(&self) -> bool {
if let Some(scanner) = &self.zero_scanner {
scanner.current_char() == Some('.')
&& scanner.peek_char(1) == Some('.')
&& scanner.peek_char(2) == Some('.')
&& scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
} else {
false
}
}
fn is_value_indicator_simple(&self) -> bool {
if let Some(scanner) = &self.zero_scanner {
scanner.current_char() == Some(':')
&& scanner.peek_char(1).map_or(true, |c| c.is_whitespace())
} else {
false
}
}
fn handle_document_start(&mut self) {
self.event_buffer
.push_back(Event::document_start(self.position, None, vec![], false));
self.state = ParserState::DocumentStart;
}
fn handle_document_end(&mut self) {
self.event_buffer
.push_back(Event::document_end(self.position, false));
self.state = ParserState::DocumentEnd;
}
fn handle_flow_sequence_start(&mut self) {
self.push_state(ParserState::FlowSequence);
self.event_buffer.push_back(Event::sequence_start(
self.position,
self.pending_anchor.take(),
self.pending_tag.take(),
true,
));
}
fn handle_flow_sequence_end(&mut self) {
self.event_buffer
.push_back(Event::sequence_end(self.position));
self.pop_state();
}
fn handle_flow_mapping_start(&mut self) {
self.push_state(ParserState::FlowMapping);
self.event_buffer.push_back(Event::mapping_start(
self.position,
self.pending_anchor.take(),
self.pending_tag.take(),
true,
));
}
fn handle_flow_mapping_end(&mut self) {
self.event_buffer
.push_back(Event::mapping_end(self.position));
self.pop_state();
}
fn handle_value_indicator(&mut self) {
match self.state {
ParserState::BlockMappingKey => {
self.state = ParserState::BlockMappingValue;
}
ParserState::FlowMapping => {
}
_ => {
}
}
}
fn handle_zero_copy_scalar(&mut self, token: ZeroToken) -> Result<()> {
if let ZeroTokenType::Scalar(zero_string, quote_style) = token.token_type {
let style = match quote_style {
crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
};
let value = if zero_string.is_borrowed() {
zero_string.as_str().to_string()
} else {
zero_string.into_owned()
};
self.event_buffer.push_back(Event::scalar(
token.start_position,
self.pending_anchor.take(),
self.pending_tag.take(),
value,
style == crate::parser::ScalarStyle::Plain,
style != crate::parser::ScalarStyle::Plain,
style,
));
}
Ok(())
}
fn skip_comment_simple(&mut self) {
if let Some(scanner) = &mut self.zero_scanner {
while let Some(ch) = scanner.current_char() {
scanner.advance();
if ch == '\n' || ch == '\r' {
break;
}
}
}
}
fn process_token(&mut self, token: Token) -> Result<()> {
self.position = token.end_position;
match token.token_type {
TokenType::StreamStart => {
self.event_buffer
.push_back(Event::stream_start(token.start_position));
self.state = ParserState::ImplicitDocumentStart;
}
TokenType::StreamEnd => {
self.event_buffer
.push_back(Event::stream_end(token.start_position));
self.stream_ended = true;
}
TokenType::Scalar(value, quote_style) => {
let style = match quote_style {
crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
};
self.event_buffer.push_back(Event::scalar(
token.start_position,
self.pending_anchor.take(),
self.pending_tag.take(),
value,
style == crate::parser::ScalarStyle::Plain,
style != crate::parser::ScalarStyle::Plain,
style,
));
}
_ => {
}
}
Ok(())
}
fn push_state(&mut self, new_state: ParserState) {
self.state_stack.push(self.state);
self.state = new_state;
self.depth += 1;
if let Some(ref mut stats) = self.stats {
stats.max_depth = stats.max_depth.max(self.depth);
}
}
fn pop_state(&mut self) {
if let Some(prev_state) = self.state_stack.pop() {
self.state = prev_state;
self.depth = self.depth.saturating_sub(1);
}
}
fn update_stats_for_event(&mut self, _event: &Event) {
if let Some(ref mut stats) = self.stats {
stats.events_processed += 1;
stats.max_buffer_size = stats.max_buffer_size.max(self.event_buffer.len());
}
}
pub fn get_stats(&mut self) -> Option<StreamingStats> {
if let Some(ref mut stats) = self.stats {
stats.parse_time_ns = self.start_time.elapsed().as_nanos() as u64;
if let Some(ref scanner) = self.zero_scanner {
stats.scanner_stats = Some(scanner.stats());
}
Some(stats.clone())
} else {
None
}
}
pub fn has_more_events(&self) -> bool {
!self.stream_ended || !self.event_buffer.is_empty()
}
pub fn buffer_size(&self) -> usize {
self.event_buffer.len()
}
}
impl<'a> Parser for StreamingParser<'a> {
    /// True while buffered events remain or more input may yield events.
    fn check_event(&self) -> bool {
        !self.event_buffer.is_empty() || !self.stream_ended
    }

    /// Peeks at the next buffered event without consuming it. Does not
    /// trigger generation, so `Ok(None)` may still be followed by events.
    fn peek_event(&self) -> Result<Option<&Event>> {
        Ok(self.event_buffer.front())
    }

    /// Consumes and returns the next event, generating more as needed.
    fn get_event(&mut self) -> Result<Option<Event>> {
        self.next_event_internal()
    }

    /// Rewinds the parser to the start of its input.
    fn reset(&mut self) {
        self.event_buffer.clear();
        self.state = ParserState::StreamStart;
        self.state_stack.clear();
        self.depth = 0;
        self.pending_anchor = None;
        self.pending_tag = None;
        self.stream_ended = false;
        self.start_time = std::time::Instant::now();
        if let Some(ref mut scanner) = self.scanner {
            scanner.reset();
            // Bug fix: resynchronize the cached position with the rewound
            // scanner; previously `position()` kept reporting the old spot.
            self.position = scanner.position();
        }
        if let Some(ref mut scanner) = self.zero_scanner {
            scanner.reset();
            self.position = scanner.position;
        }
        // Bug fix: the timer restarts above, so counters from the previous
        // run must be cleared too or get_stats() reports inconsistent data.
        if let Some(ref mut stats) = self.stats {
            stats.events_processed = 0;
            stats.tokens_processed = 0;
            stats.max_buffer_size = 0;
            stats.max_depth = 0;
            stats.scanner_stats = None;
            stats.parse_time_ns = 0;
        }
    }

    /// Last position synchronized from the active scanner.
    fn position(&self) -> Position {
        self.position
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventType;

    /// A single zero-copy scalar should surface as a Scalar event.
    #[test]
    fn test_streaming_parser_basic() {
        let config = StreamingConfig {
            use_zero_copy: true,
            collect_stats: true,
            ..Default::default()
        };
        let mut parser = StreamingParser::new_zero_copy("42", config);
        assert!(parser.check_event());

        // Drain up to five batches, stopping at the first empty one.
        let mut collected = Vec::new();
        for _ in 0..5 {
            let batch = parser.next_batch().unwrap();
            if batch.is_empty() {
                break;
            }
            collected.extend(batch);
        }
        assert!(!collected.is_empty(), "Should generate at least one event");

        let found_scalar = collected.iter().any(|event| {
            matches!(&event.event_type, EventType::Scalar { value, .. } if value == "42")
        });
        assert!(found_scalar, "Should find scalar event with value '42'");
    }

    /// The zero-copy path should produce events and populate stats.
    #[test]
    fn test_zero_copy_streaming() {
        let config = StreamingConfig {
            use_zero_copy: true,
            collect_stats: true,
            ..Default::default()
        };
        let mut parser = StreamingParser::new_zero_copy("key: value", config);

        let first_batch = parser.next_batch().unwrap();
        assert!(!first_batch.is_empty());

        let stats = parser.get_stats();
        assert!(stats.is_some());
        assert!(stats.unwrap().events_processed > 0);
    }

    /// Constructor should store the configuration verbatim.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig {
            max_buffer_size: 32,
            use_zero_copy: false,
            max_depth: 10,
            collect_stats: true,
        };
        let parser = StreamingParser::new("test".to_string(), config);

        assert_eq!(parser.config.max_buffer_size, 32);
        assert_eq!(parser.config.max_depth, 10);
        assert!(!parser.config.use_zero_copy);
        assert!(parser.config.collect_stats);
    }

    /// A flow sequence literal should emit a flow-style SequenceStart.
    #[test]
    fn test_flow_collections_streaming() {
        let mut parser = StreamingParser::new_zero_copy("[1, 2, 3]", StreamingConfig::default());

        let mut events = Vec::new();
        loop {
            if !parser.has_more_events() {
                break;
            }
            let batch = parser.next_batch().unwrap();
            if batch.is_empty() {
                break;
            }
            events.extend(batch);
        }

        let saw_flow_sequence = events.iter().any(|event| {
            matches!(
                event.event_type,
                EventType::SequenceStart {
                    flow_style: true,
                    ..
                }
            )
        });
        assert!(saw_flow_sequence, "Should find flow sequence start");
    }
}