rust_yaml/
streaming_enhanced.rs

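//! Enhanced streaming YAML parser with incremental, chunk-by-chunk parsing
//!
//! This module is intended to provide advanced streaming capabilities including:
//! - Incremental parsing with partial document support
//! - Async/await support for I/O operations (not implemented in this file: the
//!   parser reads synchronously through `std::io::BufRead`)
//! - Memory-mapped file support for large documents (not implemented in this
//!   file: `stream_from_file` wraps a plain `File` in a `BufReader`)
//! - Buffered reading with configurable chunk sizes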
8
9use crate::{
10    parser::{Event, EventType},
11    Error, Limits, Position, ResourceTracker, Result,
12};
13use std::collections::VecDeque;
14use std::io::{BufRead, BufReader};
15use std::path::Path;
16
/// Configuration for enhanced streaming parser
///
/// Ready-made profiles are available through `Default`,
/// `StreamConfig::large_file` and `StreamConfig::low_memory`.
#[derive(Debug, Clone)]
pub struct StreamConfig {
    /// Size of the read buffer in bytes.
    ///
    /// `stream_from_file` uses this as the capacity of its `BufReader`.
    pub buffer_size: usize,
    /// Maximum number of events to buffer.
    ///
    /// NOTE(review): no code in this file reads this field, so the event
    /// queue is not actually bounded by it — confirm against other modules.
    pub max_event_buffer: usize,
    /// Enable incremental parsing (parse partial documents).
    ///
    /// NOTE(review): not consulted by the parser code in this file.
    pub incremental: bool,
    /// Resource limits.
    ///
    /// NOTE(review): stored but not enforced by the parser code in this file.
    pub limits: Limits,
    /// Chunk size for reading (bytes): the size of the temporary buffer the
    /// parser fills from the reader on each `parse_next` call.
    pub chunk_size: usize,
}
31
32impl Default for StreamConfig {
33    fn default() -> Self {
34        Self {
35            buffer_size: 64 * 1024, // 64KB buffer
36            max_event_buffer: 1000,
37            incremental: true,
38            limits: Limits::default(),
39            chunk_size: 8 * 1024, // 8KB chunks
40        }
41    }
42}
43
44impl StreamConfig {
45    /// Create config for large files
46    pub fn large_file() -> Self {
47        Self {
48            buffer_size: 1024 * 1024, // 1MB buffer
49            max_event_buffer: 10000,
50            incremental: true,
51            limits: Limits::permissive(),
52            chunk_size: 64 * 1024, // 64KB chunks
53        }
54    }
55
56    /// Create config for memory-constrained environments
57    pub fn low_memory() -> Self {
58        Self {
59            buffer_size: 8 * 1024, // 8KB buffer
60            max_event_buffer: 100,
61            incremental: true,
62            limits: Limits::strict(),
63            chunk_size: 1024, // 1KB chunks
64        }
65    }
66}
67
/// State of the streaming parser
///
/// The parser starts in `Initial`, alternates between `BetweenDocuments` and
/// `InDocument` while input is available, and ends in `EndOfStream`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum StreamState {
    /// Initial state: nothing has been emitted yet.
    Initial,
    /// Reading document: a document start has been emitted and content is
    /// being parsed.
    InDocument,
    /// Between documents: waiting for the next document to begin.
    BetweenDocuments,
    /// End of stream: the reader is exhausted and the buffer is empty.
    EndOfStream,
    /// Error state carrying the message to report.
    Error(String),
}
82
/// Enhanced streaming YAML parser
///
/// Pulls text from `reader` one chunk at a time, appends it to an internal
/// buffer, and turns recognised constructs into [`Event`]s that are queued
/// until the caller takes them (via `next_event` or the `Iterator` impl).
pub struct StreamingYamlParser<R: BufRead> {
    /// Input reader
    reader: R,
    /// Configuration (only `chunk_size` is read by the parser in this file)
    config: StreamConfig,
    /// Current parsing state
    state: StreamState,
    /// Buffer for incomplete data: text read but not yet consumed
    buffer: String,
    /// Event queue, drained front-first by the caller
    events: VecDeque<Event>,
    /// Current position in the stream; copied into every emitted event
    position: Position,
    /// Resource tracker
    ///
    /// NOTE(review): created in `new` but not used by any method in this file.
    resource_tracker: ResourceTracker,
    /// Parse context for incremental parsing (open collections, pending
    /// anchor/tag, block-scalar mode)
    context: ParseContext,
    /// Statistics
    stats: StreamStats,
}
104
/// Parsing context for incremental parsing
///
/// Holds the per-document state that must survive between chunks; it is
/// cleared with `reset` when a document ends.
#[derive(Debug, Clone)]
struct ParseContext {
    /// Stack of collection types (true = mapping, false = sequence)
    collection_stack: Vec<bool>,
    /// Current indentation level
    ///
    /// NOTE(review): initialised and reset, but never otherwise read or
    /// written in this file.
    indent_level: usize,
    /// Pending anchor: attached to the next sequence, mapping or scalar
    /// event that is emitted
    pending_anchor: Option<String>,
    /// Pending tag: attached like the anchor
    ///
    /// NOTE(review): nothing in this file ever sets it to `Some`.
    pending_tag: Option<String>,
    /// In block scalar: set after a `|` or `>` indicator until the scalar's
    /// content has been emitted
    in_block_scalar: bool,
    /// Block scalar indent
    ///
    /// NOTE(review): initialised and reset, but never otherwise used here.
    block_scalar_indent: Option<usize>,
}
121
122impl ParseContext {
123    fn new() -> Self {
124        Self {
125            collection_stack: Vec::new(),
126            indent_level: 0,
127            pending_anchor: None,
128            pending_tag: None,
129            in_block_scalar: false,
130            block_scalar_indent: None,
131        }
132    }
133
134    fn reset(&mut self) {
135        self.collection_stack.clear();
136        self.indent_level = 0;
137        self.pending_anchor = None;
138        self.pending_tag = None;
139        self.in_block_scalar = false;
140        self.block_scalar_indent = None;
141    }
142}
143
/// Statistics for streaming parser
///
/// All counters start at zero (`Default`) and only ever grow while the
/// parser runs. The type is comparable so snapshots can be checked in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StreamStats {
    /// Total bytes read from the underlying reader
    pub bytes_read: usize,
    /// Total events generated (incremented once per queued event)
    pub events_generated: usize,
    /// Documents parsed (incremented when a document start is emitted)
    pub documents_parsed: usize,
    /// Parse errors encountered
    pub errors_encountered: usize,
    /// Maximum buffer size used, in bytes
    pub max_buffer_size: usize,
    /// Total parse time (milliseconds).
    ///
    /// Accumulated as whole milliseconds per `parse_next` call, so calls
    /// that finish in under a millisecond contribute nothing.
    pub parse_time_ms: u64,
}
160
161impl<R: BufRead> StreamingYamlParser<R> {
162    /// Create a new streaming parser from a reader
163    pub fn new(reader: R, config: StreamConfig) -> Self {
164        Self {
165            reader,
166            config,
167            state: StreamState::Initial,
168            buffer: String::with_capacity(4096),
169            events: VecDeque::with_capacity(100),
170            position: Position::new(),
171            resource_tracker: ResourceTracker::new(),
172            context: ParseContext::new(),
173            stats: StreamStats::default(),
174        }
175    }
176
177    /// Parse the next chunk of data
178    pub fn parse_next(&mut self) -> Result<bool> {
179        let start = std::time::Instant::now();
180
181        // Read next chunk
182        let bytes_read = self.read_chunk()?;
183        if bytes_read == 0 && self.buffer.is_empty() {
184            self.state = StreamState::EndOfStream;
185            return Ok(false);
186        }
187
188        self.stats.bytes_read += bytes_read;
189
190        // Parse buffer content
191        self.parse_buffer()?;
192
193        // Update statistics
194        self.stats.parse_time_ms += start.elapsed().as_millis() as u64;
195        self.stats.max_buffer_size = self.stats.max_buffer_size.max(self.buffer.len());
196
197        Ok(!self.events.is_empty())
198    }
199
200    /// Read a chunk of data from the reader
201    fn read_chunk(&mut self) -> Result<usize> {
202        let mut temp_buffer = vec![0u8; self.config.chunk_size];
203        let bytes_read = self.reader.read(&mut temp_buffer)?;
204
205        if bytes_read > 0 {
206            let chunk = String::from_utf8_lossy(&temp_buffer[..bytes_read]);
207            self.buffer.push_str(&chunk);
208        }
209
210        Ok(bytes_read)
211    }
212
213    /// Parse the current buffer content
214    fn parse_buffer(&mut self) -> Result<()> {
215        // Handle different states
216        match self.state {
217            StreamState::Initial => {
218                self.emit_stream_start()?;
219                self.state = StreamState::BetweenDocuments;
220            }
221            StreamState::BetweenDocuments => {
222                self.parse_document_start()?;
223            }
224            StreamState::InDocument => {
225                self.parse_document_content()?;
226            }
227            StreamState::EndOfStream => {
228                return Ok(());
229            }
230            StreamState::Error(ref msg) => {
231                return Err(Error::parse(self.position, msg.clone()));
232            }
233        }
234
235        Ok(())
236    }
237
238    /// Parse document start markers
239    fn parse_document_start(&mut self) -> Result<()> {
240        // Skip whitespace
241        self.skip_whitespace();
242
243        // Check for document start marker (---)
244        if self.buffer.starts_with("---") {
245            self.buffer.drain(..3);
246            self.position.column += 3;
247            self.emit_document_start()?;
248            self.state = StreamState::InDocument;
249        } else if !self.buffer.is_empty() {
250            // Implicit document start
251            self.emit_document_start()?;
252            self.state = StreamState::InDocument;
253            self.parse_document_content()?;
254        }
255
256        Ok(())
257    }
258
259    /// Parse document content incrementally
260    fn parse_document_content(&mut self) -> Result<()> {
261        while !self.buffer.is_empty() {
262            // Check for document end marker (...)
263            if self.buffer.starts_with("...") {
264                self.buffer.drain(..3);
265                self.position.column += 3;
266                self.emit_document_end()?;
267                self.state = StreamState::BetweenDocuments;
268                self.context.reset();
269                break;
270            }
271
272            // Parse based on context
273            if self.context.in_block_scalar {
274                self.parse_block_scalar_content()?;
275            } else {
276                self.parse_yaml_content()?;
277            }
278
279            // Break if we need more data
280            if self.needs_more_data() {
281                break;
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Parse YAML content (scalars, collections, etc.)
289    fn parse_yaml_content(&mut self) -> Result<()> {
290        self.skip_whitespace();
291
292        if self.buffer.is_empty() {
293            return Ok(());
294        }
295
296        let first_char = self.buffer.chars().next().unwrap();
297
298        match first_char {
299            '-' if self.is_sequence_item() => {
300                self.parse_sequence_item()?;
301            }
302            '[' => {
303                self.parse_flow_sequence()?;
304            }
305            '{' => {
306                self.parse_flow_mapping()?;
307            }
308            '|' | '>' => {
309                self.parse_block_scalar_start(first_char)?;
310            }
311            '&' => {
312                self.parse_anchor()?;
313            }
314            '*' => {
315                self.parse_alias()?;
316            }
317            '"' | '\'' => {
318                self.parse_quoted_scalar(first_char)?;
319            }
320            '#' => {
321                self.skip_comment();
322            }
323            '\n' => {
324                self.buffer.remove(0);
325                self.position.line += 1;
326                self.position.column = 0;
327            }
328            _ if self.is_mapping_key() => {
329                self.parse_mapping_entry()?;
330            }
331            _ => {
332                self.parse_plain_scalar()?;
333            }
334        }
335
336        Ok(())
337    }
338
339    /// Check if we need more data to continue parsing
340    fn needs_more_data(&self) -> bool {
341        // If buffer is small and doesn't contain a complete line
342        if self.buffer.len() < 100 && !self.buffer.contains('\n') {
343            return true;
344        }
345
346        // If we're in a block scalar and need more lines
347        if self.context.in_block_scalar && !self.has_complete_block_scalar() {
348            return true;
349        }
350
351        false
352    }
353
354    /// Check if buffer contains a complete block scalar
355    fn has_complete_block_scalar(&self) -> bool {
356        // Simplified check - in production, would be more sophisticated
357        self.buffer.contains("\n\n") || self.buffer.contains("\n...")
358    }
359
360    /// Parse a sequence item
361    fn parse_sequence_item(&mut self) -> Result<()> {
362        self.buffer.remove(0); // Remove '-'
363        self.position.column += 1;
364
365        // Start sequence if needed
366        if !self.context.collection_stack.iter().any(|&x| !x) {
367            self.emit_sequence_start()?;
368            self.context.collection_stack.push(false);
369        }
370
371        self.skip_whitespace();
372        Ok(())
373    }
374
375    /// Parse a mapping entry
376    fn parse_mapping_entry(&mut self) -> Result<()> {
377        // Parse key
378        let key_end = self.find_mapping_key_end();
379        if let Some(end) = key_end {
380            let key = self.buffer.drain(..end).collect::<String>();
381            self.position.column += key.len();
382
383            // Skip ':' and whitespace
384            if self.buffer.starts_with(':') {
385                self.buffer.remove(0);
386                self.position.column += 1;
387            }
388
389            // Start mapping if needed
390            if !self.context.collection_stack.iter().any(|&x| x) {
391                self.emit_mapping_start()?;
392                self.context.collection_stack.push(true);
393            }
394
395            // Emit key as scalar
396            self.emit_scalar(key.trim().to_string())?;
397        }
398
399        Ok(())
400    }
401
402    /// Helper methods for parsing
403    fn skip_whitespace(&mut self) {
404        while let Some(ch) = self.buffer.chars().next() {
405            if ch == ' ' || ch == '\t' {
406                self.buffer.remove(0);
407                self.position.column += 1;
408            } else {
409                break;
410            }
411        }
412    }
413
414    fn skip_comment(&mut self) {
415        if let Some(newline_pos) = self.buffer.find('\n') {
416            self.buffer.drain(..newline_pos);
417            self.position.column = 0;
418        } else {
419            self.buffer.clear();
420        }
421    }
422
423    fn is_sequence_item(&self) -> bool {
424        self.buffer.starts_with("- ")
425    }
426
427    fn is_mapping_key(&self) -> bool {
428        // Simplified check for mapping key
429        self.buffer.contains(':') && !self.buffer.starts_with(':')
430    }
431
432    fn find_mapping_key_end(&self) -> Option<usize> {
433        self.buffer.find(':')
434    }
435
436    fn parse_flow_sequence(&mut self) -> Result<()> {
437        // Simplified flow sequence parsing
438        if let Some(end) = self.buffer.find(']') {
439            let content = self.buffer.drain(..=end).collect::<String>();
440            self.emit_sequence_start()?;
441            // Parse content (simplified)
442            self.emit_sequence_end()?;
443            self.position.column += content.len();
444        }
445        Ok(())
446    }
447
448    fn parse_flow_mapping(&mut self) -> Result<()> {
449        // Simplified flow mapping parsing
450        if let Some(end) = self.buffer.find('}') {
451            let content = self.buffer.drain(..=end).collect::<String>();
452            self.emit_mapping_start()?;
453            // Parse content (simplified)
454            self.emit_mapping_end()?;
455            self.position.column += content.len();
456        }
457        Ok(())
458    }
459
460    fn parse_block_scalar_start(&mut self, _indicator: char) -> Result<()> {
461        self.buffer.remove(0); // Remove indicator
462        self.context.in_block_scalar = true;
463        // Parse block scalar header (simplified)
464        Ok(())
465    }
466
467    fn parse_block_scalar_content(&mut self) -> Result<()> {
468        // Simplified block scalar parsing
469        if let Some(end) = self.find_block_scalar_end() {
470            let content = self.buffer.drain(..end).collect::<String>();
471            self.emit_scalar(content)?;
472            self.context.in_block_scalar = false;
473        }
474        Ok(())
475    }
476
477    fn find_block_scalar_end(&self) -> Option<usize> {
478        // Simplified - find dedent or document marker
479        self.buffer.find("\n\n").or(self.buffer.find("\n..."))
480    }
481
482    fn parse_anchor(&mut self) -> Result<()> {
483        self.buffer.remove(0); // Remove '&'
484        let end = self.find_identifier_end();
485        if let Some(end) = end {
486            let anchor = self.buffer.drain(..end).collect::<String>();
487            self.context.pending_anchor = Some(anchor);
488            self.position.column += end + 1;
489        }
490        Ok(())
491    }
492
493    fn parse_alias(&mut self) -> Result<()> {
494        self.buffer.remove(0); // Remove '*'
495        let end = self.find_identifier_end();
496        if let Some(end) = end {
497            let alias = self.buffer.drain(..end).collect::<String>();
498            self.emit_alias(alias)?;
499            self.position.column += end + 1;
500        }
501        Ok(())
502    }
503
504    fn parse_quoted_scalar(&mut self, quote: char) -> Result<()> {
505        self.buffer.remove(0); // Remove opening quote
506        if let Some(end) = self.buffer.find(quote) {
507            let content = self.buffer.drain(..end).collect::<String>();
508            self.buffer.remove(0); // Remove closing quote
509            let content_len = content.len();
510            self.emit_scalar(content)?;
511            self.position.column += content_len + 2;
512        }
513        Ok(())
514    }
515
516    fn parse_plain_scalar(&mut self) -> Result<()> {
517        let end = self.find_plain_scalar_end();
518        if let Some(end) = end {
519            let content = self.buffer.drain(..end).collect::<String>();
520            self.emit_scalar(content.trim().to_string())?;
521            self.position.column += end;
522        }
523        Ok(())
524    }
525
526    fn find_identifier_end(&self) -> Option<usize> {
527        for (i, ch) in self.buffer.char_indices() {
528            if !ch.is_alphanumeric() && ch != '_' && ch != '-' {
529                return Some(i);
530            }
531        }
532        None
533    }
534
535    fn find_plain_scalar_end(&self) -> Option<usize> {
536        // Find end of plain scalar (simplified)
537        for (i, ch) in self.buffer.char_indices() {
538            if ch == '\n' || ch == ':' || ch == '#' {
539                return Some(i);
540            }
541        }
542        Some(self.buffer.len())
543    }
544
545    /// Event emission methods
546    fn emit_stream_start(&mut self) -> Result<()> {
547        self.events.push_back(Event {
548            event_type: EventType::StreamStart,
549            position: self.position,
550        });
551        self.stats.events_generated += 1;
552        Ok(())
553    }
554
555    fn emit_stream_end(&mut self) -> Result<()> {
556        self.events.push_back(Event {
557            event_type: EventType::StreamEnd,
558            position: self.position,
559        });
560        self.stats.events_generated += 1;
561        Ok(())
562    }
563
564    fn emit_document_start(&mut self) -> Result<()> {
565        self.events.push_back(Event {
566            event_type: EventType::DocumentStart {
567                version: None,
568                tags: Vec::new(),
569                implicit: false,
570            },
571            position: self.position,
572        });
573        self.stats.events_generated += 1;
574        self.stats.documents_parsed += 1;
575        Ok(())
576    }
577
578    fn emit_document_end(&mut self) -> Result<()> {
579        self.events.push_back(Event {
580            event_type: EventType::DocumentEnd { implicit: false },
581            position: self.position,
582        });
583        self.stats.events_generated += 1;
584        Ok(())
585    }
586
587    fn emit_sequence_start(&mut self) -> Result<()> {
588        let anchor = self.context.pending_anchor.take();
589        let tag = self.context.pending_tag.take();
590
591        self.events.push_back(Event {
592            event_type: EventType::SequenceStart {
593                anchor,
594                tag,
595                flow_style: false,
596            },
597            position: self.position,
598        });
599        self.stats.events_generated += 1;
600        Ok(())
601    }
602
603    fn emit_sequence_end(&mut self) -> Result<()> {
604        self.events.push_back(Event {
605            event_type: EventType::SequenceEnd,
606            position: self.position,
607        });
608        self.stats.events_generated += 1;
609        Ok(())
610    }
611
612    fn emit_mapping_start(&mut self) -> Result<()> {
613        let anchor = self.context.pending_anchor.take();
614        let tag = self.context.pending_tag.take();
615
616        self.events.push_back(Event {
617            event_type: EventType::MappingStart {
618                anchor,
619                tag,
620                flow_style: false,
621            },
622            position: self.position,
623        });
624        self.stats.events_generated += 1;
625        Ok(())
626    }
627
628    fn emit_mapping_end(&mut self) -> Result<()> {
629        self.events.push_back(Event {
630            event_type: EventType::MappingEnd,
631            position: self.position,
632        });
633        self.stats.events_generated += 1;
634        Ok(())
635    }
636
637    fn emit_scalar(&mut self, value: String) -> Result<()> {
638        let anchor = self.context.pending_anchor.take();
639        let tag = self.context.pending_tag.take();
640
641        self.events.push_back(Event {
642            event_type: EventType::Scalar {
643                value,
644                anchor,
645                tag,
646                style: crate::parser::ScalarStyle::Plain,
647                plain_implicit: true,
648                quoted_implicit: true,
649            },
650            position: self.position,
651        });
652        self.stats.events_generated += 1;
653        Ok(())
654    }
655
656    fn emit_alias(&mut self, anchor: String) -> Result<()> {
657        self.events.push_back(Event {
658            event_type: EventType::Alias { anchor },
659            position: self.position,
660        });
661        self.stats.events_generated += 1;
662        Ok(())
663    }
664
665    /// Get the next event if available
666    pub fn next_event(&mut self) -> Option<Event> {
667        self.events.pop_front()
668    }
669
670    /// Check if there are events available
671    pub fn has_events(&self) -> bool {
672        !self.events.is_empty()
673    }
674
675    /// Get current statistics
676    pub fn stats(&self) -> &StreamStats {
677        &self.stats
678    }
679
680    /// Get remaining buffer size
681    pub fn buffer_size(&self) -> usize {
682        self.buffer.len()
683    }
684}
685
686/// Iterator implementation for streaming parser
687impl<R: BufRead> Iterator for StreamingYamlParser<R> {
688    type Item = Result<Event>;
689
690    fn next(&mut self) -> Option<Self::Item> {
691        // Try to get an event from the buffer
692        if let Some(event) = self.next_event() {
693            return Some(Ok(event));
694        }
695
696        // Parse more data if needed
697        match self.parse_next() {
698            Ok(true) => self.next_event().map(Ok),
699            Ok(false) if self.state == StreamState::EndOfStream => {
700                if !self.events.is_empty() {
701                    self.next_event().map(Ok)
702                } else {
703                    None
704                }
705            }
706            Ok(false) => None,
707            Err(e) => Some(Err(e)),
708        }
709    }
710}
711
712/// Create a streaming parser from a file path
713pub fn stream_from_file<P: AsRef<Path>>(
714    path: P,
715    config: StreamConfig,
716) -> Result<StreamingYamlParser<BufReader<std::fs::File>>> {
717    let file = std::fs::File::open(path)?;
718    let reader = BufReader::with_capacity(config.buffer_size, file);
719    Ok(StreamingYamlParser::new(reader, config))
720}
721
722/// Create a streaming parser from a string
723pub fn stream_from_string(
724    input: String,
725    config: StreamConfig,
726) -> StreamingYamlParser<BufReader<std::io::Cursor<String>>> {
727    let cursor = std::io::Cursor::new(input);
728    let reader = BufReader::new(cursor);
729    StreamingYamlParser::new(reader, config)
730}
731
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Driving the parser as an iterator over a two-document stream yields
    /// events, the first of which is the stream start.
    #[test]
    fn test_basic_streaming() {
        let yaml = "---\nkey: value\n...\n---\nother: data\n...";
        let reader = BufReader::new(Cursor::new(yaml.to_string()));
        let parser = StreamingYamlParser::new(reader, StreamConfig::default());

        let events: Vec<Event> = parser.map(|event| event.unwrap()).collect();

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
    }

    /// Driving the parser manually with `parse_next` / `next_event` yields
    /// events for a document without explicit markers.
    #[test]
    fn test_incremental_parsing() {
        let yaml = "key: value\nlist:\n  - item1\n  - item2";
        let mut parser = stream_from_string(yaml.to_string(), StreamConfig::default());

        // Parse incrementally
        let mut event_count = 0;
        while parser.parse_next().unwrap() {
            while parser.next_event().is_some() {
                event_count += 1;
            }
        }

        assert!(event_count > 0);
    }

    /// The large-file profile copes with a thousand-line input; only the
    /// first hundred events are pulled.
    #[test]
    fn test_large_buffer_handling() {
        let yaml: String = (0..1000)
            .map(|i| format!("item{}: value{}\n", i, i))
            .collect();

        let parser = stream_from_string(yaml, StreamConfig::large_file());

        let events: Vec<Event> = parser.take(100).map(|event| event.unwrap()).collect();

        assert!(!events.is_empty());
    }
}