Skip to main content

rust_yaml/
streaming_async.rs

1//! Async streaming YAML parser for non-blocking I/O
2//!
3//! This module provides async/await support for streaming YAML parsing,
4//! enabling efficient processing of YAML from async sources like network
5//! streams, async file I/O, and more.
6
7#[cfg(feature = "async")]
8use futures::stream::Stream;
9#[cfg(feature = "async")]
10use std::pin::Pin;
11#[cfg(feature = "async")]
12use std::task::{Context, Poll};
13#[cfg(feature = "async")]
14use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
15
16use crate::{
17    Limits, Position, Result,
18    parser::{Event, EventType},
19};
20use std::collections::VecDeque;
21
/// Async streaming YAML parser.
///
/// Input is consumed one line at a time from an [`AsyncBufRead`] source and
/// turned into a queue of [`Event`]s. The parser can be driven either through
/// [`AsyncStreamingParser::parse_next`] + [`AsyncStreamingParser::next_event`]
/// or through its [`Stream`] implementation; both share the same poll-based
/// core, so neither ever blocks the executor thread.
///
/// This is a deliberately simplified parser: it recognises document markers
/// (`---` / `...`), comments, blank lines and flat `key: value` lines only.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser<R: AsyncBufRead + Unpin> {
    /// Async reader the YAML text is pulled from.
    reader: R,
    /// Raw bytes of the line currently being assembled. Kept as bytes so a
    /// line (or a multi-byte character) split across two reads survives a
    /// `Poll::Pending` without losing data.
    buffer: Vec<u8>,
    /// Events produced but not yet handed to the caller.
    events: VecDeque<Event>,
    /// Position stamped onto every emitted event. The code in this block
    /// never advances it.
    position: Position,
    /// Parse state
    state: AsyncParseState,
    /// Resource limits supplied by the caller. Stored only; the code in this
    /// block does not consult them yet.
    limits: Limits,
    /// Statistics
    stats: AsyncStreamStats,
}

/// Coarse position of the parser within the YAML stream.
#[cfg(feature = "async")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncParseState {
    /// Nothing has been parsed yet; `StreamStart` has not been emitted.
    Initial,
    /// Inside a document; content lines are turned into events.
    InDocument,
    /// After `StreamStart` or a document end, waiting for the next document.
    BetweenDocuments,
    /// End of input has been reached and handled.
    Complete,
}

/// Statistics for async streaming parser
#[cfg(feature = "async")]
#[derive(Debug, Clone, Default)]
pub struct AsyncStreamStats {
    /// Number of bytes consumed from the reader so far.
    pub bytes_read: usize,
    /// Number of events pushed onto the event queue so far.
    pub events_generated: usize,
    /// Number of documents started so far.
    pub documents_parsed: usize,
}

/// Returns the text following `marker` when `line` is a document marker line.
///
/// A marker only counts when it starts at column 0 and is followed by
/// whitespace or the end of the line, so `---` inside a value such as
/// `key: a---b`, or a line like `...more`, is not mistaken for a marker.
#[cfg(feature = "async")]
fn marker_remainder<'a>(line: &'a str, marker: &str) -> Option<&'a str> {
    let rest = line.strip_prefix(marker)?;
    (rest.is_empty() || rest.starts_with(char::is_whitespace)).then_some(rest)
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> AsyncStreamingParser<R> {
    /// Create a new async streaming parser
    pub fn new(reader: R, limits: Limits) -> Self {
        Self {
            reader,
            buffer: Vec::with_capacity(4096),
            events: VecDeque::with_capacity(100),
            position: Position::new(),
            state: AsyncParseState::Initial,
            limits,
            stats: AsyncStreamStats::default(),
        }
    }

    /// Parse input until at least one event is queued or the input ends.
    ///
    /// Returns `Ok(true)` when [`Self::next_event`] has something to return.
    /// Returns `Ok(false)` only once end of input has been reached and every
    /// event has been drained, i.e. when [`Self::is_complete`] is `true`.
    ///
    /// Dropping the returned future before it resolves loses no input:
    /// partially read lines are kept in the parser.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the reader and reports a line that is not
    /// valid UTF-8 as an `InvalidData` I/O error.
    pub async fn parse_next(&mut self) -> Result<bool> {
        std::future::poll_fn(|cx| self.poll_parse_next(cx)).await
    }

    /// Poll-based core shared by [`Self::parse_next`] and the [`Stream`] impl.
    ///
    /// Pulls bytes from the reader until a full line is available, feeds that
    /// line through the state machine, and stops as soon as an event is
    /// queued or end of input has been handled.
    fn poll_parse_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
        while self.events.is_empty() && self.state != AsyncParseState::Complete {
            // Copy what the reader has buffered (up to and including the first
            // newline) into `self.buffer`. The slice borrows `self.reader`, so
            // it is confined to this block.
            let (taken, line_complete) = {
                let available = match Pin::new(&mut self.reader).poll_fill_buf(cx) {
                    Poll::Ready(Ok(bytes)) => bytes,
                    Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
                    Poll::Pending => return Poll::Pending,
                };
                match available.iter().position(|&byte| byte == b'\n') {
                    Some(newline) => {
                        self.buffer.extend_from_slice(&available[..=newline]);
                        (newline + 1, true)
                    }
                    None => {
                        self.buffer.extend_from_slice(available);
                        (available.len(), false)
                    }
                }
            };

            // An empty fill means end of input.
            let outcome = if taken == 0 {
                self.finish()
            } else {
                self.reader.consume(taken);
                self.stats.bytes_read += taken;
                if line_complete {
                    self.flush_line()
                } else {
                    Ok(())
                }
            };
            if let Err(err) = outcome {
                return Poll::Ready(Err(err));
            }
        }

        Poll::Ready(Ok(!self.events.is_empty()))
    }

    /// Decode the assembled line as UTF-8 and run it through the state machine.
    ///
    /// The line buffer is emptied even when decoding fails, so a malformed
    /// line is reported once instead of on every subsequent call.
    fn flush_line(&mut self) -> Result<()> {
        let raw = std::mem::take(&mut self.buffer);
        let line = String::from_utf8(raw).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "stream did not contain valid UTF-8",
            )
        })?;
        self.process_line(&line)
    }

    /// Handle end of input.
    ///
    /// Parses a final line that had no trailing newline, closes a document
    /// that is still open with an implicit `DocumentEnd`, and marks the parser
    /// complete. Empty input therefore produces no events at all.
    // NOTE(review): no stream-end event is emitted here because no such
    // `EventType` variant is visible in this file — confirm and add if one exists.
    fn finish(&mut self) -> Result<()> {
        if !self.buffer.is_empty() {
            self.flush_line()?;
        }
        if self.state == AsyncParseState::InDocument {
            self.emit_event(EventType::DocumentEnd { implicit: true })?;
        }
        self.state = AsyncParseState::Complete;
        Ok(())
    }

    /// Advance the state machine by one line of input.
    fn process_line(&mut self, raw: &str) -> Result<()> {
        if self.state == AsyncParseState::Initial {
            self.emit_event(EventType::StreamStart)?;
            self.state = AsyncParseState::BetweenDocuments;
        }

        // Strip the line terminator (`\n` or `\r\n`) and trailing blanks.
        let line = raw.trim_end();
        let after_start_marker = marker_remainder(line, "---");
        let is_end_marker = marker_remainder(line, "...").is_some();

        match self.state {
            AsyncParseState::BetweenDocuments => {
                let body = line.trim_start();
                if let Some(rest) = after_start_marker {
                    // Explicit `---`: the document start is not implicit.
                    self.begin_document(false)?;
                    self.parse_line(rest)?;
                } else if is_end_marker
                    || body.is_empty()
                    || body.starts_with('#')
                    || line.starts_with('%')
                {
                    // Stray `...`, blank lines, comments and `%` directive
                    // lines do not open a document.
                } else {
                    // Content without a preceding `---` opens a bare document.
                    self.begin_document(true)?;
                    self.parse_line(line)?;
                }
            }
            AsyncParseState::InDocument => {
                if let Some(rest) = after_start_marker {
                    // `---` inside a document ends it and starts the next one.
                    self.emit_event(EventType::DocumentEnd { implicit: true })?;
                    self.begin_document(false)?;
                    self.parse_line(rest)?;
                } else if is_end_marker {
                    self.emit_event(EventType::DocumentEnd { implicit: false })?;
                    self.state = AsyncParseState::BetweenDocuments;
                } else {
                    self.parse_line(line)?;
                }
            }
            // `Initial` was left above; nothing is parsed once complete.
            AsyncParseState::Initial | AsyncParseState::Complete => {}
        }
        Ok(())
    }

    /// Emit `DocumentStart`, enter the document state and count the document.
    fn begin_document(&mut self, implicit: bool) -> Result<()> {
        self.emit_event(EventType::DocumentStart {
            version: None,
            tags: Vec::new(),
            implicit,
        })?;
        self.state = AsyncParseState::InDocument;
        self.stats.documents_parsed += 1;
        Ok(())
    }

    /// Parse a single content line.
    ///
    /// Blank lines and `#` comments are skipped. A line containing `:` is
    /// split at the first colon and emitted as two plain scalar events (key,
    /// then value). Any other line is ignored by this simplified parser.
    fn parse_line(&mut self, line: &str) -> Result<()> {
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            return Ok(());
        }

        if let Some((key, value)) = trimmed.split_once(':') {
            for text in [key, value] {
                self.emit_event(EventType::Scalar {
                    value: text.trim().to_string(),
                    anchor: None,
                    tag: None,
                    style: crate::parser::ScalarStyle::Plain,
                    plain_implicit: true,
                    quoted_implicit: true,
                })?;
            }
        }
        Ok(())
    }

    /// Queue an event stamped with the current position and count it.
    fn emit_event(&mut self, event_type: EventType) -> Result<()> {
        self.events.push_back(Event {
            event_type,
            position: self.position,
        });
        self.stats.events_generated += 1;
        Ok(())
    }

    /// Get the next event
    pub fn next_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }

    /// Check if parsing is complete
    ///
    /// `true` once end of input has been handled and every queued event has
    /// been taken.
    pub fn is_complete(&self) -> bool {
        self.state == AsyncParseState::Complete && self.events.is_empty()
    }

    /// Get statistics
    pub fn stats(&self) -> &AsyncStreamStats {
        &self.stats
    }
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> Stream for AsyncStreamingParser<R> {
    type Item = Result<Event>;

    /// Yields queued events, reading more input when the queue is empty.
    ///
    /// Reading is driven through the task context, so a reader that is not
    /// ready yields `Poll::Pending` (with the waker registered by the reader)
    /// instead of blocking the executor thread. The stream ends only when end
    /// of input has been handled and the queue is empty.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.poll_parse_next(cx) {
            // `Ok(false)` means complete with an empty queue, so `pop_front`
            // yields `None` exactly when the stream is finished.
            Poll::Ready(Ok(_)) => Poll::Ready(this.events.pop_front().map(Ok)),
            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            Poll::Pending => Poll::Pending,
        }
    }
}
238
/// Async helper functions
#[cfg(feature = "async")]
pub mod helpers {
    use super::*;
    use std::path::Path;
    use tokio::fs::File;

    /// Stream YAML from an async file
    ///
    /// Opens `path` with `tokio::fs::File`, wraps it in a `BufReader` and
    /// returns a parser over it.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened.
    pub async fn stream_from_file_async<P: AsRef<Path>>(
        path: P,
        limits: Limits,
    ) -> Result<AsyncStreamingParser<BufReader<File>>> {
        let file = File::open(path).await?;
        Ok(AsyncStreamingParser::new(BufReader::new(file), limits))
    }

    /// Stream YAML from async reader
    pub fn stream_from_async_reader<R: AsyncBufRead + Unpin>(
        reader: R,
        limits: Limits,
    ) -> AsyncStreamingParser<R> {
        AsyncStreamingParser::new(reader, limits)
    }

    /// Process YAML stream with a callback
    ///
    /// Drives `parser` to the end of its input, awaiting `callback` once for
    /// every event in the order the events were produced.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from the parser or the callback.
    pub async fn process_yaml_stream<R, F, Fut>(
        mut parser: AsyncStreamingParser<R>,
        mut callback: F,
    ) -> Result<()>
    where
        R: AsyncBufRead + Unpin,
        F: FnMut(Event) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        while !parser.is_complete() {
            let bytes_before = parser.stats().bytes_read;
            let has_events = parser.parse_next().await?;

            while let Some(event) = parser.next_event() {
                callback(event).await?;
            }

            // A step that read nothing and produced nothing means the input
            // is exhausted. Stop here rather than relying solely on
            // `is_complete`, so an unfinished parser state at end of input
            // can never turn this loop into a busy spin.
            if !has_events && parser.stats().bytes_read == bytes_before {
                break;
            }
        }
        Ok(())
    }
}
284
285/// Memory-mapped file support for efficient large file processing
286#[cfg(not(target_arch = "wasm32"))]
287pub mod mmap {
288    use crate::Result;
289    use memmap2::{Mmap, MmapOptions};
290    use std::fs::File;
291    use std::path::Path;
292
293    /// Memory-mapped YAML file reader
294    pub struct MmapYamlReader {
295        mmap: Mmap,
296        position: usize,
297    }
298
299    impl MmapYamlReader {
300        /// Create a new memory-mapped reader for the file at `path`.
301        ///
302        /// # Warning
303        ///
304        /// Memory mapping ties the process to the file's backing storage for
305        /// the lifetime of the reader. If the file is **truncated or modified
306        /// by another process** while mapped, touching the now-invalid pages
307        /// raises `SIGBUS` on Linux/macOS — an unrecoverable signal that
308        /// terminates the process.
309        ///
310        /// Only use `MmapYamlReader` with **trusted, stable files** that no
311        /// other process will modify concurrently (a TOCTOU hazard). For
312        /// untrusted or volatile inputs, read the file into memory instead.
313        pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
314            let file = File::open(path)?;
315            // SAFETY: `MmapOptions::map` is unsafe because the mapped region
316            // is invalidated if the underlying file changes while mapped (see
317            // the SIGBUS note above). The caller contract requires a stable,
318            // trusted file; given that, this read-only mapping is sound.
319            #[allow(unsafe_code)]
320            let mmap = unsafe { MmapOptions::new().map(&file)? };
321
322            Ok(Self { mmap, position: 0 })
323        }
324
325        /// Get the entire content as a string slice
326        pub fn as_str(&self) -> Result<&str> {
327            std::str::from_utf8(&self.mmap).map_err(|e| {
328                crate::Error::construction(
329                    crate::Position::new(),
330                    format!("UTF-8 conversion failed: {}", e),
331                )
332            })
333        }
334
335        /// Read a chunk from the current position, propagating UTF-8 errors.
336        ///
337        /// Returns `Ok(None)` at end of input, `Ok(Some(chunk))` for a valid
338        /// chunk of text, and `Err(..)` when the file holds invalid UTF-8.
339        /// This is the error-propagating counterpart of `read_chunk`, which
340        /// reports a malformed file as `None`, indistinguishable from EOF.
341        ///
342        /// The returned chunk always ends on a UTF-8 character boundary: a
343        /// multi-byte sequence straddling `size` is never split, so a valid
344        /// file is read to completion rather than silently truncated. The
345        /// chunk may therefore be a few bytes longer than `size` (#25).
346        pub fn try_read_chunk(&mut self, size: usize) -> Result<Option<&str>> {
347            if self.position >= self.mmap.len() {
348                return Ok(None);
349            }
350
351            let mut end = (self.position + size).min(self.mmap.len());
352            // Walk `end` past any UTF-8 continuation bytes (0b10xxxxxx) so the
353            // chunk stops on a character boundary. This keeps the chunk valid
354            // UTF-8 and guarantees forward progress even when `size` is
355            // smaller than the next character.
356            while end < self.mmap.len() && (self.mmap[end] & 0xC0) == 0x80 {
357                end += 1;
358            }
359
360            let chunk = &self.mmap[self.position..end];
361            let text = std::str::from_utf8(chunk).map_err(|e| {
362                crate::Error::construction(
363                    crate::Position::new(),
364                    format!("UTF-8 conversion failed: {}", e),
365                )
366            })?;
367            self.position = end;
368            Ok(Some(text))
369        }
370
371        /// Read a chunk from the current position.
372        ///
373        /// Returns `None` at end of input. Like `try_read_chunk`, the chunk
374        /// never splits a multi-byte UTF-8 sequence. A `None` cannot be told
375        /// apart from a file containing invalid UTF-8 — prefer
376        /// `try_read_chunk` when that distinction matters (#25).
377        pub fn read_chunk(&mut self, size: usize) -> Option<&str> {
378            self.try_read_chunk(size).ok().flatten()
379        }
380
381        /// Reset position to beginning
382        pub fn reset(&mut self) {
383            self.position = 0;
384        }
385
386        /// Get remaining bytes
387        pub fn remaining(&self) -> usize {
388            self.mmap.len().saturating_sub(self.position)
389        }
390    }
391}
392
#[cfg(all(test, feature = "async"))]
mod async_tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    /// Drives the parser manually over one explicit document and checks the
    /// events it produces.
    #[tokio::test]
    async fn test_async_streaming() {
        // Upper bound on parse steps so a parser that never reports
        // completion fails the assertions below instead of hanging the suite.
        const MAX_ITERATIONS: usize = 100;

        let yaml = "---\nkey: value\n...\n";
        let reader = BufReader::new(Cursor::new(yaml.as_bytes().to_vec()));
        let mut parser = AsyncStreamingParser::new(reader, Limits::default());

        let mut events = Vec::new();
        for _ in 0..MAX_ITERATIONS {
            if parser.is_complete() {
                break;
            }

            // A parse error on well-formed in-memory input is a test failure,
            // not a reason to stop quietly.
            let has_events = match parser.parse_next().await {
                Ok(has_events) => has_events,
                Err(_) => panic!("parsing well-formed in-memory YAML must not fail"),
            };

            if has_events {
                while let Some(event) = parser.next_event() {
                    events.push(event);
                }
            } else if parser.state == AsyncParseState::Complete {
                // Ensure we exit when parsing is done
                break;
            }
        }

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
        assert!(
            events
                .iter()
                .any(|event| matches!(event.event_type, EventType::DocumentStart { .. })),
            "the `---` marker must open a document"
        );
        assert!(
            events
                .iter()
                .any(|event| matches!(event.event_type, EventType::DocumentEnd { .. })),
            "the `...` marker must close the document"
        );

        let scalars: Vec<&str> = events
            .iter()
            .filter_map(|event| match &event.event_type {
                EventType::Scalar { value, .. } => Some(value.as_str()),
                _ => None,
            })
            .collect();
        assert_eq!(scalars, ["key", "value"]);
    }

    /// Consumes the parser through its `Stream` implementation.
    #[tokio::test]
    async fn test_stream_trait() {
        use tokio::time::{Duration, timeout};

        let yaml = "key: value\n";
        let reader = BufReader::new(Cursor::new(yaml.as_bytes().to_vec()));
        let parser = AsyncStreamingParser::new(reader, Limits::default());

        // The timeout turns a stream that never finishes into a test failure.
        let result = timeout(Duration::from_secs(5), parser.take(5).collect::<Vec<_>>()).await;

        let events = result.expect("Test timed out after 5 seconds");
        assert!(!events.is_empty());
        assert!(
            events.iter().all(|item| item.is_ok()),
            "well-formed input must not produce error items"
        );
        assert!(matches!(
            &events[0],
            Ok(event) if matches!(event.event_type, EventType::StreamStart)
        ));
    }
}
447
#[cfg(all(test, not(target_arch = "wasm32")))]
mod mmap_tests {
    use super::mmap::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Creates a temporary file holding exactly `bytes`, flushed to disk so it
    /// can be memory-mapped right away.
    fn temp_file_containing(bytes: &[u8]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(bytes).unwrap();
        file.flush().unwrap();
        file
    }

    /// Whole-file access and chunked access both see the mapped text.
    #[test]
    fn test_mmap_reader() {
        let file = temp_file_containing(b"key: value\nlist:\n  - item1\n  - item2\n");
        let mut reader = MmapYamlReader::new(file.path()).unwrap();

        // Whole-content view of the mapping.
        assert!(reader.as_str().unwrap().contains("key: value"));

        // Chunked view, starting again from the beginning.
        reader.reset();
        assert_eq!(reader.read_chunk(10).unwrap(), "key: value");
    }

    /// Regression for #25. "€" is 3 bytes (E2 82 AC). A chunk size landing
    /// inside that sequence must not silently truncate a valid file — the
    /// chunk extends to the next UTF-8 character boundary instead.
    #[test]
    fn read_chunk_does_not_split_multibyte_utf8() {
        let file = temp_file_containing("ab€cd".as_bytes());
        let mut reader = MmapYamlReader::new(file.path()).unwrap();

        let chunk = reader
            .read_chunk(3)
            .expect("a boundary inside a multi-byte char must not yield None");
        assert_eq!(chunk, "ab€");
    }

    /// Regression for #25. A genuinely malformed file must surface as `Err`,
    /// distinct from the `Ok(None)` that signals end of input.
    #[test]
    fn try_read_chunk_propagates_invalid_utf8() {
        let file = temp_file_containing(&[0xFF, 0xFE, 0xFD]);
        let mut reader = MmapYamlReader::new(file.path()).unwrap();

        assert!(
            reader.try_read_chunk(8).is_err(),
            "invalid UTF-8 must be reported as an error, not as EOF"
        );
    }

    /// Regression for #25. End of input is `Ok(None)` — never confused with
    /// the `Err` returned for malformed bytes.
    #[test]
    fn try_read_chunk_signals_eof_with_ok_none() {
        let file = temp_file_containing(b"hello");
        let mut reader = MmapYamlReader::new(file.path()).unwrap();

        assert_eq!(reader.try_read_chunk(8).unwrap(), Some("hello"));
        assert_eq!(reader.try_read_chunk(8).unwrap(), None);
    }
}