// rust_yaml/parser/streaming.rs

//! Streaming parser implementation for efficient YAML processing.
//!
//! This module provides a streaming parser that processes YAML incrementally,
//! handing out parser events in small batches on demand instead of materialising
//! the whole event stream up front. This keeps memory usage low and improves
//! performance for large documents.
5
6use crate::{
7    parser::{Event, Parser, ParserState},
8    zerocopy::ScannerStats,
9    BasicScanner, Error, Position, Result, Scanner, Token, TokenType, ZeroScanner, ZeroToken,
10    ZeroTokenType,
11};
12use std::collections::VecDeque;
13
/// Configuration for streaming parser behavior.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamingConfig {
    /// Preferred number of events to buffer.
    ///
    /// Used to size the internal event queue and to bound how many events a single
    /// `next_batch` call returns (roughly half of this value, capped at 8). It is not
    /// a hard limit: the queue grows if one scanning pass produces more events.
    pub max_buffer_size: usize,
    /// Use the zero-copy scanner when the parser has one.
    ///
    /// Only parsers created with `StreamingParser::new_zero_copy` own a zero-copy
    /// scanner; the flag has no effect on parsers created with `StreamingParser::new`.
    pub use_zero_copy: bool,
    /// Maximum nesting depth of flow collections (`[...]`, `{...}`); exceeding it
    /// makes parsing fail with an error.
    pub max_depth: usize,
    /// Collect statistics, retrievable through `StreamingParser::get_stats`.
    pub collect_stats: bool,
}
26
27impl Default for StreamingConfig {
28    fn default() -> Self {
29        Self {
30            max_buffer_size: 64,
31            use_zero_copy: true,
32            max_depth: 256,
33            collect_stats: false,
34        }
35    }
36}
37
38/// Statistics about streaming parser performance
39#[derive(Debug, Clone)]
40pub struct StreamingStats {
41    /// Number of events processed
42    pub events_processed: usize,
43    /// Number of tokens processed
44    pub tokens_processed: usize,
45    /// Maximum buffer size reached
46    pub max_buffer_size: usize,
47    /// Maximum nesting depth reached
48    pub max_depth: usize,
49    /// Zero-copy scanner statistics (if enabled)
50    pub scanner_stats: Option<ScannerStats>,
51    /// Total time spent parsing (in nanoseconds)
52    pub parse_time_ns: u64,
53}
54
55/// Streaming YAML parser that processes events on demand
56pub struct StreamingParser<'a> {
57    /// Traditional scanner for compatibility
58    scanner: Option<BasicScanner>,
59    /// Zero-copy scanner for optimized parsing
60    zero_scanner: Option<ZeroScanner<'a>>,
61    /// Configuration
62    config: StreamingConfig,
63    /// Event buffer for batched processing
64    event_buffer: VecDeque<Event>,
65    /// Current parser state
66    state: ParserState,
67    /// State stack for nested structures
68    state_stack: Vec<ParserState>,
69    /// Current position
70    position: Position,
71    /// Current nesting depth
72    depth: usize,
73    /// Pending anchor for next node
74    pending_anchor: Option<String>,
75    /// Pending tag for next node
76    pending_tag: Option<String>,
77    /// Statistics (if enabled)
78    stats: Option<StreamingStats>,
79    /// Start time for performance measurement
80    start_time: std::time::Instant,
81    /// Whether the stream has ended
82    stream_ended: bool,
83}
84
85impl<'a> StreamingParser<'a> {
86    /// Create a new streaming parser with traditional scanner
87    pub fn new(input: String, config: StreamingConfig) -> StreamingParser<'static> {
88        let scanner = BasicScanner::new(input);
89        let position = scanner.position();
90
91        StreamingParser {
92            scanner: Some(scanner),
93            zero_scanner: None,
94            config: config.clone(),
95            event_buffer: VecDeque::with_capacity(config.max_buffer_size),
96            state: ParserState::StreamStart,
97            state_stack: Vec::with_capacity(config.max_depth),
98            position,
99            depth: 0,
100            pending_anchor: None,
101            pending_tag: None,
102            stats: if config.collect_stats {
103                Some(StreamingStats {
104                    events_processed: 0,
105                    tokens_processed: 0,
106                    max_buffer_size: 0,
107                    max_depth: 0,
108                    scanner_stats: None,
109                    parse_time_ns: 0,
110                })
111            } else {
112                None
113            },
114            start_time: std::time::Instant::now(),
115            stream_ended: false,
116        }
117    }
118
119    /// Create a new streaming parser with zero-copy scanner
120    pub fn new_zero_copy(input: &'a str, config: StreamingConfig) -> Self {
121        let zero_scanner = ZeroScanner::new(input);
122        let position = zero_scanner.position;
123
124        Self {
125            scanner: None,
126            zero_scanner: Some(zero_scanner),
127            config: config.clone(),
128            event_buffer: VecDeque::with_capacity(config.max_buffer_size),
129            state: ParserState::StreamStart,
130            state_stack: Vec::with_capacity(config.max_depth),
131            position,
132            depth: 0,
133            pending_anchor: None,
134            pending_tag: None,
135            stats: if config.collect_stats {
136                Some(StreamingStats {
137                    events_processed: 0,
138                    tokens_processed: 0,
139                    max_buffer_size: 0,
140                    max_depth: 0,
141                    scanner_stats: None,
142                    parse_time_ns: 0,
143                })
144            } else {
145                None
146            },
147            start_time: std::time::Instant::now(),
148            stream_ended: false,
149        }
150    }
151
152    /// Get the next batch of events
153    pub fn next_batch(&mut self) -> Result<Vec<Event>> {
154        if self.stream_ended {
155            return Ok(Vec::new());
156        }
157
158        let mut events = Vec::new();
159        let target_size = std::cmp::min(self.config.max_buffer_size / 2, 8);
160
161        while events.len() < target_size && !self.stream_ended {
162            if let Some(event) = self.next_event_internal()? {
163                events.push(event);
164            } else {
165                break;
166            }
167        }
168
169        Ok(events)
170    }
171
172    /// Get the next event from buffer or generate a new one
173    fn next_event_internal(&mut self) -> Result<Option<Event>> {
174        // Return buffered event if available
175        if let Some(event) = self.event_buffer.pop_front() {
176            self.update_stats_for_event(&event);
177            return Ok(Some(event));
178        }
179
180        // Generate new event(s)
181        self.generate_events()?;
182
183        // Return the first buffered event
184        if let Some(event) = self.event_buffer.pop_front() {
185            self.update_stats_for_event(&event);
186            Ok(Some(event))
187        } else {
188            Ok(None)
189        }
190    }
191
192    /// Generate events by processing tokens
193    fn generate_events(&mut self) -> Result<()> {
194        if self.stream_ended {
195            return Ok(());
196        }
197
198        // Check depth limit
199        if self.depth > self.config.max_depth {
200            return Err(Error::parse(
201                self.position,
202                format!("Maximum nesting depth exceeded: {}", self.config.max_depth),
203            ));
204        }
205
206        if self.config.use_zero_copy && self.zero_scanner.is_some() {
207            self.generate_events_zero_copy()
208        } else {
209            self.generate_events_traditional()
210        }
211    }
212
213    /// Generate events using zero-copy scanner
214    fn generate_events_zero_copy(&mut self) -> Result<()> {
215        // Process a small batch of characters/tokens
216        let batch_size = 16;
217        let mut processed = 0;
218
219        while processed < batch_size {
220            // Get current character without borrowing the entire scanner
221            let current_char = if let Some(scanner) = &self.zero_scanner {
222                scanner.current_char()
223            } else {
224                None
225            };
226
227            if current_char.is_none() {
228                // End of stream
229                if !matches!(self.state, ParserState::StreamEnd) {
230                    self.event_buffer
231                        .push_back(Event::stream_end(self.position));
232                    self.stream_ended = true;
233                }
234                break;
235            }
236
237            // Skip whitespace efficiently
238            if let Some(scanner) = &mut self.zero_scanner {
239                scanner.skip_whitespace();
240            }
241
242            let ch = current_char.unwrap();
243            match ch {
244                '-' if self.is_document_start_candidate_simple() => {
245                    self.handle_document_start();
246                    // Advance past the "---"
247                    if let Some(scanner) = &mut self.zero_scanner {
248                        scanner.advance();
249                        scanner.advance();
250                        scanner.advance();
251                    }
252                }
253                '.' if self.is_document_end_candidate_simple() => {
254                    self.handle_document_end();
255                    // Advance past the "..."
256                    if let Some(scanner) = &mut self.zero_scanner {
257                        scanner.advance();
258                        scanner.advance();
259                        scanner.advance();
260                    }
261                }
262                '[' => {
263                    self.handle_flow_sequence_start();
264                    if let Some(scanner) = &mut self.zero_scanner {
265                        scanner.advance();
266                    }
267                }
268                ']' => {
269                    self.handle_flow_sequence_end();
270                    if let Some(scanner) = &mut self.zero_scanner {
271                        scanner.advance();
272                    }
273                }
274                '{' => {
275                    self.handle_flow_mapping_start();
276                    if let Some(scanner) = &mut self.zero_scanner {
277                        scanner.advance();
278                    }
279                }
280                '}' => {
281                    self.handle_flow_mapping_end();
282                    if let Some(scanner) = &mut self.zero_scanner {
283                        scanner.advance();
284                    }
285                }
286                ':' if self.is_value_indicator_simple() => {
287                    self.handle_value_indicator();
288                    if let Some(scanner) = &mut self.zero_scanner {
289                        scanner.advance();
290                    }
291                }
292                ',' => {
293                    // Flow entry separator
294                    if let Some(scanner) = &mut self.zero_scanner {
295                        scanner.advance();
296                    }
297                }
298                '#' => {
299                    // Skip comments for now
300                    self.skip_comment_simple();
301                }
302                ch if ch.is_alphabetic() || ch.is_numeric() => {
303                    // Scan scalar using zero-copy
304                    let scalar_token = if let Some(scanner) = &mut self.zero_scanner {
305                        scanner.scan_plain_scalar_zero_copy()?
306                    } else {
307                        return Err(Error::parse(
308                            self.position,
309                            "No scanner available".to_string(),
310                        ));
311                    };
312                    self.handle_zero_copy_scalar(scalar_token)?;
313                }
314                '&' => {
315                    // Anchor
316                    if let Some(scanner) = &mut self.zero_scanner {
317                        scanner.advance(); // Skip '&'
318                        let anchor = scanner.scan_identifier_zero_copy()?;
319                        self.pending_anchor = Some(anchor.as_str().to_string());
320                    }
321                }
322                '*' => {
323                    // Alias
324                    if let Some(scanner) = &mut self.zero_scanner {
325                        scanner.advance(); // Skip '*'
326                        let alias = scanner.scan_identifier_zero_copy()?;
327                        self.event_buffer
328                            .push_back(Event::alias(self.position, alias.as_str().to_string()));
329                    }
330                }
331                _ => {
332                    // Unknown character, skip it
333                    if let Some(scanner) = &mut self.zero_scanner {
334                        scanner.advance();
335                    }
336                }
337            }
338
339            processed += 1;
340            if let Some(scanner) = &self.zero_scanner {
341                self.position = scanner.position;
342            }
343
344            if let Some(ref mut stats) = self.stats {
345                stats.tokens_processed += 1;
346            }
347        }
348
349        Ok(())
350    }
351
352    /// Generate events using traditional scanner
353    fn generate_events_traditional(&mut self) -> Result<()> {
354        // Process a few tokens at a time
355        for _ in 0..4 {
356            let has_token = if let Some(scanner) = &self.scanner {
357                scanner.check_token()
358            } else {
359                false
360            };
361
362            if !has_token {
363                if !matches!(self.state, ParserState::StreamEnd) {
364                    self.event_buffer
365                        .push_back(Event::stream_end(self.position));
366                    self.stream_ended = true;
367                }
368                break;
369            }
370
371            let token = if let Some(scanner) = &mut self.scanner {
372                scanner.get_token()?
373            } else {
374                None
375            };
376
377            if let Some(token) = token {
378                self.process_token(token)?;
379
380                if let Some(ref mut stats) = self.stats {
381                    stats.tokens_processed += 1;
382                }
383            }
384        }
385
386        Ok(())
387    }
388
389    /// Check if this position could be document start (---) - simplified version
390    fn is_document_start_candidate_simple(&self) -> bool {
391        if let Some(scanner) = &self.zero_scanner {
392            scanner.current_char() == Some('-')
393                && scanner.peek_char(1) == Some('-')
394                && scanner.peek_char(2) == Some('-')
395                && scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
396        } else {
397            false
398        }
399    }
400
401    /// Check if this position could be document end (...) - simplified version
402    fn is_document_end_candidate_simple(&self) -> bool {
403        if let Some(scanner) = &self.zero_scanner {
404            scanner.current_char() == Some('.')
405                && scanner.peek_char(1) == Some('.')
406                && scanner.peek_char(2) == Some('.')
407                && scanner.peek_char(3).map_or(true, |c| c.is_whitespace())
408        } else {
409            false
410        }
411    }
412
413    /// Check if colon is a value indicator - simplified version
414    fn is_value_indicator_simple(&self) -> bool {
415        if let Some(scanner) = &self.zero_scanner {
416            scanner.current_char() == Some(':')
417                && scanner.peek_char(1).map_or(true, |c| c.is_whitespace())
418        } else {
419            false
420        }
421    }
422
423    /// Handle document start
424    fn handle_document_start(&mut self) {
425        self.event_buffer
426            .push_back(Event::document_start(self.position, None, vec![], false));
427        self.state = ParserState::DocumentStart;
428    }
429
430    /// Handle document end
431    fn handle_document_end(&mut self) {
432        self.event_buffer
433            .push_back(Event::document_end(self.position, false));
434        self.state = ParserState::DocumentEnd;
435    }
436
437    /// Handle flow sequence start
438    fn handle_flow_sequence_start(&mut self) {
439        self.push_state(ParserState::FlowSequence);
440        self.event_buffer.push_back(Event::sequence_start(
441            self.position,
442            self.pending_anchor.take(),
443            self.pending_tag.take(),
444            true,
445        ));
446    }
447
448    /// Handle flow sequence end
449    fn handle_flow_sequence_end(&mut self) {
450        self.event_buffer
451            .push_back(Event::sequence_end(self.position));
452        self.pop_state();
453    }
454
455    /// Handle flow mapping start
456    fn handle_flow_mapping_start(&mut self) {
457        self.push_state(ParserState::FlowMapping);
458        self.event_buffer.push_back(Event::mapping_start(
459            self.position,
460            self.pending_anchor.take(),
461            self.pending_tag.take(),
462            true,
463        ));
464    }
465
466    /// Handle flow mapping end
467    fn handle_flow_mapping_end(&mut self) {
468        self.event_buffer
469            .push_back(Event::mapping_end(self.position));
470        self.pop_state();
471    }
472
473    /// Handle value indicator (:)
474    fn handle_value_indicator(&mut self) {
475        match self.state {
476            ParserState::BlockMappingKey => {
477                self.state = ParserState::BlockMappingValue;
478            }
479            ParserState::FlowMapping => {
480                // In flow mapping, we don't change state on value indicator
481            }
482            _ => {
483                // Might need to start a new mapping
484                // This is simplified - full implementation would be more complex
485            }
486        }
487    }
488
489    /// Handle zero-copy scalar token
490    fn handle_zero_copy_scalar(&mut self, token: ZeroToken) -> Result<()> {
491        if let ZeroTokenType::Scalar(zero_string, quote_style) = token.token_type {
492            // Convert to regular scalar event
493            let style = match quote_style {
494                crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
495                crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
496                crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
497            };
498
499            // Avoid allocation if possible
500            let value = if zero_string.is_borrowed() {
501                zero_string.as_str().to_string()
502            } else {
503                zero_string.into_owned()
504            };
505
506            self.event_buffer.push_back(Event::scalar(
507                token.start_position,
508                self.pending_anchor.take(),
509                self.pending_tag.take(),
510                value,
511                style == crate::parser::ScalarStyle::Plain,
512                style != crate::parser::ScalarStyle::Plain,
513                style,
514            ));
515        }
516        Ok(())
517    }
518
519    /// Skip comment line - simplified version
520    fn skip_comment_simple(&mut self) {
521        if let Some(scanner) = &mut self.zero_scanner {
522            while let Some(ch) = scanner.current_char() {
523                scanner.advance();
524                if ch == '\n' || ch == '\r' {
525                    break;
526                }
527            }
528        }
529    }
530
531    /// Process a token from traditional scanner
532    fn process_token(&mut self, token: Token) -> Result<()> {
533        self.position = token.end_position;
534
535        match token.token_type {
536            TokenType::StreamStart => {
537                self.event_buffer
538                    .push_back(Event::stream_start(token.start_position));
539                self.state = ParserState::ImplicitDocumentStart;
540            }
541            TokenType::StreamEnd => {
542                self.event_buffer
543                    .push_back(Event::stream_end(token.start_position));
544                self.stream_ended = true;
545            }
546            TokenType::Scalar(value, quote_style) => {
547                let style = match quote_style {
548                    crate::scanner::QuoteStyle::Plain => crate::parser::ScalarStyle::Plain,
549                    crate::scanner::QuoteStyle::Single => crate::parser::ScalarStyle::SingleQuoted,
550                    crate::scanner::QuoteStyle::Double => crate::parser::ScalarStyle::DoubleQuoted,
551                };
552
553                self.event_buffer.push_back(Event::scalar(
554                    token.start_position,
555                    self.pending_anchor.take(),
556                    self.pending_tag.take(),
557                    value,
558                    style == crate::parser::ScalarStyle::Plain,
559                    style != crate::parser::ScalarStyle::Plain,
560                    style,
561                ));
562            }
563            // Add other token types as needed
564            _ => {
565                // Simplified implementation - full parser would handle all token types
566            }
567        }
568
569        Ok(())
570    }
571
572    /// Push a new state onto the stack
573    fn push_state(&mut self, new_state: ParserState) {
574        self.state_stack.push(self.state);
575        self.state = new_state;
576        self.depth += 1;
577
578        if let Some(ref mut stats) = self.stats {
579            stats.max_depth = stats.max_depth.max(self.depth);
580        }
581    }
582
583    /// Pop state from the stack
584    fn pop_state(&mut self) {
585        if let Some(prev_state) = self.state_stack.pop() {
586            self.state = prev_state;
587            self.depth = self.depth.saturating_sub(1);
588        }
589    }
590
591    /// Update statistics for processed event
592    fn update_stats_for_event(&mut self, _event: &Event) {
593        if let Some(ref mut stats) = self.stats {
594            stats.events_processed += 1;
595            stats.max_buffer_size = stats.max_buffer_size.max(self.event_buffer.len());
596        }
597    }
598
599    /// Get current parsing statistics
600    pub fn get_stats(&mut self) -> Option<StreamingStats> {
601        if let Some(ref mut stats) = self.stats {
602            stats.parse_time_ns = self.start_time.elapsed().as_nanos() as u64;
603
604            if let Some(ref scanner) = self.zero_scanner {
605                stats.scanner_stats = Some(scanner.stats());
606            }
607
608            Some(stats.clone())
609        } else {
610            None
611        }
612    }
613
614    /// Check if more events are available
615    pub fn has_more_events(&self) -> bool {
616        !self.stream_ended || !self.event_buffer.is_empty()
617    }
618
619    /// Get the current buffer size
620    pub fn buffer_size(&self) -> usize {
621        self.event_buffer.len()
622    }
623}
624
625impl<'a> Parser for StreamingParser<'a> {
626    fn check_event(&self) -> bool {
627        !self.event_buffer.is_empty() || !self.stream_ended
628    }
629
630    fn peek_event(&self) -> Result<Option<&Event>> {
631        Ok(self.event_buffer.front())
632    }
633
634    fn get_event(&mut self) -> Result<Option<Event>> {
635        self.next_event_internal()
636    }
637
638    fn reset(&mut self) {
639        self.event_buffer.clear();
640        self.state = ParserState::StreamStart;
641        self.state_stack.clear();
642        self.depth = 0;
643        self.pending_anchor = None;
644        self.pending_tag = None;
645        self.stream_ended = false;
646        self.start_time = std::time::Instant::now();
647
648        if let Some(ref mut scanner) = self.scanner {
649            scanner.reset();
650        }
651        if let Some(ref mut scanner) = self.zero_scanner {
652            scanner.reset();
653        }
654    }
655
656    fn position(&self) -> Position {
657        self.position
658    }
659}
660
#[cfg(test)]
mod tests {
    use super::*;
    use crate::EventType;

    #[test]
    fn test_streaming_parser_basic() {
        // Exercise the zero-copy path: the traditional path only turns a handful of
        // token types into events.
        let mut parser = StreamingParser::new_zero_copy(
            "42",
            StreamingConfig {
                use_zero_copy: true,
                collect_stats: true,
                ..Default::default()
            },
        );

        // Nothing has been scanned yet, but the stream has not ended either.
        assert!(parser.check_event());

        // Pull at most five batches, stopping at the first empty one.
        let all_events: Vec<_> = (0..5)
            .map(|_| parser.next_batch().unwrap())
            .take_while(|batch| !batch.is_empty())
            .flatten()
            .collect();

        assert!(!all_events.is_empty(), "Should generate at least one event");

        let found_scalar = all_events.iter().any(|event| {
            matches!(&event.event_type, EventType::Scalar { value, .. } if value == "42")
        });
        assert!(found_scalar, "Should find scalar event with value '42'");
    }

    #[test]
    fn test_zero_copy_streaming() {
        let config = StreamingConfig {
            collect_stats: true,
            use_zero_copy: true,
            ..Default::default()
        };
        let mut parser = StreamingParser::new_zero_copy("key: value", config);

        let first_batch = parser.next_batch().unwrap();
        assert!(!first_batch.is_empty());

        // Statistics were requested, so a snapshot must be available.
        let stats = parser.get_stats();
        assert!(stats.is_some());
        assert!(stats.unwrap().events_processed > 0);
    }

    #[test]
    fn test_streaming_config() {
        let parser = StreamingParser::new(
            "test".to_string(),
            StreamingConfig {
                max_buffer_size: 32,
                use_zero_copy: false,
                max_depth: 10,
                collect_stats: true,
            },
        );

        let stored = &parser.config;
        assert_eq!(stored.max_buffer_size, 32);
        assert!(!stored.use_zero_copy);
        assert_eq!(stored.max_depth, 10);
        assert!(stored.collect_stats);
    }

    #[test]
    fn test_flow_collections_streaming() {
        let mut parser = StreamingParser::new_zero_copy("[1, 2, 3]", StreamingConfig::default());

        // Keep pulling batches while the parser reports more events, stopping early
        // if a batch comes back empty.
        let all_events: Vec<_> = std::iter::from_fn(|| {
            if !parser.has_more_events() {
                return None;
            }
            let batch = parser.next_batch().unwrap();
            if batch.is_empty() {
                None
            } else {
                Some(batch)
            }
        })
        .flatten()
        .collect();

        // There must be a flow-style sequence start among the events.
        let has_sequence_start = all_events.iter().any(|event| {
            matches!(
                event.event_type,
                EventType::SequenceStart {
                    flow_style: true,
                    ..
                }
            )
        });
        assert!(has_sequence_start, "Should find flow sequence start");
    }
}