rust_yaml/streaming_async.rs

//! Async streaming YAML parser for non-blocking I/O
//!
//! This module provides async/await support for streaming YAML parsing,
//! enabling efficient processing of YAML from async sources such as network
//! streams and async file I/O.
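//!
//! A minimal usage sketch (illustrative only, not compiled as a doctest; it
//! assumes the crate is named `rust_yaml`, that this module is public, and
//! that `Limits` and `Result` are re-exported from the crate root):
//!
//! ```ignore
//! use rust_yaml::streaming_async::AsyncStreamingParser;
//! use rust_yaml::Limits;
//! use tokio::io::BufReader;
//!
//! # async fn demo() -> rust_yaml::Result<()> {
//! let yaml = "---\nkey: value\n...\n";
//! let reader = BufReader::new(std::io::Cursor::new(yaml.as_bytes().to_vec()));
//! let mut parser = AsyncStreamingParser::new(reader, Limits::default());
//!
//! // Drive the parser until the input is exhausted, draining events as they
//! // become available.
//! while !parser.is_complete() {
//!     if parser.parse_next().await? {
//!         while let Some(event) = parser.next_event() {
//!             // Handle the event here
//!             let _ = event;
//!         }
//!     }
//! }
//! # Ok(())
//! # }
//! ```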

#[cfg(feature = "async")]
use futures::stream::Stream;
#[cfg(feature = "async")]
use std::pin::Pin;
#[cfg(feature = "async")]
use std::task::{Context, Poll};
#[cfg(feature = "async")]
use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};

#[cfg(feature = "async")]
use crate::{
    parser::{Event, EventType},
    Limits, Position, Result,
};
#[cfg(feature = "async")]
use std::collections::VecDeque;

/// Async streaming YAML parser
#[cfg(feature = "async")]
pub struct AsyncStreamingParser<R: AsyncBufRead + Unpin> {
    /// Async reader
    reader: R,
    /// Buffer for incomplete data
    buffer: String,
    /// Event queue
    events: VecDeque<Event>,
    /// Current position
    position: Position,
    /// Parse state
    state: AsyncParseState,
    /// Resource limits
    limits: Limits,
    /// Statistics
    stats: AsyncStreamStats,
}

#[cfg(feature = "async")]
#[derive(Debug, Clone, PartialEq)]
enum AsyncParseState {
    Initial,
    InDocument,
    BetweenDocuments,
    Complete,
}

/// Statistics for async streaming parser
#[cfg(feature = "async")]
#[derive(Debug, Clone, Default)]
#[allow(missing_docs)]
pub struct AsyncStreamStats {
    pub bytes_read: usize,
    pub events_generated: usize,
    pub documents_parsed: usize,
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> AsyncStreamingParser<R> {
    /// Create a new async streaming parser
    pub fn new(reader: R, limits: Limits) -> Self {
        Self {
            reader,
            buffer: String::with_capacity(4096),
            events: VecDeque::with_capacity(100),
            position: Position::new(),
            state: AsyncParseState::Initial,
            limits,
            stats: AsyncStreamStats::default(),
        }
    }

    /// Parse the next chunk asynchronously
    ///
    /// Returns `Ok(true)` when new events are available via `next_event`,
    /// and `Ok(false)` when this call produced no new events.
    pub async fn parse_next(&mut self) -> Result<bool> {
        // Read the next line from the async reader
        let mut line = String::new();
        let bytes_read = self.reader.read_line(&mut line).await?;

        if bytes_read == 0 {
            // End of input: flush any remaining buffered content, then mark
            // parsing complete so callers polling `is_complete` terminate.
            if !self.buffer.trim().is_empty() {
                if !self.buffer.ends_with('\n') {
                    self.buffer.push('\n');
                }
                self.parse_buffer()?;
            }
            self.buffer.clear();
            self.state = AsyncParseState::Complete;
            return Ok(!self.events.is_empty());
        }

        self.buffer.push_str(&line);
        self.stats.bytes_read += bytes_read;

        // Parse the accumulated buffer
        self.parse_buffer()?;

        Ok(!self.events.is_empty())
    }

    /// Parse current buffer content
    fn parse_buffer(&mut self) -> Result<()> {
        match self.state {
            AsyncParseState::Initial => {
                self.emit_event(EventType::StreamStart)?;
                self.state = AsyncParseState::BetweenDocuments;
            }
            AsyncParseState::BetweenDocuments => {
                if self.buffer.contains("---") {
                    self.emit_event(EventType::DocumentStart {
                        version: None,
                        tags: Vec::new(),
                        implicit: true,
                    })?;
                    self.state = AsyncParseState::InDocument;
                    self.stats.documents_parsed += 1;
                }
            }
            AsyncParseState::InDocument => {
                self.parse_document_content()?;
            }
            AsyncParseState::Complete => {}
        }
        Ok(())
    }

    /// Parse document content
    fn parse_document_content(&mut self) -> Result<()> {
        // Simplified parsing logic
        while !self.buffer.is_empty() {
            if self.buffer.starts_with("...") {
                self.emit_event(EventType::DocumentEnd { implicit: false })?;
                self.state = AsyncParseState::BetweenDocuments;
                self.buffer.drain(..3);
                break;
            }

            // Parse line by line (simplified)
            if let Some(newline_pos) = self.buffer.find('\n') {
                let line = self.buffer.drain(..=newline_pos).collect::<String>();
                self.parse_line(line)?;
            } else {
                break; // Need more data
            }
        }
        Ok(())
    }

    /// Parse a single line
    fn parse_line(&mut self, line: String) -> Result<()> {
        let trimmed = line.trim();

        if trimmed.is_empty() || trimmed.starts_with('#') {
            return Ok(());
        }

        // Simple key-value parsing
        if let Some(colon_pos) = trimmed.find(':') {
            let key = &trimmed[..colon_pos];
            let value = &trimmed[colon_pos + 1..];

            // Emit scalar events for key and value
            self.emit_event(EventType::Scalar {
                value: key.trim().to_string(),
                anchor: None,
                tag: None,
                style: crate::parser::ScalarStyle::Plain,
                plain_implicit: true,
                quoted_implicit: true,
            })?;

            self.emit_event(EventType::Scalar {
                value: value.trim().to_string(),
                anchor: None,
                tag: None,
                style: crate::parser::ScalarStyle::Plain,
                plain_implicit: true,
                quoted_implicit: true,
            })?;
        }

        Ok(())
    }

    /// Emit an event
    fn emit_event(&mut self, event_type: EventType) -> Result<()> {
        self.events.push_back(Event {
            event_type,
            position: self.position,
        });
        self.stats.events_generated += 1;
        Ok(())
    }

    /// Get the next event
    pub fn next_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }

    /// Check if parsing is complete
    pub fn is_complete(&self) -> bool {
        self.state == AsyncParseState::Complete && self.events.is_empty()
    }

    /// Get statistics
    pub fn stats(&self) -> &AsyncStreamStats {
        &self.stats
    }
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> Stream for AsyncStreamingParser<R> {
    type Item = Result<Event>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Drain any buffered events first
        if let Some(event) = self.next_event() {
            return Poll::Ready(Some(Ok(event)));
        }

        // If parsing is complete and no events remain, the stream is exhausted
        if self.is_complete() {
            return Poll::Ready(None);
        }

        // Try to parse more data.
        // This is simplified - a production implementation would poll the
        // underlying reader directly instead of blocking inside `poll_next`.
        match futures::executor::block_on(self.parse_next()) {
            Ok(_) => {
                if let Some(event) = self.next_event() {
                    Poll::Ready(Some(Ok(event)))
                } else if self.is_complete() {
                    Poll::Ready(None)
                } else {
                    // Progress was made but no event is ready yet; request
                    // another poll
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }
            Err(e) => Poll::Ready(Some(Err(e))),
        }
    }
}

/// Async helper functions
#[cfg(feature = "async")]
pub mod helpers {
    use super::*;
    use std::path::Path;
    use tokio::fs::File;

    /// Stream YAML from an async file
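    ///
    /// A usage sketch (illustrative only, not compiled as a doctest; the file
    /// path shown is hypothetical and the crate name `rust_yaml` is assumed):
    ///
    /// ```ignore
    /// use rust_yaml::streaming_async::helpers::stream_from_file_async;
    /// use rust_yaml::Limits;
    ///
    /// # async fn demo() -> rust_yaml::Result<()> {
    /// let mut parser = stream_from_file_async("config.yaml", Limits::default()).await?;
    /// while !parser.is_complete() {
    ///     if parser.parse_next().await? {
    ///         while let Some(event) = parser.next_event() {
    ///             // Handle the event here
    ///             let _ = event;
    ///         }
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```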
    pub async fn stream_from_file_async<P: AsRef<Path>>(
        path: P,
        limits: Limits,
    ) -> Result<AsyncStreamingParser<BufReader<File>>> {
        let file = File::open(path).await?;
        let reader = BufReader::new(file);
        Ok(AsyncStreamingParser::new(reader, limits))
    }

    /// Stream YAML from an async reader
    pub fn stream_from_async_reader<R: AsyncBufRead + Unpin>(
        reader: R,
        limits: Limits,
    ) -> AsyncStreamingParser<R> {
        AsyncStreamingParser::new(reader, limits)
    }

    /// Process a YAML stream with a callback
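    ///
    /// A sketch of driving a stream with an async callback (illustrative
    /// only, not compiled as a doctest; the inline YAML and the crate name
    /// `rust_yaml` are assumptions):
    ///
    /// ```ignore
    /// use rust_yaml::streaming_async::helpers::{process_yaml_stream, stream_from_async_reader};
    /// use rust_yaml::Limits;
    /// use tokio::io::BufReader;
    ///
    /// # async fn demo() -> rust_yaml::Result<()> {
    /// let reader = BufReader::new(std::io::Cursor::new(b"---\na: 1\n".to_vec()));
    /// let parser = stream_from_async_reader(reader, Limits::default());
    ///
    /// process_yaml_stream(parser, |event| async move {
    ///     // Inspect or route each event here
    ///     let _ = event;
    ///     Ok(())
    /// })
    /// .await?;
    /// # Ok(())
    /// # }
    /// ```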
    pub async fn process_yaml_stream<R, F, Fut>(
        mut parser: AsyncStreamingParser<R>,
        mut callback: F,
    ) -> Result<()>
    where
        R: AsyncBufRead + Unpin,
        F: FnMut(Event) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        while !parser.is_complete() {
            if parser.parse_next().await? {
                while let Some(event) = parser.next_event() {
                    callback(event).await?;
                }
            }
        }
        Ok(())
    }
}

/// Memory-mapped file support for efficient large file processing
#[cfg(not(target_arch = "wasm32"))]
pub mod mmap {
    use crate::Result;
    use memmap2::{Mmap, MmapOptions};
    use std::fs::File;
    use std::path::Path;

    /// Memory-mapped YAML file reader
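    ///
    /// A usage sketch (illustrative only, not compiled as a doctest; the file
    /// path is hypothetical and the crate name `rust_yaml` is assumed):
    ///
    /// ```ignore
    /// use rust_yaml::streaming_async::mmap::MmapYamlReader;
    ///
    /// # fn demo() -> rust_yaml::Result<()> {
    /// let mut reader = MmapYamlReader::new("large.yaml")?;
    ///
    /// // Whole-file access as a &str...
    /// let _all = reader.as_str()?;
    ///
    /// // ...or fixed-size chunked reads from the current position.
    /// reader.reset();
    /// while let Some(chunk) = reader.read_chunk(64 * 1024) {
    ///     let _ = chunk;
    /// }
    /// # Ok(())
    /// # }
    /// ```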
    pub struct MmapYamlReader {
        mmap: Mmap,
        position: usize,
    }

    impl MmapYamlReader {
        /// Create a new memory-mapped reader
        pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
            let file = File::open(path)?;
            // SAFETY: the file is opened read-only and mapped immediately.
            // The mapping is only valid as long as no other process truncates
            // or modifies the file while it is mapped.
            #[allow(unsafe_code)]
            let mmap = unsafe { MmapOptions::new().map(&file)? };

            Ok(Self { mmap, position: 0 })
        }

        /// Get the entire content as a string slice
        pub fn as_str(&self) -> Result<&str> {
            std::str::from_utf8(&self.mmap).map_err(|e| {
                crate::Error::construction(
                    crate::Position::new(),
                    format!("UTF-8 conversion failed: {}", e),
                )
            })
        }

        /// Read a chunk starting at the current position
        ///
        /// Returns `None` when all data has been consumed, or when the chunk
        /// does not end on a valid UTF-8 character boundary.
        pub fn read_chunk(&mut self, size: usize) -> Option<&str> {
            if self.position >= self.mmap.len() {
                return None;
            }

            let end = (self.position + size).min(self.mmap.len());
            let chunk = &self.mmap[self.position..end];
            self.position = end;

            std::str::from_utf8(chunk).ok()
        }

        /// Reset the position to the beginning
        pub fn reset(&mut self) {
            self.position = 0;
        }

        /// Get the number of remaining bytes
        pub fn remaining(&self) -> usize {
            self.mmap.len().saturating_sub(self.position)
        }
    }
}

#[cfg(all(test, feature = "async"))]
mod async_tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_async_streaming() {
        const MAX_ITERATIONS: usize = 100;

        let yaml = "---\nkey: value\n...\n";
        let cursor = Cursor::new(yaml.as_bytes().to_vec());
        let reader = BufReader::new(cursor);
        let mut parser = AsyncStreamingParser::new(reader, Limits::default());

        let mut events = Vec::new();
        let mut iterations = 0;

        while !parser.is_complete() && iterations < MAX_ITERATIONS {
            iterations += 1;
            match parser.parse_next().await {
                Ok(has_events) => {
                    if has_events {
                        while let Some(event) = parser.next_event() {
                            events.push(event);
                        }
                    } else if parser.state == AsyncParseState::Complete {
                        // Ensure we exit when parsing is done
                        break;
                    }
                }
                Err(_) => break,
            }
        }

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
    }

    #[tokio::test]
    async fn test_stream_trait() {
        use tokio::time::{timeout, Duration};

        let yaml = "key: value\n";
        let cursor = Cursor::new(yaml.as_bytes().to_vec());
        let reader = BufReader::new(cursor);
        let parser = AsyncStreamingParser::new(reader, Limits::default());

        let result = timeout(Duration::from_secs(5), parser.take(5).collect::<Vec<_>>()).await;

        let events = result.expect("Test timed out after 5 seconds");
        assert!(!events.is_empty());
    }
}

#[cfg(all(test, not(target_arch = "wasm32")))]
mod mmap_tests {
    use super::mmap::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_mmap_reader() {
        // Create a temporary file
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "key: value").unwrap();
        writeln!(file, "list:").unwrap();
        writeln!(file, "  - item1").unwrap();
        writeln!(file, "  - item2").unwrap();
        file.flush().unwrap();

        // Test memory-mapped reading
        let mut reader = MmapYamlReader::new(file.path()).unwrap();
        let content = reader.as_str().unwrap();
        assert!(content.contains("key: value"));

        // Test chunk reading
        reader.reset();
        let chunk = reader.read_chunk(10).unwrap();
        assert_eq!(chunk, "key: value");
    }
}