1#[cfg(feature = "async")]
8use futures::stream::Stream;
9#[cfg(feature = "async")]
10use std::pin::Pin;
11#[cfg(feature = "async")]
12use std::task::{Context, Poll};
13#[cfg(feature = "async")]
14use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
15
16use crate::{
17 Limits, Position, Result,
18 parser::{Event, EventType},
19};
20use std::collections::VecDeque;
21
#[cfg(feature = "async")]
/// Line-oriented YAML event parser that pulls its input from an asynchronous
/// buffered reader.
///
/// Input is consumed one line at a time by [`parse_next`](Self::parse_next);
/// the events each step produces are queued and handed out by
/// [`next_event`](Self::next_event) or through the [`Stream`] implementation.
///
/// Only a small subset of YAML is handled: `---` / `...` markers, blank lines,
/// `#` comment lines and single-line `key: value` pairs.
pub struct AsyncStreamingParser<R: AsyncBufRead + Unpin> {
    /// Source of raw input bytes.
    reader: R,
    /// Raw bytes of the line currently being read. Kept on the parser (not in a
    /// local) so that a read interrupted by `Poll::Pending` or by dropping the
    /// `parse_next` future loses no data.
    pending: Vec<u8>,
    /// Decoded text that has been read but not yet consumed by the parser.
    buffer: String,
    /// Events produced but not yet handed to the caller.
    events: VecDeque<Event>,
    /// Position stamped on every emitted event.
    // NOTE(review): nothing in this block ever advances it, so every event
    // carries the initial position.
    position: Position,
    /// Current phase of the document state machine.
    state: AsyncParseState,
    /// Limits supplied at construction time.
    // NOTE(review): stored but never consulted in this block; line length and
    // buffered input are currently unbounded.
    limits: Limits,
    /// Running counters, exposed through [`stats`](Self::stats).
    stats: AsyncStreamStats,
}

#[cfg(feature = "async")]
/// Phases of the [`AsyncStreamingParser`] state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncParseState {
    /// Nothing has been emitted yet.
    Initial,
    /// Inside a document: buffered lines are parsed as content.
    InDocument,
    /// Stream started (or a document just ended); waiting for a `---`.
    BetweenDocuments,
    /// End of input was reached; no further events will be produced.
    Complete,
}

#[cfg(feature = "async")]
/// Running counters describing the work an [`AsyncStreamingParser`] has done.
#[derive(Debug, Clone, Default)]
pub struct AsyncStreamStats {
    /// Total number of bytes taken from the reader so far.
    pub bytes_read: usize,
    /// Total number of events queued so far, including those already handed out.
    pub events_generated: usize,
    /// Number of document-start events emitted so far.
    pub documents_parsed: usize,
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> AsyncStreamingParser<R> {
    /// Creates a parser that reads from `reader`.
    ///
    /// No input is read until [`parse_next`](Self::parse_next) is awaited or
    /// the parser is polled as a [`Stream`].
    pub fn new(reader: R, limits: Limits) -> Self {
        Self {
            reader,
            pending: Vec::new(),
            buffer: String::with_capacity(4096),
            events: VecDeque::with_capacity(100),
            position: Position::new(),
            state: AsyncParseState::Initial,
            limits,
            stats: AsyncStreamStats::default(),
        }
    }

    /// Reads the next line from the reader and advances the parser one step.
    ///
    /// Returns `Ok(true)` when at least one event is waiting in the queue and
    /// `Ok(false)` otherwise. `Ok(false)` does **not** by itself mean the input
    /// is exhausted; use [`is_complete`](Self::is_complete) for that.
    ///
    /// When the reader reports end of input, any text still buffered
    /// (including a final line without a trailing newline) is parsed and the
    /// parser becomes complete.
    ///
    /// # Errors
    ///
    /// Returns the reader's I/O error, or an `InvalidData` I/O error when the
    /// line is not valid UTF-8 (the offending line is discarded).
    ///
    /// # Cancel safety
    ///
    /// The only await point is the `read_until` call, whose partial output is
    /// accumulated in `self.pending`, so dropping the returned future before
    /// it completes loses no input.
    pub async fn parse_next(&mut self) -> Result<bool> {
        if self.state == AsyncParseState::Complete {
            return Ok(!self.events.is_empty());
        }

        self.reader.read_until(b'\n', &mut self.pending).await?;

        let raw = std::mem::take(&mut self.pending);
        // `read_until` only returns without a trailing delimiter when the
        // reader hit end of input (this includes the empty read).
        let at_eof = raw.last() != Some(&b'\n');
        let byte_count = raw.len();
        let line = String::from_utf8(raw).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "stream did not contain valid UTF-8",
            )
        })?;

        self.stats.bytes_read += byte_count;
        self.buffer.push_str(&line);

        if at_eof {
            self.finish()?;
        } else {
            self.parse_buffer()?;
        }

        Ok(!self.events.is_empty())
    }

    /// Flushes whatever is still buffered and marks the parser complete.
    ///
    /// Called once the reader has reported end of input. An empty input
    /// produces no events at all.
    fn finish(&mut self) -> Result<()> {
        if !self.buffer.is_empty() {
            // Content parsing only consumes newline-terminated lines, so
            // terminate a dangling final line before flushing.
            if !self.buffer.ends_with('\n') {
                self.buffer.push('\n');
            }
            // Step the state machine until a step neither consumes text nor
            // queues an event. Every state change queues an event, so this
            // detects the fixed point.
            loop {
                let before = (self.buffer.len(), self.events.len());
                self.parse_buffer()?;
                if (self.buffer.len(), self.events.len()) == before {
                    break;
                }
            }
            self.buffer.clear();
        }
        // NOTE(review): a document still open here gets no DocumentEnd event,
        // and no end-of-stream event is emitted; confirm whether consumers
        // expect balanced start/end events.
        self.state = AsyncParseState::Complete;
        Ok(())
    }

    /// Performs one step of the state machine on the buffered text.
    fn parse_buffer(&mut self) -> Result<()> {
        match self.state {
            AsyncParseState::Initial => {
                self.emit_event(EventType::StreamStart)?;
                self.state = AsyncParseState::BetweenDocuments;
            }
            AsyncParseState::BetweenDocuments => {
                // NOTE(review): this matches `---` anywhere in the buffered
                // text, not only at the start of a line, and reports the
                // document as implicit even though a marker was seen. Text
                // buffered before the marker is later parsed as content.
                if self.buffer.contains("---") {
                    self.emit_event(EventType::DocumentStart {
                        version: None,
                        tags: Vec::new(),
                        implicit: true,
                    })?;
                    self.state = AsyncParseState::InDocument;
                    self.stats.documents_parsed += 1;
                }
            }
            AsyncParseState::InDocument => self.parse_document_content()?,
            AsyncParseState::Complete => {}
        }
        Ok(())
    }

    /// Consumes complete lines from the buffer until it runs out of them or a
    /// `...` marker ends the document.
    fn parse_document_content(&mut self) -> Result<()> {
        while !self.buffer.is_empty() {
            if self.buffer.starts_with("...") {
                self.emit_event(EventType::DocumentEnd { implicit: false })?;
                self.state = AsyncParseState::BetweenDocuments;
                // Only the marker is removed; the rest of its line stays
                // buffered.
                self.buffer.drain(..3);
                break;
            }

            match self.buffer.find('\n') {
                Some(newline_pos) => {
                    let line: String = self.buffer.drain(..=newline_pos).collect();
                    self.parse_line(&line)?;
                }
                // Incomplete line: wait for more input.
                None => break,
            }
        }
        Ok(())
    }

    /// Parses a single content line.
    ///
    /// Blank lines and `#` comments are skipped. A line containing `:` is
    /// split at the first colon into a key scalar and a value scalar. Any
    /// other line is ignored.
    fn parse_line(&mut self, line: &str) -> Result<()> {
        let trimmed = line.trim();

        if trimmed.is_empty() || trimmed.starts_with('#') {
            return Ok(());
        }

        if let Some((key, value)) = trimmed.split_once(':') {
            self.emit_event(Self::plain_scalar(key))?;
            self.emit_event(Self::plain_scalar(value))?;
        }

        Ok(())
    }

    /// Builds a plain-style scalar event payload from `text`, trimmed.
    fn plain_scalar(text: &str) -> EventType {
        EventType::Scalar {
            value: text.trim().to_string(),
            anchor: None,
            tag: None,
            style: crate::parser::ScalarStyle::Plain,
            plain_implicit: true,
            quoted_implicit: true,
        }
    }

    /// Queues an event stamped with the current position and counts it.
    fn emit_event(&mut self, event_type: EventType) -> Result<()> {
        self.events.push_back(Event {
            event_type,
            position: self.position,
        });
        self.stats.events_generated += 1;
        Ok(())
    }

    /// Removes and returns the oldest queued event, if any.
    pub fn next_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }

    /// Returns `true` once end of input was reached and every queued event has
    /// been taken.
    pub fn is_complete(&self) -> bool {
        self.state == AsyncParseState::Complete && self.events.is_empty()
    }

    /// Returns the counters accumulated so far.
    pub fn stats(&self) -> &AsyncStreamStats {
        &self.stats
    }
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> Stream for AsyncStreamingParser<R> {
    type Item = Result<Event>;

    /// Yields queued events, reading more input whenever the queue is empty.
    ///
    /// The reader is polled with the caller's context, so the task is woken by
    /// the reader itself when more input arrives; the executor thread is never
    /// blocked. The stream ends only after end of input has been reached and
    /// the queue has drained.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        loop {
            if let Some(event) = this.events.pop_front() {
                return Poll::Ready(Some(Ok(event)));
            }

            if this.state == AsyncParseState::Complete {
                return Poll::Ready(None);
            }

            // `parse_next` keeps partially read input on the parser, so it is
            // safe to build a fresh future for every poll and drop it when it
            // is not ready yet.
            let step = {
                let step_future = std::pin::pin!(this.parse_next());
                std::future::Future::poll(step_future, cx)
            };

            match step {
                // A step that queued nothing (blank line, comment, ...) is not
                // the end of the stream: go around again.
                Poll::Ready(Ok(_)) => {}
                Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(err))),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}
238
#[cfg(feature = "async")]
/// Convenience constructors and a callback-driven loop for
/// [`AsyncStreamingParser`].
pub mod helpers {
    use super::*;
    use std::path::Path;
    use tokio::fs::File;

    /// Opens the file at `path` and wraps it in a buffered streaming parser.
    ///
    /// # Errors
    ///
    /// Returns the error produced by opening the file.
    pub async fn stream_from_file_async<P: AsRef<Path>>(
        path: P,
        limits: Limits,
    ) -> Result<AsyncStreamingParser<BufReader<File>>> {
        let reader = BufReader::new(File::open(path).await?);
        Ok(stream_from_async_reader(reader, limits))
    }

    /// Wraps an already buffered asynchronous reader in a streaming parser.
    pub fn stream_from_async_reader<R: AsyncBufRead + Unpin>(
        reader: R,
        limits: Limits,
    ) -> AsyncStreamingParser<R> {
        AsyncStreamingParser::new(reader, limits)
    }

    /// Drives `parser` until it reports completion, passing every event to
    /// `callback` in the order it was produced.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error from the parser or from
    /// `callback`.
    pub async fn process_yaml_stream<R, F, Fut>(
        mut parser: AsyncStreamingParser<R>,
        mut callback: F,
    ) -> Result<()>
    where
        R: AsyncBufRead + Unpin,
        F: FnMut(Event) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        loop {
            if parser.is_complete() {
                return Ok(());
            }

            let has_events = parser.parse_next().await?;
            if !has_events {
                continue;
            }

            // Hand over everything this step queued before reading further.
            while let Some(event) = parser.next_event() {
                callback(event).await?;
            }
        }
    }
}
284
285#[cfg(not(target_arch = "wasm32"))]
287pub mod mmap {
288 use crate::Result;
289 use memmap2::{Mmap, MmapOptions};
290 use std::fs::File;
291 use std::path::Path;
292
293 pub struct MmapYamlReader {
295 mmap: Mmap,
296 position: usize,
297 }
298
299 impl MmapYamlReader {
300 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
314 let file = File::open(path)?;
315 #[allow(unsafe_code)]
320 let mmap = unsafe { MmapOptions::new().map(&file)? };
321
322 Ok(Self { mmap, position: 0 })
323 }
324
325 pub fn as_str(&self) -> Result<&str> {
327 std::str::from_utf8(&self.mmap).map_err(|e| {
328 crate::Error::construction(
329 crate::Position::new(),
330 format!("UTF-8 conversion failed: {}", e),
331 )
332 })
333 }
334
335 pub fn try_read_chunk(&mut self, size: usize) -> Result<Option<&str>> {
347 if self.position >= self.mmap.len() {
348 return Ok(None);
349 }
350
351 let mut end = (self.position + size).min(self.mmap.len());
352 while end < self.mmap.len() && (self.mmap[end] & 0xC0) == 0x80 {
357 end += 1;
358 }
359
360 let chunk = &self.mmap[self.position..end];
361 let text = std::str::from_utf8(chunk).map_err(|e| {
362 crate::Error::construction(
363 crate::Position::new(),
364 format!("UTF-8 conversion failed: {}", e),
365 )
366 })?;
367 self.position = end;
368 Ok(Some(text))
369 }
370
371 pub fn read_chunk(&mut self, size: usize) -> Option<&str> {
378 self.try_read_chunk(size).ok().flatten()
379 }
380
381 pub fn reset(&mut self) {
383 self.position = 0;
384 }
385
386 pub fn remaining(&self) -> usize {
388 self.mmap.len().saturating_sub(self.position)
389 }
390 }
391}
392
#[cfg(all(test, feature = "async"))]
mod async_tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    /// Drives the parser by hand and checks that the first event produced is
    /// the stream start.
    #[tokio::test]
    async fn test_async_streaming() {
        // Upper bound on parse steps so the test terminates even if the
        // parser never reports completion.
        const MAX_ITERATIONS: usize = 100;

        let yaml = "---\nkey: value\n...\n";
        let cursor = Cursor::new(yaml.as_bytes().to_vec());
        let reader = BufReader::new(cursor);
        let mut parser = AsyncStreamingParser::new(reader, Limits::default());

        let mut events = Vec::new();
        let mut iterations = 0;

        while !parser.is_complete() && iterations < MAX_ITERATIONS {
            iterations += 1;
            // A parse error on this well-formed input is a test failure, not
            // a reason to stop quietly.
            let has_events = parser
                .parse_next()
                .await
                .expect("parse_next failed on well-formed input");
            if has_events {
                while let Some(event) = parser.next_event() {
                    events.push(event);
                }
            }
        }

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));
    }

    /// Consumes the parser through its `Stream` implementation and checks
    /// that it yields at least one item without hanging.
    #[tokio::test]
    async fn test_stream_trait() {
        use tokio::time::{Duration, timeout};

        let yaml = "key: value\n";
        let cursor = Cursor::new(yaml.as_bytes().to_vec());
        let reader = BufReader::new(cursor);
        let parser = AsyncStreamingParser::new(reader, Limits::default());

        let result = timeout(Duration::from_secs(5), parser.take(5).collect::<Vec<_>>()).await;

        let events = result.expect("Test timed out after 5 seconds");
        assert!(!events.is_empty());
    }
}
447
#[cfg(all(test, not(target_arch = "wasm32")))]
mod mmap_tests {
    use super::mmap::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Writes `contents` to a fresh temporary file and maps it.
    ///
    /// The file handle is returned alongside the reader so the file outlives
    /// the mapping for the duration of the test.
    fn mapped(contents: &[u8]) -> (NamedTempFile, MmapYamlReader) {
        let mut file = NamedTempFile::new().unwrap();
        file.write_all(contents).unwrap();
        file.flush().unwrap();
        let reader = MmapYamlReader::new(file.path()).unwrap();
        (file, reader)
    }

    /// Whole-file access and a first chunked read both see the file contents.
    #[test]
    fn test_mmap_reader() {
        let mut file = NamedTempFile::new().unwrap();
        writeln!(file, "key: value").unwrap();
        writeln!(file, "list:").unwrap();
        writeln!(file, " - item1").unwrap();
        writeln!(file, " - item2").unwrap();
        file.flush().unwrap();

        let mut reader = MmapYamlReader::new(file.path()).unwrap();
        assert!(reader.as_str().unwrap().contains("key: value"));

        reader.reset();
        let first_chunk = reader.read_chunk(10).unwrap();
        assert_eq!(first_chunk, "key: value");
    }

    /// A chunk boundary that falls inside a multi-byte character is moved
    /// forward to the end of that character.
    #[test]
    fn read_chunk_does_not_split_multibyte_utf8() {
        let (_file, mut reader) = mapped("ab€cd".as_bytes());

        let chunk = reader
            .read_chunk(3)
            .expect("a boundary inside a multi-byte char must not yield None");
        assert_eq!(chunk, "ab€");
    }

    /// Invalid UTF-8 surfaces as an error from the fallible API.
    #[test]
    fn try_read_chunk_propagates_invalid_utf8() {
        let (_file, mut reader) = mapped(&[0xFF, 0xFE, 0xFD]);

        assert!(
            reader.try_read_chunk(8).is_err(),
            "invalid UTF-8 must be reported as an error, not as EOF"
        );
    }

    /// Reading past the end reports `Ok(None)`.
    #[test]
    fn try_read_chunk_signals_eof_with_ok_none() {
        let (_file, mut reader) = mapped(b"hello");

        assert_eq!(reader.try_read_chunk(8).unwrap(), Some("hello"));
        assert_eq!(reader.try_read_chunk(8).unwrap(), None);
    }
}