1#[cfg(feature = "async")]
8use futures::stream::Stream;
9#[cfg(feature = "async")]
10use std::pin::Pin;
11#[cfg(feature = "async")]
12use std::task::{Context, Poll};
13#[cfg(feature = "async")]
14use tokio::io::{AsyncBufRead, AsyncBufReadExt, BufReader};
15
16use crate::{
17 parser::{Event, EventType},
18 Limits, Position, Result,
19};
20use std::collections::VecDeque;
21
/// Line-oriented YAML event producer driven by an asynchronous buffered reader.
///
/// Input is pulled one line at a time. Each line is translated into zero or more
/// `Event`s, which are queued until the caller drains them with `next_event` or
/// through the `Stream` implementation.
#[cfg(feature = "async")]
pub struct AsyncStreamingParser<R: AsyncBufRead + Unpin> {
    /// Source of the YAML text.
    reader: R,
    /// Raw bytes of the line currently being read. They live on the parser (not
    /// in a local) so that a read interrupted by `Poll::Pending` or a dropped
    /// future does not lose the part of the line that was already consumed.
    buffer: Vec<u8>,
    /// Events produced but not yet handed to the caller.
    events: VecDeque<Event>,
    /// Position stamped on every emitted event.
    /// NOTE(review): nothing in this file advances it, so all events carry the
    /// initial position.
    position: Position,
    /// Where the parser currently is in the stream/document structure.
    state: AsyncParseState,
    /// Resource limits supplied by the caller.
    /// NOTE(review): stored but not consulted anywhere in this file.
    limits: Limits,
    /// Running counters exposed through `stats`.
    stats: AsyncStreamStats,
}

/// Coarse position of the parser inside the YAML stream.
#[cfg(feature = "async")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AsyncParseState {
    /// No input has been processed yet; `StreamStart` has not been emitted.
    Initial,
    /// Between a document start and its end.
    InDocument,
    /// After `StreamStart` or a document end, waiting for the next document.
    BetweenDocuments,
    /// The reader reported end of input; no further events will be produced.
    Complete,
}

/// Counters describing the work done by an `AsyncStreamingParser`.
#[cfg(feature = "async")]
#[derive(Debug, Clone, Default)]
pub struct AsyncStreamStats {
    /// Number of bytes taken from the reader so far.
    pub bytes_read: usize,
    /// Number of events queued so far (including ones already drained).
    pub events_generated: usize,
    /// Number of documents started so far.
    pub documents_parsed: usize,
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> AsyncStreamingParser<R> {
    /// Creates a parser that reads from `reader`.
    pub fn new(reader: R, limits: Limits) -> Self {
        Self {
            reader,
            buffer: Vec::with_capacity(4096),
            events: VecDeque::with_capacity(100),
            position: Position::new(),
            state: AsyncParseState::Initial,
            limits,
            stats: AsyncStreamStats::default(),
        }
    }

    /// Reads and processes the next line of input.
    ///
    /// Returns `Ok(true)` when at least one event is waiting in the queue after
    /// the call and `Ok(false)` otherwise. Once the reader reports end of input
    /// the parser closes any open document and becomes complete; later calls do
    /// not touch the reader again.
    ///
    /// # Errors
    ///
    /// Returns an error when the reader fails or when a line is not valid UTF-8.
    pub async fn parse_next(&mut self) -> Result<bool> {
        if self.state == AsyncParseState::Complete {
            return Ok(!self.events.is_empty());
        }
        // `read_until` appends directly to `self.buffer`, so dropping this
        // future half-way keeps the partial line for the next attempt.
        let bytes_read = self.reader.read_until(b'\n', &mut self.buffer).await?;
        self.ingest_line(bytes_read)
    }

    /// Poll-based twin of `parse_next`, used by the `Stream` implementation so
    /// that it never has to block inside `poll_next`.
    fn poll_parse_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
        if self.state == AsyncParseState::Complete {
            return Poll::Ready(Ok(!self.events.is_empty()));
        }
        let polled = {
            // A fresh future per poll is fine: bytes read so far are kept in
            // `self.buffer`, not inside the future.
            let read = self.reader.read_until(b'\n', &mut self.buffer);
            futures::pin_mut!(read);
            std::future::Future::poll(read, cx)
        };
        match polled {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Ok(bytes_read)) => Poll::Ready(self.ingest_line(bytes_read)),
            Poll::Ready(Err(err)) => Poll::Ready(Err(err.into())),
        }
    }

    /// Consumes the line accumulated in `self.buffer`.
    ///
    /// `bytes_read` is the count reported by the read that just finished; zero
    /// means the reader is exhausted. The buffer may still hold a partial last
    /// line collected by an earlier, interrupted read.
    fn ingest_line(&mut self, bytes_read: usize) -> Result<bool> {
        // Move the bytes out so the line can be borrowed while `self` is
        // mutated, then hand the allocation back for the next line.
        let mut raw = std::mem::take(&mut self.buffer);
        self.stats.bytes_read += raw.len();
        let decoded = match std::str::from_utf8(&raw) {
            Ok(line) => {
                if !line.is_empty() {
                    self.process_line(line);
                }
                Ok(())
            }
            Err(_) => Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "stream did not contain valid UTF-8",
            )),
        };
        raw.clear();
        self.buffer = raw;
        decoded?;

        if bytes_read == 0 {
            self.finish();
        }
        Ok(!self.events.is_empty())
    }

    /// Applies one line of input to the state machine.
    fn process_line(&mut self, raw_line: &str) {
        if self.state == AsyncParseState::Initial {
            self.emit_event(EventType::StreamStart);
            self.state = AsyncParseState::BetweenDocuments;
        }

        let line = raw_line.trim_end_matches(|c: char| c == '\n' || c == '\r');

        if let Some(rest) = Self::document_marker_rest(line, "---") {
            // A new `---` while a document is open ends that document without
            // an explicit `...` marker.
            if self.state == AsyncParseState::InDocument {
                self.emit_event(EventType::DocumentEnd { implicit: true });
            }
            self.begin_document(false);
            self.parse_line(rest);
            return;
        }

        if Self::document_marker_rest(line, "...").is_some() {
            if self.state == AsyncParseState::InDocument {
                self.emit_event(EventType::DocumentEnd { implicit: false });
                self.state = AsyncParseState::BetweenDocuments;
            }
            return;
        }

        if self.state != AsyncParseState::InDocument {
            let trimmed = line.trim();
            // Blank lines, comments and `%` directive lines do not open a document.
            if trimmed.is_empty() || trimmed.starts_with('#') || line.starts_with('%') {
                return;
            }
            // Content without a preceding `---` starts an implicit document
            // instead of being dropped.
            self.begin_document(true);
        }

        self.parse_line(line);
    }

    /// Returns the text following `marker` when `line` starts with `marker`
    /// followed by whitespace or the end of the line.
    fn document_marker_rest<'a>(line: &'a str, marker: &str) -> Option<&'a str> {
        let rest = line.strip_prefix(marker)?;
        if rest.is_empty() || rest.starts_with(char::is_whitespace) {
            Some(rest)
        } else {
            None
        }
    }

    /// Emits `DocumentStart` and moves into the document.
    fn begin_document(&mut self, implicit: bool) {
        self.emit_event(EventType::DocumentStart {
            version: None,
            tags: Vec::new(),
            implicit,
        });
        self.state = AsyncParseState::InDocument;
        self.stats.documents_parsed += 1;
    }

    /// Closes a document that is still open and marks the stream as finished.
    fn finish(&mut self) {
        if self.state == AsyncParseState::InDocument {
            self.emit_event(EventType::DocumentEnd { implicit: true });
        }
        self.state = AsyncParseState::Complete;
    }

    /// Translates one content line into events.
    ///
    /// Blank lines and `#` comments are ignored. A line containing `:` is split
    /// at the first colon into a key scalar and a value scalar; any other line
    /// produces no events.
    fn parse_line(&mut self, line: &str) {
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with('#') {
            return;
        }
        if let Some((key, value)) = trimmed.split_once(':') {
            self.emit_scalar(key);
            self.emit_scalar(value);
        }
    }

    /// Queues a plain scalar event holding `text` with surrounding whitespace removed.
    fn emit_scalar(&mut self, text: &str) {
        self.emit_event(EventType::Scalar {
            value: text.trim().to_string(),
            anchor: None,
            tag: None,
            style: crate::parser::ScalarStyle::Plain,
            plain_implicit: true,
            quoted_implicit: true,
        });
    }

    /// Queues an event stamped with the current position and counts it.
    fn emit_event(&mut self, event_type: EventType) {
        self.events.push_back(Event {
            event_type,
            position: self.position,
        });
        self.stats.events_generated += 1;
    }

    /// Removes and returns the oldest queued event, if any.
    pub fn next_event(&mut self) -> Option<Event> {
        self.events.pop_front()
    }

    /// Returns `true` once the input is exhausted and every event has been drained.
    pub fn is_complete(&self) -> bool {
        self.state == AsyncParseState::Complete && self.events.is_empty()
    }

    /// Returns the counters collected so far.
    pub fn stats(&self) -> &AsyncStreamStats {
        &self.stats
    }
}

#[cfg(feature = "async")]
impl<R: AsyncBufRead + Unpin> Stream for AsyncStreamingParser<R> {
    type Item = Result<Event>;

    /// Yields queued events, reading one more line whenever the queue is empty.
    ///
    /// The reader is polled directly with the caller's context, so a reader
    /// that is not ready makes this return `Poll::Pending` instead of blocking
    /// the executor thread.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if let Some(event) = this.next_event() {
            return Poll::Ready(Some(Ok(event)));
        }
        if this.state == AsyncParseState::Complete {
            return Poll::Ready(None);
        }

        match this.poll_parse_next(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
            Poll::Ready(Ok(_)) => match this.next_event() {
                Some(event) => Poll::Ready(Some(Ok(event))),
                None if this.state == AsyncParseState::Complete => Poll::Ready(None),
                None => {
                    // The line produced no events (blank line, comment, ...).
                    // That is not the end of the stream: ask to be polled
                    // again right away and yield to the executor meanwhile.
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            },
        }
    }
}
238
/// Convenience constructors and drivers for `AsyncStreamingParser`.
#[cfg(feature = "async")]
pub mod helpers {
    use super::*;
    use std::path::Path;
    use tokio::fs::File;

    /// Opens the file at `path` and wraps it in a streaming parser.
    ///
    /// # Errors
    ///
    /// Returns an error when the file cannot be opened.
    pub async fn stream_from_file_async<P: AsRef<Path>>(
        path: P,
        limits: Limits,
    ) -> Result<AsyncStreamingParser<BufReader<File>>> {
        let file = File::open(path).await?;
        Ok(AsyncStreamingParser::new(BufReader::new(file), limits))
    }

    /// Wraps an already buffered asynchronous reader in a streaming parser.
    pub fn stream_from_async_reader<R: AsyncBufRead + Unpin>(
        reader: R,
        limits: Limits,
    ) -> AsyncStreamingParser<R> {
        AsyncStreamingParser::new(reader, limits)
    }

    /// Drives `parser` to the end of its input, passing every event to `callback`.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by the parser or by
    /// `callback`.
    pub async fn process_yaml_stream<R, F, Fut>(
        mut parser: AsyncStreamingParser<R>,
        mut callback: F,
    ) -> Result<()>
    where
        R: AsyncBufRead + Unpin,
        F: FnMut(Event) -> Fut,
        Fut: std::future::Future<Output = Result<()>>,
    {
        while !parser.is_complete() {
            let bytes_before = parser.stats().bytes_read;
            let has_events = parser.parse_next().await?;

            // Drain unconditionally so events queued before this call are
            // delivered even when the call itself reported none.
            while let Some(event) = parser.next_event() {
                callback(event).await?;
            }

            // A pass that consumed no input and queued no events means the
            // reader is exhausted. Stop here even if the parser has not flagged
            // completion, otherwise this loop would spin forever.
            if !has_events && parser.stats().bytes_read == bytes_before {
                break;
            }
        }
        Ok(())
    }
}
284
285#[cfg(not(target_arch = "wasm32"))]
287pub mod mmap {
288 use crate::Result;
289 use memmap2::{Mmap, MmapOptions};
290 use std::fs::File;
291 use std::path::Path;
292
293 pub struct MmapYamlReader {
295 mmap: Mmap,
296 position: usize,
297 }
298
299 impl MmapYamlReader {
300 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
302 let file = File::open(path)?;
303 #[allow(unsafe_code)]
306 let mmap = unsafe { MmapOptions::new().map(&file)? };
307
308 Ok(Self { mmap, position: 0 })
309 }
310
311 pub fn as_str(&self) -> Result<&str> {
313 std::str::from_utf8(&self.mmap).map_err(|e| {
314 crate::Error::construction(
315 crate::Position::new(),
316 format!("UTF-8 conversion failed: {}", e),
317 )
318 })
319 }
320
321 pub fn read_chunk(&mut self, size: usize) -> Option<&str> {
323 if self.position >= self.mmap.len() {
324 return None;
325 }
326
327 let end = (self.position + size).min(self.mmap.len());
328 let chunk = &self.mmap[self.position..end];
329 self.position = end;
330
331 std::str::from_utf8(chunk).ok()
332 }
333
334 pub fn reset(&mut self) {
336 self.position = 0;
337 }
338
339 pub fn remaining(&self) -> usize {
341 self.mmap.len().saturating_sub(self.position)
342 }
343 }
344}
345
#[cfg(all(test, feature = "async"))]
mod async_tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    /// Drives the parser manually and checks the events for a small document.
    #[tokio::test]
    async fn test_async_streaming() {
        // Upper bound on parse calls so a parser that never reports completion
        // cannot hang the test.
        const MAX_ITERATIONS: usize = 100;

        let yaml = "---\nkey: value\n...\n";
        let reader = BufReader::new(Cursor::new(yaml.as_bytes().to_vec()));
        let mut parser = AsyncStreamingParser::new(reader, Limits::default());

        let mut events = Vec::new();

        for _ in 0..MAX_ITERATIONS {
            if parser.is_complete() {
                break;
            }
            // A failure must fail the test instead of being silently ignored.
            let has_events = parser
                .parse_next()
                .await
                .expect("parsing an in-memory document must not fail");

            if has_events {
                while let Some(event) = parser.next_event() {
                    events.push(event);
                }
            } else if parser.state == AsyncParseState::Complete {
                break;
            }
        }

        assert!(!events.is_empty());
        assert!(matches!(events[0].event_type, EventType::StreamStart));

        let scalars: Vec<&str> = events
            .iter()
            .filter_map(|event| match &event.event_type {
                EventType::Scalar { value, .. } => Some(value.as_str()),
                _ => None,
            })
            .collect();
        assert_eq!(scalars, ["key", "value"]);
    }

    /// Consumes the parser through its `Stream` implementation.
    #[tokio::test]
    async fn test_stream_trait() {
        use tokio::time::{timeout, Duration};

        let yaml = "key: value\n";
        let reader = BufReader::new(Cursor::new(yaml.as_bytes().to_vec()));
        let parser = AsyncStreamingParser::new(reader, Limits::default());

        let result = timeout(Duration::from_secs(5), parser.take(5).collect::<Vec<_>>()).await;

        let items = result.expect("Test timed out after 5 seconds");
        assert!(!items.is_empty());
        assert!(items.iter().all(|item| item.is_ok()));
        assert!(matches!(
            items[0],
            Ok(Event {
                event_type: EventType::StreamStart,
                ..
            })
        ));
    }
}
400
#[cfg(all(test, not(target_arch = "wasm32")))]
mod mmap_tests {
    use super::mmap::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Exercises whole-file access, chunked reads and cursor bookkeeping.
    #[test]
    fn test_mmap_reader() {
        let mut file = NamedTempFile::new().unwrap();
        for line in &["key: value", "list:", " - item1", " - item2"] {
            writeln!(file, "{}", line).unwrap();
        }
        file.flush().unwrap();

        let mut reader = MmapYamlReader::new(file.path()).unwrap();

        let total = {
            let content = reader.as_str().unwrap();
            assert!(content.contains("key: value"));
            content.len()
        };
        assert_eq!(reader.remaining(), total);

        reader.reset();
        let chunk = reader.read_chunk(10).unwrap();
        assert_eq!(chunk, "key: value");
        assert_eq!(reader.remaining(), total - 10);

        // Reading the rest must leave nothing behind, and further reads must
        // report exhaustion.
        assert!(reader.read_chunk(total).is_some());
        assert_eq!(reader.remaining(), 0);
        assert!(reader.read_chunk(1).is_none());

        // `reset` rewinds to the start of the mapping.
        reader.reset();
        assert_eq!(reader.remaining(), total);
    }
}