1use crate::{
7 parser::{Event, Parser, ParserState},
8 zerocopy::ScannerStats,
9 BasicScanner, Error, Position, Result, Scanner, Token, TokenType, ZeroScanner, ZeroToken,
10 ZeroTokenType,
11};
12use std::collections::VecDeque;
13
/// Tunable settings for a [`StreamingParser`].
#[derive(Clone, Debug)]
pub struct StreamingConfig {
    /// Capacity reserved up front for the event buffer; the batch size used
    /// by `next_batch` is also derived from this value.
    pub max_buffer_size: usize,
    /// When `true` and a zero-copy scanner is attached, events are produced
    /// through the zero-copy path instead of the token-based one.
    pub use_zero_copy: bool,
    /// Nesting depth beyond which event generation reports a parse error.
    pub max_depth: usize,
    /// Whether the parser keeps a [`StreamingStats`] record while running.
    pub collect_stats: bool,
}
26
27impl Default for StreamingConfig {
28 fn default() -> Self {
29 Self {
30 max_buffer_size: 64,
31 use_zero_copy: true,
32 max_depth: 256,
33 collect_stats: false,
34 }
35 }
36}
37
38#[derive(Debug, Clone)]
40pub struct StreamingStats {
41 pub events_processed: usize,
43 pub tokens_processed: usize,
45 pub max_buffer_size: usize,
47 pub max_depth: usize,
49 pub scanner_stats: Option<ScannerStats>,
51 pub parse_time_ns: u64,
53}
54
55pub struct StreamingParser<'a> {
57 scanner: Option<BasicScanner>,
59 zero_scanner: Option<ZeroScanner<'a>>,
61 config: StreamingConfig,
63 event_buffer: VecDeque<Event>,
65 state: ParserState,
67 state_stack: Vec<ParserState>,
69 position: Position,
71 depth: usize,
73 pending_anchor: Option<String>,
75 pending_tag: Option<String>,
77 stats: Option<StreamingStats>,
79 start_time: std::time::Instant,
81 stream_ended: bool,
83}
84
85impl<'a> StreamingParser<'a> {
86 pub fn new(input: String, config: StreamingConfig) -> StreamingParser<'static> {
88 let scanner = BasicScanner::new(input);
89 let position = scanner.position();
90
91 StreamingParser {
92 scanner: Some(scanner),
93 zero_scanner: None,
94 config: config.clone(),
95 event_buffer: VecDeque::with_capacity(config.max_buffer_size),
96 state: ParserState::StreamStart,
97 state_stack: Vec::with_capacity(config.max_depth),
98 position,
99 depth: 0,
100 pending_anchor: None,
101 pending_tag: None,
102 stats: if config.collect_stats {
103 Some(StreamingStats {
104 events_processed: 0,
105 tokens_processed: 0,
106 max_buffer_size: 0,
107 max_depth: 0,
108 scanner_stats: None,
109 parse_time_ns: 0,
110 })
111 } else {
112 None
113 },
114 start_time: std::time::Instant::now(),
115 stream_ended: false,
116 }
117 }
118
119 pub fn new_zero_copy(input: &'a str, config: StreamingConfig) -> Self {
121 let zero_scanner = ZeroScanner::new(input);
122 let position = zero_scanner.position;
123
124 Self {
125 scanner: None,
126 zero_scanner: Some(zero_scanner),
127 config: config.clone(),
128 event_buffer: VecDeque::with_capacity(config.max_buffer_size),
129 state: ParserState::StreamStart,
130 state_stack: Vec::with_capacity(config.max_depth),
131 position,
132 depth: 0,
133 pending_anchor: None,
134 pending_tag: None,
135 stats: if config.collect_stats {
136 Some(StreamingStats {
137 events_processed: 0,
138 tokens_processed: 0,
139 max_buffer_size: 0,
140 max_depth: 0,
141 scanner_stats: None,
142 parse_time_ns: 0,
143 })
144 } else {
145 None
146 },
147 start_time: std::time::Instant::now(),
148 stream_ended: false,
149 }
150 }
151
152 pub fn next_batch(&mut self) -> Result<Vec<Event>> {
154 if self.stream_ended {
155 return Ok(Vec::new());
156 }
157
158 let mut events = Vec::new();
159 let target_size = std::cmp::min(self.config.max_buffer_size / 2, 8);
160
161 while events.len() < target_size && !self.stream_ended {
162 if let Some(event) = self.next_event_internal()? {
163 events.push(event);
164 } else {
165 break;
166 }
167 }
168
169 Ok(events)
170 }
171
172 fn next_event_internal(&mut self) -> Result<Option<Event>> {
174 if let Some(event) = self.event_buffer.pop_front() {
176 self.update_stats_for_event(&event);
177 return Ok(Some(event));
178 }
179
180 self.generate_events()?;
182
183 if let Some(event) = self.event_buffer.pop_front() {
185 self.update_stats_for_event(&event);
186 Ok(Some(event))
187 } else {
188 Ok(None)
189 }
190 }
191
192 fn generate_events(&mut self) -> Result<()> {
194 if self.stream_ended {
195 return Ok(());
196 }
197
198 if self.depth > self.config.max_depth {
200 return Err(Error::parse(
201 self.position,
202 format!("Maximum nesting depth exceeded: {}", self.config.max_depth),
203 ));
204 }
205
206 if self.config.use_zero_copy && self.zero_scanner.is_some() {
207 self.generate_events_zero_copy()
208 } else {
209 self.generate_events_traditional()
210 }
211 }
212
213 fn generate_events_zero_copy(&mut self) -> Result<()> {
215 let batch_size = 16;
217 let mut processed = 0;
218
219 while processed < batch_size {
220 let current_char = if let Some(scanner) = &self.zero_scanner {
222 scanner.current_char()
223 } else {
224 None
225 };
226
227 if current_char.is_none() {
228 if !matches!(self.state, ParserState::StreamEnd) {
230 self.event_buffer
231 .push_back(Event::stream_end(self.position));
232 self.stream_ended = true;
233 }
234 break;
235 }
236
237 if let Some(scanner) = &mut self.zero_scanner {
239 scanner.skip_whitespace();
240 }
241
242 let ch = current_char.unwrap();
243 match ch {
244 '-' if self.is_document_start_candidate_simple() => {
245 self.handle_document_start();
246 if let Some(scanner) = &mut self.zero_scanner {
248 scanner.advance();
249 scanner.advance();
250 scanner.advance();
251 }
252 }
253 '.' if self.is_document_end_candidate_simple() => {
254 self.handle_document_end();
255 if let Some(scanner) = &mut self.zero_scanner {
257 scanner.advance();
258 scanner.advance();
259 scanner.advance();
260 }
261 }
262 '[' => {
263 self.handle_flow_sequence_start();
264 if let Some(scanner) = &mut self.zero_scanner {
265 scanner.advance();
266 }
267 }
268 ']' => {
269 self.handle_flow_sequence_end();
270 if let Some(scanner) = &mut self.zero_scanner {
271 scanner.advance();
272 }
273 }
274 '{' => {
275 self.handle_flow_mapping_start();
276 if let Some(scanner) = &mut self.zero_scanner {
277 scanner.advance();
278 }
279 }
280 '}' => {
281 self.handle_flow_mapping_end();
282 if let Some(scanner) = &mut self.zero_scanner {
283 scanner.advance();
284 }
285 }
286 ':' if self.is_value_indicator_simple() => {
287 self.handle_value_indicator();
288 if let Some(scanner) = &mut self.zero_scanner {
289 scanner.advance();
290 }
291 }
292 ',' => {
293 if let Some(scanner) = &mut self.zero_scanner {
295 scanner.advance();
296 }
297 }
298 '#' => {
299 self.skip_comment_simple();
301 }
302 ch if ch.is_alphabetic() || ch.is_numeric() => {
303 let scalar_token = if let Some(scanner) = &mut self.zero_scanner {
305 scanner.scan_plain_scalar_zero_copy()?
306 } else {
307 return Err(Error::parse(
308 self.position,
309 "No scanner available".to_string(),
310 ));
311 };
312 self.handle_zero_copy_scalar(scalar_token)?;
313 }
314 '&' => {
315 if let Some(scanner) = &mut self.zero_scanner {
317 scanner.advance(); let anchor = scanner.scan_identifier_zero_copy()?;
319 self.pending_anchor = Some(anchor.as_str().to_string());
320 }
321 }
322 '*' => {
323 if let Some(scanner) = &mut self.zero_scanner {
325 scanner.advance(); let alias = scanner.scan_identifier_zero_copy()?;
327 self.event_buffer
328 .push_back(Event::alias(self.position, alias.as_str().to_string()));
329 }
330 }
331 _ => {
332 if let Some(scanner) = &mut self.zero_scanner {
334 scanner.advance();
335 }
336 }
337 }
338
339 processed += 1;
340 if let Some(scanner) = &self.zero_scanner {
341 self.position = scanner.position;
342 }
343
344 if let Some(ref mut stats) = self.stats {
345 stats.tokens_processed += 1;
346 }
347 }
348
349 Ok(())
350 }
351
352 fn generate_events_traditional(&mut self) -> Result<()> {
354 for _ in 0..4 {
356 let has_token = if let Some(scanner) = &self.scanner {
357 scanner.check_token()
358 } else {
359 false
360 };
361
362 if !has_token {
363 if !matches!(self.state, ParserState::StreamEnd) {
364 self.event_buffer
365 .push_back(Event::stream_end(self.position));
366 self.stream_ended = true;
367 }
368 break;
369 }
370
371 let token = if let Some(scanner) = &mut self.scanner {
372 scanner.get_token()?
373 } else {
374 None
375 };
376
377 if let Some(token) = token {
378 self.process_token(token)?;
379
380 if let Some(ref mut stats) = self.stats {
381 stats.tokens_processed += 1;
382 }
383 }
384 }
385
386 Ok(())
387 }
388
389 fn is_document_start_candidate_simple(&self) -> bool {
391 if let Some(scanner) = &self.zero_scanner {
392 scanner.current_char() == Some('-')
393 && scanner.peek_char(1) == Some('-')
394 && scanner.peek_char(2) == Some('-')
395 && scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
396 } else {
397 false
398 }
399 }
400
401 fn is_document_end_candidate_simple(&self) -> bool {
403 if let Some(scanner) = &self.zero_scanner {
404 scanner.current_char() == Some('.')
405 && scanner.peek_char(1) == Some('.')
406 && scanner.peek_char(2) == Some('.')
407 && scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
408 } else {
409 false
410 }
411 }
412
413 fn is_value_indicator_simple(&self) -> bool {
415 if let Some(scanner) = &self.zero_scanner {
416 scanner.current_char() == Some(':')
417 && scanner.peek_char(1).map_or(true, |c| c.is_whitespace())
418 } else {
419 false
420 }
421 }
422
423 fn handle_document_start(&mut self) {
425 self.event_buffer
426 .push_back(Event::document_start(self.position, None, vec![], false));
427 self.state = ParserState::DocumentStart;
428 }
429
430 fn handle_document_end(&mut self) {
432 self.event_buffer
433 .push_back(Event::document_end(self.position, false));
434 self.state = ParserState::DocumentEnd;
435 }
436
437 fn handle_flow_sequence_start(&mut self) {
439 self.push_state(ParserState::FlowSequence);
440 self.event_buffer.push_back(Event::sequence_start(
441 self.position,
442 self.pending_anchor.take(),
443 self.pending_tag.take(),
444 true,
445 ));
446 }
447
448 fn handle_flow_sequence_end(&mut self) {
450 self.event_buffer
451 .push_back(Event::sequence_end(self.position));
452 self.pop_state();
453 }
454
455 fn handle_flow_mapping_start(&mut self) {
457 self.push_state(ParserState::FlowMapping);
458 self.event_buffer.push_back(Event::mapping_start(
459 self.position,
460 self.pending_anchor.take(),
461 self.pending_tag.take(),
462 true,
463 ));
464 }
465
466 fn handle_flow_mapping_end(&mut self) {
468 self.event_buffer
469 .push_back(Event::mapping_end(self.position));
470 self.pop_state();
471 }
472
473 fn handle_value_indicator(&mut self) {
475 match self.state {
476 ParserState::BlockMappingKey => {
477 self.state = ParserState::BlockMappingValue;
478 }
479 ParserState::FlowMapping => {
480 }
482 _ => {
483 }
486 }
487 }
488
489 fn handle_zero_copy_scalar(&mut self, token: ZeroToken) -> Result<()> {
491 if let ZeroTokenType::Scalar(zero_string, quote_style) = token.token_type {
492 let style = match quote_style {
494 crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
495 crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
496 crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
497 };
498
499 let value = if zero_string.is_borrowed() {
501 zero_string.as_str().to_string()
502 } else {
503 zero_string.into_owned()
504 };
505
506 self.event_buffer.push_back(Event::scalar(
507 token.start_position,
508 self.pending_anchor.take(),
509 self.pending_tag.take(),
510 value,
511 style == crate::parser::ScalarStyle::Plain,
512 style != crate::parser::ScalarStyle::Plain,
513 style,
514 ));
515 }
516 Ok(())
517 }
518
519 fn skip_comment_simple(&mut self) {
521 if let Some(scanner) = &mut self.zero_scanner {
522 while let Some(ch) = scanner.current_char() {
523 scanner.advance();
524 if ch == '\n' || ch == '\r' {
525 break;
526 }
527 }
528 }
529 }
530
531 fn process_token(&mut self, token: Token) -> Result<()> {
533 self.position = token.end_position;
534
535 match token.token_type {
536 TokenType::StreamStart => {
537 self.event_buffer
538 .push_back(Event::stream_start(token.start_position));
539 self.state = ParserState::ImplicitDocumentStart;
540 }
541 TokenType::StreamEnd => {
542 self.event_buffer
543 .push_back(Event::stream_end(token.start_position));
544 self.stream_ended = true;
545 }
546 TokenType::Scalar(value, quote_style) => {
547 let style = match quote_style {
548 crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
549 crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
550 crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
551 };
552
553 self.event_buffer.push_back(Event::scalar(
554 token.start_position,
555 self.pending_anchor.take(),
556 self.pending_tag.take(),
557 value,
558 style == crate::parser::ScalarStyle::Plain,
559 style != crate::parser::ScalarStyle::Plain,
560 style,
561 ));
562 }
563 _ => {
565 }
567 }
568
569 Ok(())
570 }
571
572 fn push_state(&mut self, new_state: ParserState) {
574 self.state_stack.push(self.state);
575 self.state = new_state;
576 self.depth += 1;
577
578 if let Some(ref mut stats) = self.stats {
579 stats.max_depth = stats.max_depth.max(self.depth);
580 }
581 }
582
583 fn pop_state(&mut self) {
585 if let Some(prev_state) = self.state_stack.pop() {
586 self.state = prev_state;
587 self.depth = self.depth.saturating_sub(1);
588 }
589 }
590
591 fn update_stats_for_event(&mut self, _event: &Event) {
593 if let Some(ref mut stats) = self.stats {
594 stats.events_processed += 1;
595 stats.max_buffer_size = stats.max_buffer_size.max(self.event_buffer.len());
596 }
597 }
598
599 pub fn get_stats(&mut self) -> Option<StreamingStats> {
601 if let Some(ref mut stats) = self.stats {
602 stats.parse_time_ns = self.start_time.elapsed().as_nanos() as u64;
603
604 if let Some(ref scanner) = self.zero_scanner {
605 stats.scanner_stats = Some(scanner.stats());
606 }
607
608 Some(stats.clone())
609 } else {
610 None
611 }
612 }
613
614 pub fn has_more_events(&self) -> bool {
616 !self.stream_ended || !self.event_buffer.is_empty()
617 }
618
619 pub fn buffer_size(&self) -> usize {
621 self.event_buffer.len()
622 }
623}
624
625impl<'a> Parser for StreamingParser<'a> {
626 fn check_event(&self) -> bool {
627 !self.event_buffer.is_empty() || !self.stream_ended
628 }
629
630 fn peek_event(&self) -> Result<Option<&Event>> {
631 Ok(self.event_buffer.front())
632 }
633
634 fn get_event(&mut self) -> Result<Option<Event>> {
635 self.next_event_internal()
636 }
637
638 fn reset(&mut self) {
639 self.event_buffer.clear();
640 self.state = ParserState::StreamStart;
641 self.state_stack.clear();
642 self.depth = 0;
643 self.pending_anchor = None;
644 self.pending_tag = None;
645 self.stream_ended = false;
646 self.start_time = std::time::Instant::now();
647
648 if let Some(ref mut scanner) = self.scanner {
649 scanner.reset();
650 }
651 if let Some(ref mut scanner) = self.zero_scanner {
652 scanner.reset();
653 }
654 }
655
656 fn position(&self) -> Position {
657 self.position
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventType;

    /// A lone scalar document must surface as a scalar event via batching.
    #[test]
    fn test_streaming_parser_basic() {
        let mut parser = StreamingParser::new_zero_copy(
            "42",
            StreamingConfig {
                use_zero_copy: true,
                collect_stats: true,
                ..Default::default()
            },
        );

        assert!(parser.check_event());

        // Pull at most five batches, stopping early on the first empty one.
        let mut all_events = Vec::new();
        let mut rounds = 0;
        while rounds < 5 {
            let batch = parser.next_batch().unwrap();
            if batch.is_empty() {
                break;
            }
            all_events.extend(batch);
            rounds += 1;
        }

        assert!(!all_events.is_empty(), "Should generate at least one event");

        let has_scalar = all_events
            .iter()
            .any(|e| matches!(&e.event_type, EventType::Scalar { value, .. } if value == "42"));
        assert!(has_scalar, "Should find scalar event with value '42'");
    }

    /// The zero-copy path must yield events and record statistics.
    #[test]
    fn test_zero_copy_streaming() {
        let config = StreamingConfig {
            use_zero_copy: true,
            collect_stats: true,
            ..Default::default()
        };
        let mut parser = StreamingParser::new_zero_copy("key: value", config);

        let first_batch = parser.next_batch().unwrap();
        assert!(!first_batch.is_empty());

        let stats = parser.get_stats();
        assert!(stats.is_some());

        let stats = stats.unwrap();
        assert!(stats.events_processed > 0);
    }

    /// Explicit configuration values must be stored unchanged.
    #[test]
    fn test_streaming_config() {
        let parser = StreamingParser::new(
            "test".to_string(),
            StreamingConfig {
                max_buffer_size: 32,
                use_zero_copy: false,
                max_depth: 10,
                collect_stats: true,
            },
        );

        let stored = &parser.config;
        assert_eq!(stored.max_buffer_size, 32);
        assert!(!stored.use_zero_copy);
        assert_eq!(stored.max_depth, 10);
        assert!(stored.collect_stats);
    }

    /// A flow sequence must produce a flow-style sequence-start event.
    #[test]
    fn test_flow_collections_streaming() {
        let mut parser = StreamingParser::new_zero_copy("[1, 2, 3]", StreamingConfig::default());

        let mut all_events = Vec::new();
        while parser.has_more_events() {
            let batch = parser.next_batch().unwrap();
            if batch.is_empty() {
                break;
            }
            all_events.extend(batch);
        }

        let has_sequence_start = all_events.iter().any(|e| match e.event_type {
            EventType::SequenceStart {
                flow_style: true, ..
            } => true,
            _ => false,
        });
        assert!(has_sequence_start, "Should find flow sequence start");
    }
}