1use crate::{
10 parser::{Event, EventType},
11 Error, Limits, Position, ResourceTracker, Result,
12};
13use std::collections::VecDeque;
14use std::io::{BufRead, BufReader};
15use std::path::Path;
16
17#[derive(Debug, Clone)]
19pub struct StreamConfig {
20 pub buffer_size: usize,
22 pub max_event_buffer: usize,
24 pub incremental: bool,
26 pub limits: Limits,
28 pub chunk_size: usize,
30}
31
32impl Default for StreamConfig {
33 fn default() -> Self {
34 Self {
35 buffer_size: 64 * 1024, max_event_buffer: 1000,
37 incremental: true,
38 limits: Limits::default(),
39 chunk_size: 8 * 1024, }
41 }
42}
43
44impl StreamConfig {
45 pub fn large_file() -> Self {
47 Self {
48 buffer_size: 1024 * 1024, max_event_buffer: 10000,
50 incremental: true,
51 limits: Limits::permissive(),
52 chunk_size: 64 * 1024, }
54 }
55
56 pub fn low_memory() -> Self {
58 Self {
59 buffer_size: 8 * 1024, max_event_buffer: 100,
61 incremental: true,
62 limits: Limits::strict(),
63 chunk_size: 1024, }
65 }
66}
67
/// Stream-level state of a [`StreamingYamlParser`].
#[derive(Debug, Clone, PartialEq)]
enum StreamState {
    /// Nothing parsed yet; the `StreamStart` event has not been queued.
    Initial,
    /// Tokenising the content of a document.
    InDocument,
    /// Outside any document: waiting for the next document to begin.
    BetweenDocuments,
    /// Input is exhausted; no further parsing happens.
    EndOfStream,
    /// Parsing failed; the message is reported again by later parse attempts.
    Error(String),
}
82
83pub struct StreamingYamlParser<R: BufRead> {
85 reader: R,
87 config: StreamConfig,
89 state: StreamState,
91 buffer: String,
93 events: VecDeque<Event>,
95 position: Position,
97 resource_tracker: ResourceTracker,
99 context: ParseContext,
101 stats: StreamStats,
103}
104
/// Tokenizer bookkeeping that is scoped to a single document.
#[derive(Debug, Clone)]
struct ParseContext {
    /// Block collections opened so far, in opening order: `true` for a mapping, `false` for a
    /// sequence.
    collection_stack: Vec<bool>,
    /// Current indentation level (0 for a fresh context).
    indent_level: usize,
    /// Anchor name waiting to be attached to the next emitted node.
    pending_anchor: Option<String>,
    /// Tag waiting to be attached to the next emitted node.
    pending_tag: Option<String>,
    /// Whether the tokenizer is inside a block scalar (after its `|` / `>` indicator).
    in_block_scalar: bool,
    /// Content indentation of the current block scalar, once known.
    block_scalar_indent: Option<usize>,
}

impl ParseContext {
    /// Builds an empty context: no open collections, nothing pending, not in a block scalar.
    fn new() -> Self {
        ParseContext {
            block_scalar_indent: None,
            in_block_scalar: false,
            pending_tag: None,
            pending_anchor: None,
            indent_level: 0,
            collection_stack: Vec::new(),
        }
    }

    /// Returns every field to its freshly constructed value, keeping the stack's allocation.
    fn reset(&mut self) {
        // Exhaustive destructuring: adding a field without resetting it fails to compile.
        let ParseContext {
            collection_stack,
            indent_level,
            pending_anchor,
            pending_tag,
            in_block_scalar,
            block_scalar_indent,
        } = self;
        collection_stack.clear();
        *indent_level = 0;
        *pending_anchor = None;
        *pending_tag = None;
        *in_block_scalar = false;
        *block_scalar_indent = None;
    }
}
143
/// Counters describing the work done by a [`StreamingYamlParser`]; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct StreamStats {
    /// Raw bytes read from the underlying reader.
    pub bytes_read: usize,
    /// Events queued so far.
    pub events_generated: usize,
    /// Documents started so far.
    pub documents_parsed: usize,
    /// Errors encountered while parsing, as counted by the parser.
    pub errors_encountered: usize,
    /// Largest length, in bytes, observed for the parser's internal text buffer.
    pub max_buffer_size: usize,
    /// Accumulated time spent inside `parse_next`, in whole milliseconds.
    pub parse_time_ms: u64,
}
160
161impl<R: BufRead> StreamingYamlParser<R> {
162 pub fn new(reader: R, config: StreamConfig) -> Self {
164 Self {
165 reader,
166 config,
167 state: StreamState::Initial,
168 buffer: String::with_capacity(4096),
169 events: VecDeque::with_capacity(100),
170 position: Position::new(),
171 resource_tracker: ResourceTracker::new(),
172 context: ParseContext::new(),
173 stats: StreamStats::default(),
174 }
175 }
176
177 pub fn parse_next(&mut self) -> Result<bool> {
179 let start = std::time::Instant::now();
180
181 let bytes_read = self.read_chunk()?;
183 if bytes_read == 0 && self.buffer.is_empty() {
184 self.state = StreamState::EndOfStream;
185 return Ok(false);
186 }
187
188 self.stats.bytes_read += bytes_read;
189
190 self.parse_buffer()?;
192
193 self.stats.parse_time_ms += start.elapsed().as_millis() as u64;
195 self.stats.max_buffer_size = self.stats.max_buffer_size.max(self.buffer.len());
196
197 Ok(!self.events.is_empty())
198 }
199
200 fn read_chunk(&mut self) -> Result<usize> {
202 let mut temp_buffer = vec![0u8; self.config.chunk_size];
203 let bytes_read = self.reader.read(&mut temp_buffer)?;
204
205 if bytes_read > 0 {
206 let chunk = String::from_utf8_lossy(&temp_buffer[..bytes_read]);
207 self.buffer.push_str(&chunk);
208 }
209
210 Ok(bytes_read)
211 }
212
213 fn parse_buffer(&mut self) -> Result<()> {
215 match self.state {
217 StreamState::Initial => {
218 self.emit_stream_start()?;
219 self.state = StreamState::BetweenDocuments;
220 }
221 StreamState::BetweenDocuments => {
222 self.parse_document_start()?;
223 }
224 StreamState::InDocument => {
225 self.parse_document_content()?;
226 }
227 StreamState::EndOfStream => {
228 return Ok(());
229 }
230 StreamState::Error(ref msg) => {
231 return Err(Error::parse(self.position, msg.clone()));
232 }
233 }
234
235 Ok(())
236 }
237
238 fn parse_document_start(&mut self) -> Result<()> {
240 self.skip_whitespace();
242
243 if self.buffer.starts_with("---") {
245 self.buffer.drain(..3);
246 self.position.column += 3;
247 self.emit_document_start()?;
248 self.state = StreamState::InDocument;
249 } else if !self.buffer.is_empty() {
250 self.emit_document_start()?;
252 self.state = StreamState::InDocument;
253 self.parse_document_content()?;
254 }
255
256 Ok(())
257 }
258
259 fn parse_document_content(&mut self) -> Result<()> {
261 while !self.buffer.is_empty() {
262 if self.buffer.starts_with("...") {
264 self.buffer.drain(..3);
265 self.position.column += 3;
266 self.emit_document_end()?;
267 self.state = StreamState::BetweenDocuments;
268 self.context.reset();
269 break;
270 }
271
272 if self.context.in_block_scalar {
274 self.parse_block_scalar_content()?;
275 } else {
276 self.parse_yaml_content()?;
277 }
278
279 if self.needs_more_data() {
281 break;
282 }
283 }
284
285 Ok(())
286 }
287
288 fn parse_yaml_content(&mut self) -> Result<()> {
290 self.skip_whitespace();
291
292 if self.buffer.is_empty() {
293 return Ok(());
294 }
295
296 let first_char = self.buffer.chars().next().unwrap();
297
298 match first_char {
299 '-' if self.is_sequence_item() => {
300 self.parse_sequence_item()?;
301 }
302 '[' => {
303 self.parse_flow_sequence()?;
304 }
305 '{' => {
306 self.parse_flow_mapping()?;
307 }
308 '|' | '>' => {
309 self.parse_block_scalar_start(first_char)?;
310 }
311 '&' => {
312 self.parse_anchor()?;
313 }
314 '*' => {
315 self.parse_alias()?;
316 }
317 '"' | '\'' => {
318 self.parse_quoted_scalar(first_char)?;
319 }
320 '#' => {
321 self.skip_comment();
322 }
323 '\n' => {
324 self.buffer.remove(0);
325 self.position.line += 1;
326 self.position.column = 0;
327 }
328 _ if self.is_mapping_key() => {
329 self.parse_mapping_entry()?;
330 }
331 _ => {
332 self.parse_plain_scalar()?;
333 }
334 }
335
336 Ok(())
337 }
338
339 fn needs_more_data(&self) -> bool {
341 if self.buffer.len() < 100 && !self.buffer.contains('\n') {
343 return true;
344 }
345
346 if self.context.in_block_scalar && !self.has_complete_block_scalar() {
348 return true;
349 }
350
351 false
352 }
353
354 fn has_complete_block_scalar(&self) -> bool {
356 self.buffer.contains("\n\n") || self.buffer.contains("\n...")
358 }
359
360 fn parse_sequence_item(&mut self) -> Result<()> {
362 self.buffer.remove(0); self.position.column += 1;
364
365 if !self.context.collection_stack.iter().any(|&x| !x) {
367 self.emit_sequence_start()?;
368 self.context.collection_stack.push(false);
369 }
370
371 self.skip_whitespace();
372 Ok(())
373 }
374
375 fn parse_mapping_entry(&mut self) -> Result<()> {
377 let key_end = self.find_mapping_key_end();
379 if let Some(end) = key_end {
380 let key = self.buffer.drain(..end).collect::<String>();
381 self.position.column += key.len();
382
383 if self.buffer.starts_with(':') {
385 self.buffer.remove(0);
386 self.position.column += 1;
387 }
388
389 if !self.context.collection_stack.iter().any(|&x| x) {
391 self.emit_mapping_start()?;
392 self.context.collection_stack.push(true);
393 }
394
395 self.emit_scalar(key.trim().to_string())?;
397 }
398
399 Ok(())
400 }
401
402 fn skip_whitespace(&mut self) {
404 while let Some(ch) = self.buffer.chars().next() {
405 if ch == ' ' || ch == '\t' {
406 self.buffer.remove(0);
407 self.position.column += 1;
408 } else {
409 break;
410 }
411 }
412 }
413
414 fn skip_comment(&mut self) {
415 if let Some(newline_pos) = self.buffer.find('\n') {
416 self.buffer.drain(..newline_pos);
417 self.position.column = 0;
418 } else {
419 self.buffer.clear();
420 }
421 }
422
423 fn is_sequence_item(&self) -> bool {
424 self.buffer.starts_with("- ")
425 }
426
427 fn is_mapping_key(&self) -> bool {
428 self.buffer.contains(':') && !self.buffer.starts_with(':')
430 }
431
432 fn find_mapping_key_end(&self) -> Option<usize> {
433 self.buffer.find(':')
434 }
435
436 fn parse_flow_sequence(&mut self) -> Result<()> {
437 if let Some(end) = self.buffer.find(']') {
439 let content = self.buffer.drain(..=end).collect::<String>();
440 self.emit_sequence_start()?;
441 self.emit_sequence_end()?;
443 self.position.column += content.len();
444 }
445 Ok(())
446 }
447
448 fn parse_flow_mapping(&mut self) -> Result<()> {
449 if let Some(end) = self.buffer.find('}') {
451 let content = self.buffer.drain(..=end).collect::<String>();
452 self.emit_mapping_start()?;
453 self.emit_mapping_end()?;
455 self.position.column += content.len();
456 }
457 Ok(())
458 }
459
460 fn parse_block_scalar_start(&mut self, _indicator: char) -> Result<()> {
461 self.buffer.remove(0); self.context.in_block_scalar = true;
463 Ok(())
465 }
466
467 fn parse_block_scalar_content(&mut self) -> Result<()> {
468 if let Some(end) = self.find_block_scalar_end() {
470 let content = self.buffer.drain(..end).collect::<String>();
471 self.emit_scalar(content)?;
472 self.context.in_block_scalar = false;
473 }
474 Ok(())
475 }
476
477 fn find_block_scalar_end(&self) -> Option<usize> {
478 self.buffer.find("\n\n").or(self.buffer.find("\n..."))
480 }
481
482 fn parse_anchor(&mut self) -> Result<()> {
483 self.buffer.remove(0); let end = self.find_identifier_end();
485 if let Some(end) = end {
486 let anchor = self.buffer.drain(..end).collect::<String>();
487 self.context.pending_anchor = Some(anchor);
488 self.position.column += end + 1;
489 }
490 Ok(())
491 }
492
493 fn parse_alias(&mut self) -> Result<()> {
494 self.buffer.remove(0); let end = self.find_identifier_end();
496 if let Some(end) = end {
497 let alias = self.buffer.drain(..end).collect::<String>();
498 self.emit_alias(alias)?;
499 self.position.column += end + 1;
500 }
501 Ok(())
502 }
503
504 fn parse_quoted_scalar(&mut self, quote: char) -> Result<()> {
505 self.buffer.remove(0); if let Some(end) = self.buffer.find(quote) {
507 let content = self.buffer.drain(..end).collect::<String>();
508 self.buffer.remove(0); let content_len = content.len();
510 self.emit_scalar(content)?;
511 self.position.column += content_len + 2;
512 }
513 Ok(())
514 }
515
516 fn parse_plain_scalar(&mut self) -> Result<()> {
517 let end = self.find_plain_scalar_end();
518 if let Some(end) = end {
519 let content = self.buffer.drain(..end).collect::<String>();
520 self.emit_scalar(content.trim().to_string())?;
521 self.position.column += end;
522 }
523 Ok(())
524 }
525
526 fn find_identifier_end(&self) -> Option<usize> {
527 for (i, ch) in self.buffer.char_indices() {
528 if !ch.is_alphanumeric() && ch != '_' && ch != '-' {
529 return Some(i);
530 }
531 }
532 None
533 }
534
535 fn find_plain_scalar_end(&self) -> Option<usize> {
536 for (i, ch) in self.buffer.char_indices() {
538 if ch == '\n' || ch == ':' || ch == '#' {
539 return Some(i);
540 }
541 }
542 Some(self.buffer.len())
543 }
544
545 fn emit_stream_start(&mut self) -> Result<()> {
547 self.events.push_back(Event {
548 event_type: EventType::StreamStart,
549 position: self.position,
550 });
551 self.stats.events_generated += 1;
552 Ok(())
553 }
554
555 fn emit_stream_end(&mut self) -> Result<()> {
556 self.events.push_back(Event {
557 event_type: EventType::StreamEnd,
558 position: self.position,
559 });
560 self.stats.events_generated += 1;
561 Ok(())
562 }
563
564 fn emit_document_start(&mut self) -> Result<()> {
565 self.events.push_back(Event {
566 event_type: EventType::DocumentStart {
567 version: None,
568 tags: Vec::new(),
569 implicit: false,
570 },
571 position: self.position,
572 });
573 self.stats.events_generated += 1;
574 self.stats.documents_parsed += 1;
575 Ok(())
576 }
577
578 fn emit_document_end(&mut self) -> Result<()> {
579 self.events.push_back(Event {
580 event_type: EventType::DocumentEnd { implicit: false },
581 position: self.position,
582 });
583 self.stats.events_generated += 1;
584 Ok(())
585 }
586
587 fn emit_sequence_start(&mut self) -> Result<()> {
588 let anchor = self.context.pending_anchor.take();
589 let tag = self.context.pending_tag.take();
590
591 self.events.push_back(Event {
592 event_type: EventType::SequenceStart {
593 anchor,
594 tag,
595 flow_style: false,
596 },
597 position: self.position,
598 });
599 self.stats.events_generated += 1;
600 Ok(())
601 }
602
603 fn emit_sequence_end(&mut self) -> Result<()> {
604 self.events.push_back(Event {
605 event_type: EventType::SequenceEnd,
606 position: self.position,
607 });
608 self.stats.events_generated += 1;
609 Ok(())
610 }
611
612 fn emit_mapping_start(&mut self) -> Result<()> {
613 let anchor = self.context.pending_anchor.take();
614 let tag = self.context.pending_tag.take();
615
616 self.events.push_back(Event {
617 event_type: EventType::MappingStart {
618 anchor,
619 tag,
620 flow_style: false,
621 },
622 position: self.position,
623 });
624 self.stats.events_generated += 1;
625 Ok(())
626 }
627
628 fn emit_mapping_end(&mut self) -> Result<()> {
629 self.events.push_back(Event {
630 event_type: EventType::MappingEnd,
631 position: self.position,
632 });
633 self.stats.events_generated += 1;
634 Ok(())
635 }
636
637 fn emit_scalar(&mut self, value: String) -> Result<()> {
638 let anchor = self.context.pending_anchor.take();
639 let tag = self.context.pending_tag.take();
640
641 self.events.push_back(Event {
642 event_type: EventType::Scalar {
643 value,
644 anchor,
645 tag,
646 style: crate::parser::ScalarStyle::Plain,
647 plain_implicit: true,
648 quoted_implicit: true,
649 },
650 position: self.position,
651 });
652 self.stats.events_generated += 1;
653 Ok(())
654 }
655
656 fn emit_alias(&mut self, anchor: String) -> Result<()> {
657 self.events.push_back(Event {
658 event_type: EventType::Alias { anchor },
659 position: self.position,
660 });
661 self.stats.events_generated += 1;
662 Ok(())
663 }
664
665 pub fn next_event(&mut self) -> Option<Event> {
667 self.events.pop_front()
668 }
669
670 pub fn has_events(&self) -> bool {
672 !self.events.is_empty()
673 }
674
675 pub fn stats(&self) -> &StreamStats {
677 &self.stats
678 }
679
680 pub fn buffer_size(&self) -> usize {
682 self.buffer.len()
683 }
684}
685
686impl<R: BufRead> Iterator for StreamingYamlParser<R> {
688 type Item = Result<Event>;
689
690 fn next(&mut self) -> Option<Self::Item> {
691 if let Some(event) = self.next_event() {
693 return Some(Ok(event));
694 }
695
696 match self.parse_next() {
698 Ok(true) => self.next_event().map(Ok),
699 Ok(false) if self.state == StreamState::EndOfStream => {
700 if !self.events.is_empty() {
701 self.next_event().map(Ok)
702 } else {
703 None
704 }
705 }
706 Ok(false) => None,
707 Err(e) => Some(Err(e)),
708 }
709 }
710}
711
712pub fn stream_from_file<P: AsRef<Path>>(
714 path: P,
715 config: StreamConfig,
716) -> Result<StreamingYamlParser<BufReader<std::fs::File>>> {
717 let file = std::fs::File::open(path)?;
718 let reader = BufReader::with_capacity(config.buffer_size, file);
719 Ok(StreamingYamlParser::new(reader, config))
720}
721
722pub fn stream_from_string(
724 input: String,
725 config: StreamConfig,
726) -> StreamingYamlParser<BufReader<std::io::Cursor<String>>> {
727 let cursor = std::io::Cursor::new(input);
728 let reader = BufReader::new(cursor);
729 StreamingYamlParser::new(reader, config)
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Two explicit documents: parsing must succeed and start with `StreamStart`.
    #[test]
    fn test_basic_streaming() {
        let yaml = "---\nkey: value\n...\n---\nother: data\n...";
        let reader = BufReader::new(Cursor::new(yaml.to_string()));
        let parser = StreamingYamlParser::new(reader, StreamConfig::default());

        let events: Vec<Event> = parser.map(|event| event.unwrap()).collect();

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
    }

    /// Drives the parser by hand with `parse_next` / `next_event`.
    #[test]
    fn test_incremental_parsing() {
        let yaml = "key: value\nlist:\n - item1\n - item2";
        let mut parser = stream_from_string(yaml.to_string(), StreamConfig::default());

        let mut event_count = 0;
        while parser.parse_next().unwrap() {
            event_count += std::iter::from_fn(|| parser.next_event()).count();
        }

        assert!(event_count > 0);
    }

    /// A 1000-entry mapping under the large-file preset; the first 100 events must parse.
    #[test]
    fn test_large_buffer_handling() {
        let yaml: String = (0..1000)
            .map(|i| format!("item{}: value{}\n", i, i))
            .collect();

        let parser = stream_from_string(yaml, StreamConfig::large_file());
        let events: Vec<Event> = parser.take(100).map(|event| event.unwrap()).collect();

        assert!(!events.is_empty());
    }
}