hedl_stream/async_reader.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async line reader for streaming parser.
19//!
20//! Provides buffered async line-by-line reading with line number tracking, peek support,
21//! and the ability to push back lines for re-parsing.
22//!
23//! This module mirrors the synchronous [`LineReader`](crate::LineReader) but uses
24//! tokio's async I/O primitives for non-blocking operation.
25//!
26//! # Examples
27//!
28//! ## Basic Async Line Reading
29//!
30//! ```rust,no_run
31//! # #[cfg(feature = "async")]
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! use hedl_stream::AsyncLineReader;
34//! use tokio::io::AsyncReadExt;
35//! use std::io::Cursor;
36//!
37//! let input = "line1\nline2\nline3";
38//! let mut reader = AsyncLineReader::new(Cursor::new(input));
39//!
40//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
41//! assert_eq!(reader.next_line().await?, Some((2, "line2".to_string())));
42//! assert_eq!(reader.next_line().await?, Some((3, "line3".to_string())));
43//! assert_eq!(reader.next_line().await?, None);
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ## Peeking and Push Back
49//!
50//! ```rust,no_run
51//! # #[cfg(feature = "async")]
52//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53//! use hedl_stream::AsyncLineReader;
54//! use std::io::Cursor;
55//!
56//! let input = "line1\nline2";
57//! let mut reader = AsyncLineReader::new(Cursor::new(input));
58//!
59//! // Peek without consuming
60//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
61//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
62//!
63//! // Now consume it
64//! let line = reader.next_line().await?.unwrap();
65//! assert_eq!(line, (1, "line1".to_string()));
66//!
67//! // Push it back
68//! reader.push_back(line.0, line.1);
69//!
70//! // Read it again
71//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
72//! # Ok(())
73//! # }
74//! ```
75
76use crate::error::{StreamError, StreamResult};
77use memchr::memchr;
78use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
79
80/// Buffered async line reader with line number tracking.
81///
82/// Reads input line-by-line asynchronously, automatically handling different line endings
83/// (LF, CRLF) and tracking the current line number for error reporting.
84///
85/// # Performance Characteristics
86///
87/// - **Buffering**: Configurable buffer size (default 64KB) for efficient I/O
88/// - **Zero-Copy**: String allocations only for consumed lines
89/// - **Async**: Non-blocking I/O suitable for high-concurrency scenarios
90///
91/// # When to Use Async vs Sync
92///
93/// **Use Async When:**
94/// - Processing network streams or pipes
95/// - High-concurrency scenarios (many parallel streams)
96/// - Integration with async web servers or frameworks
97/// - Need to process I/O without blocking threads
98///
99/// **Use Sync When:**
100/// - Processing local files
101/// - Single-threaded batch processing
102/// - Simpler code without async complexity
103/// - CPU-bound workloads with minimal I/O wait
104///
105/// # Examples
106///
107/// ## Reading from Async Source
108///
109/// ```rust,no_run
110/// # #[cfg(feature = "async")]
111/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
112/// use hedl_stream::AsyncLineReader;
113/// use tokio::fs::File;
114///
115/// let file = File::open("data.hedl").await?;
116/// let mut reader = AsyncLineReader::new(file);
117///
118/// while let Some((line_num, line)) = reader.next_line().await? {
119/// println!("{}: {}", line_num, line);
120/// }
121/// # Ok(())
122/// # }
123/// ```
124///
125/// ## With Custom Buffer Size
126///
127/// ```rust
128/// # #[cfg(feature = "async")]
129/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
130/// use hedl_stream::AsyncLineReader;
131/// use std::io::Cursor;
132///
133/// let input = "line1\nline2";
134/// let reader = AsyncLineReader::with_capacity(Cursor::new(input), 256 * 1024);
135/// # Ok(())
136/// # }
137/// ```
138pub struct AsyncLineReader<R: AsyncRead + Unpin> {
139 reader: BufReader<R>,
140 line_number: usize,
141 buffer: String,
142 peeked: Option<(usize, String)>,
143 max_line_length: usize,
144}
145
146impl<R: AsyncRead + Unpin> AsyncLineReader<R> {
147 /// Create a new async line reader with default buffer size (64KB) and max line length (1MB).
148 ///
149 /// # Examples
150 ///
151 /// ```rust
152 /// # #[cfg(feature = "async")]
153 /// use hedl_stream::AsyncLineReader;
154 /// use std::io::Cursor;
155 ///
156 /// let input = "line1\nline2";
157 /// let reader = AsyncLineReader::new(Cursor::new(input));
158 /// ```
159 pub fn new(reader: R) -> Self {
160 Self {
161 reader: BufReader::new(reader),
162 line_number: 0,
163 buffer: String::new(),
164 peeked: None,
165 max_line_length: 1_000_000,
166 }
167 }
168
169 /// Create an async line reader with a specific buffer capacity and default max line length (1MB).
170 ///
171 /// # Parameters
172 ///
173 /// - `reader`: The async readable source
174 /// - `capacity`: Buffer size in bytes
175 ///
176 /// # Examples
177 ///
178 /// ```rust
179 /// # #[cfg(feature = "async")]
180 /// use hedl_stream::AsyncLineReader;
181 /// use std::io::Cursor;
182 ///
183 /// // Use a larger buffer for large files
184 /// let reader = AsyncLineReader::with_capacity(
185 /// Cursor::new("data"),
186 /// 256 * 1024 // 256KB
187 /// );
188 /// ```
189 pub fn with_capacity(reader: R, capacity: usize) -> Self {
190 Self {
191 reader: BufReader::with_capacity(capacity, reader),
192 line_number: 0,
193 buffer: String::new(),
194 peeked: None,
195 max_line_length: 1_000_000,
196 }
197 }
198
199 /// Create with a specific max line length.
200 pub fn with_max_length(reader: R, max_line_length: usize) -> Self {
201 Self {
202 reader: BufReader::new(reader),
203 line_number: 0,
204 buffer: String::new(),
205 peeked: None,
206 max_line_length,
207 }
208 }
209
210 /// Create with a specific buffer capacity and max line length.
211 pub fn with_capacity_and_max_length(
212 reader: R,
213 capacity: usize,
214 max_line_length: usize,
215 ) -> Self {
216 Self {
217 reader: BufReader::with_capacity(capacity, reader),
218 line_number: 0,
219 buffer: String::new(),
220 peeked: None,
221 max_line_length,
222 }
223 }
224
225 /// Get the current line number.
226 ///
227 /// Returns 0 before any lines are read, then increments with each line.
228 ///
229 /// # Examples
230 ///
231 /// ```rust
232 /// # #[cfg(feature = "async")]
233 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
234 /// use hedl_stream::AsyncLineReader;
235 /// use std::io::Cursor;
236 ///
237 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
238 ///
239 /// assert_eq!(reader.line_number(), 0);
240 /// reader.next_line().await?;
241 /// assert_eq!(reader.line_number(), 1);
242 /// reader.next_line().await?;
243 /// assert_eq!(reader.line_number(), 2);
244 /// # Ok(())
245 /// # }
246 /// ```
247 #[inline]
248 pub fn line_number(&self) -> usize {
249 self.line_number
250 }
251
252 /// Read the next line asynchronously.
253 ///
254 /// Returns `Ok(Some((line_num, line)))` if a line was read, `Ok(None)` at EOF,
255 /// or `Err` on I/O errors.
256 ///
257 /// Trailing newlines (LF or CRLF) are automatically stripped.
258 ///
259 /// # Performance
260 ///
261 /// This method awaits on I/O and yields to the runtime if data is not available,
262 /// allowing other tasks to run. It does not block the thread.
263 ///
264 /// # Examples
265 ///
266 /// ```rust
267 /// # #[cfg(feature = "async")]
268 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
269 /// use hedl_stream::AsyncLineReader;
270 /// use std::io::Cursor;
271 ///
272 /// let mut reader = AsyncLineReader::new(Cursor::new("hello\nworld"));
273 ///
274 /// let (num, line) = reader.next_line().await?.unwrap();
275 /// assert_eq!(num, 1);
276 /// assert_eq!(line, "hello");
277 ///
278 /// let (num, line) = reader.next_line().await?.unwrap();
279 /// assert_eq!(num, 2);
280 /// assert_eq!(line, "world");
281 ///
282 /// assert_eq!(reader.next_line().await?, None);
283 /// # Ok(())
284 /// # }
285 /// ```
286 pub async fn next_line(&mut self) -> StreamResult<Option<(usize, String)>> {
287 // Return peeked line if available
288 if let Some(peeked) = self.peeked.take() {
289 return Ok(Some(peeked));
290 }
291
292 self.read_line_internal().await
293 }
294
295 /// Peek at the next line without consuming it.
296 ///
297 /// Returns a reference to the next line without advancing the reader.
298 /// Subsequent calls to `peek_line()` return the same line. Call `next_line()`
299 /// to consume it.
300 ///
301 /// # Examples
302 ///
303 /// ```rust
304 /// # #[cfg(feature = "async")]
305 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306 /// use hedl_stream::AsyncLineReader;
307 /// use std::io::Cursor;
308 ///
309 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
310 ///
311 /// // Peek multiple times
312 /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
313 /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
314 ///
315 /// // Consume
316 /// reader.next_line().await?;
317 ///
318 /// // Next peek is the second line
319 /// assert_eq!(reader.peek_line().await?, Some(&(2, "line2".to_string())));
320 /// # Ok(())
321 /// # }
322 /// ```
323 pub async fn peek_line(&mut self) -> StreamResult<Option<&(usize, String)>> {
324 if self.peeked.is_none() {
325 self.peeked = self.read_line_internal().await?;
326 }
327 Ok(self.peeked.as_ref())
328 }
329
330 /// Push a line back to be read again.
331 ///
332 /// The next call to `next_line()` or `peek_line()` will return this line.
333 /// Only one line can be pushed back at a time; subsequent calls overwrite
334 /// the previously pushed line.
335 ///
336 /// # Examples
337 ///
338 /// ```rust
339 /// # #[cfg(feature = "async")]
340 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341 /// use hedl_stream::AsyncLineReader;
342 /// use std::io::Cursor;
343 ///
344 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
345 ///
346 /// let line = reader.next_line().await?.unwrap();
347 /// assert_eq!(line, (1, "line1".to_string()));
348 ///
349 /// // Push it back
350 /// reader.push_back(line.0, line.1);
351 ///
352 /// // Read it again
353 /// let line = reader.next_line().await?.unwrap();
354 /// assert_eq!(line, (1, "line1".to_string()));
355 /// # Ok(())
356 /// # }
357 /// ```
358 #[inline]
359 pub fn push_back(&mut self, line_num: usize, line: String) {
360 self.peeked = Some((line_num, line));
361 }
362
363 async fn read_line_internal(&mut self) -> StreamResult<Option<(usize, String)>> {
364 self.buffer.clear();
365
366 loop {
367 // Read from BufReader's internal buffer (zero-copy)
368 let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
369
370 if available.is_empty() {
371 // EOF
372 if self.buffer.is_empty() {
373 return Ok(None);
374 }
375 // Return partial line (no trailing newline)
376 self.line_number += 1;
377 return Ok(Some((self.line_number, self.buffer.clone())));
378 }
379
380 // Find newline in available data
381 if let Some(newline_pos) = memchr(b'\n', available) {
382 // Check limit BEFORE appending
383 if self.buffer.len() + newline_pos > self.max_line_length {
384 // CRITICAL: Consume the oversized line data to prevent infinite loop
385 // Consume up to and including the newline character
386 self.reader.consume(newline_pos + 1);
387 let total_length = self.buffer.len() + newline_pos;
388 self.line_number += 1;
389 self.buffer.clear();
390 return Err(StreamError::LineTooLong {
391 line: self.line_number,
392 length: total_length,
393 limit: self.max_line_length,
394 });
395 }
396
397 // Append up to newline (excluding the newline itself)
398 let mut line_end = newline_pos;
399
400 // Handle CRLF: if newline is preceded by CR, exclude it too
401 if newline_pos > 0 && available[newline_pos - 1] == b'\r' {
402 line_end = newline_pos - 1;
403 }
404
405 let to_append = &available[..line_end];
406
407 // Validate UTF-8 before appending
408 let line_str =
409 std::str::from_utf8(to_append).map_err(|e| StreamError::InvalidUtf8 {
410 line: self.line_number + 1,
411 error: e,
412 })?;
413
414 self.buffer.push_str(line_str);
415
416 // Consume bytes including newline
417 self.reader.consume(newline_pos + 1);
418
419 self.line_number += 1;
420 return Ok(Some((self.line_number, self.buffer.clone())));
421 } else {
422 // No newline yet, check if adding entire buffer exceeds limit
423 if self.buffer.len() + available.len() > self.max_line_length {
424 // CRITICAL: Consume all available data and skip to end of line
425 // to prevent infinite loop on subsequent reads
426 let accumulated = self.buffer.len() + available.len();
427 let consumed = available.len();
428 self.reader.consume(consumed);
429
430 // Continue reading and discarding until we find the end of line
431 self.skip_to_end_of_line().await?;
432
433 self.line_number += 1;
434 self.buffer.clear();
435 return Err(StreamError::LineTooLong {
436 line: self.line_number,
437 length: accumulated,
438 limit: self.max_line_length,
439 });
440 }
441
442 // Validate UTF-8 before appending
443 let chunk_str =
444 std::str::from_utf8(available).map_err(|e| StreamError::InvalidUtf8 {
445 line: self.line_number + 1,
446 error: e,
447 })?;
448
449 // Append entire buffer and continue reading
450 self.buffer.push_str(chunk_str);
451
452 let len = available.len();
453 self.reader.consume(len);
454 }
455 }
456 }
457
458 /// Skip to end of line when handling oversized line errors.
459 /// Consumes data until a newline is found or EOF is reached.
460 async fn skip_to_end_of_line(&mut self) -> StreamResult<()> {
461 loop {
462 let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
463
464 if available.is_empty() {
465 // EOF reached, line is done
466 return Ok(());
467 }
468
469 if let Some(newline_pos) = memchr(b'\n', available) {
470 // Found newline, consume up to and including it
471 self.reader.consume(newline_pos + 1);
472 return Ok(());
473 } else {
474 // No newline, consume all and continue
475 let len = available.len();
476 self.reader.consume(len);
477 }
478 }
479 }
480}
481
482#[cfg(all(test, feature = "async"))]
483mod tests {
484 use super::*;
485 use std::io::Cursor;
486
487 #[tokio::test]
488 async fn test_read_lines() {
489 let input = "line1\nline2\nline3";
490 let mut reader = AsyncLineReader::new(Cursor::new(input));
491
492 assert_eq!(
493 reader.next_line().await.unwrap(),
494 Some((1, "line1".to_string()))
495 );
496 assert_eq!(
497 reader.next_line().await.unwrap(),
498 Some((2, "line2".to_string()))
499 );
500 assert_eq!(
501 reader.next_line().await.unwrap(),
502 Some((3, "line3".to_string()))
503 );
504 assert_eq!(reader.next_line().await.unwrap(), None);
505 }
506
507 #[tokio::test]
508 async fn test_peek_and_push_back() {
509 let input = "line1\nline2";
510 let mut reader = AsyncLineReader::new(Cursor::new(input));
511
512 let peeked = reader.peek_line().await.unwrap().cloned();
513 assert_eq!(peeked, Some((1, "line1".to_string())));
514
515 // Should still return the same line
516 let line = reader.next_line().await.unwrap();
517 assert_eq!(line, Some((1, "line1".to_string())));
518
519 // Push back
520 reader.push_back(1, "line1".to_string());
521 let line = reader.next_line().await.unwrap();
522 assert_eq!(line, Some((1, "line1".to_string())));
523 }
524
525 #[tokio::test]
526 async fn test_empty_input() {
527 let input = "";
528 let mut reader = AsyncLineReader::new(Cursor::new(input));
529 assert_eq!(reader.next_line().await.unwrap(), None);
530 }
531
532 #[tokio::test]
533 async fn test_single_empty_line() {
534 let input = "\n";
535 let mut reader = AsyncLineReader::new(Cursor::new(input));
536 assert_eq!(reader.next_line().await.unwrap(), Some((1, String::new())));
537 assert_eq!(reader.next_line().await.unwrap(), None);
538 }
539
540 #[tokio::test]
541 async fn test_crlf_line_endings() {
542 let input = "line1\r\nline2\r\nline3";
543 let mut reader = AsyncLineReader::new(Cursor::new(input));
544 assert_eq!(
545 reader.next_line().await.unwrap(),
546 Some((1, "line1".to_string()))
547 );
548 assert_eq!(
549 reader.next_line().await.unwrap(),
550 Some((2, "line2".to_string()))
551 );
552 assert_eq!(
553 reader.next_line().await.unwrap(),
554 Some((3, "line3".to_string()))
555 );
556 }
557
558 #[tokio::test]
559 async fn test_mixed_line_endings() {
560 let input = "line1\nline2\r\nline3\nline4";
561 let mut reader = AsyncLineReader::new(Cursor::new(input));
562 assert_eq!(
563 reader.next_line().await.unwrap(),
564 Some((1, "line1".to_string()))
565 );
566 assert_eq!(
567 reader.next_line().await.unwrap(),
568 Some((2, "line2".to_string()))
569 );
570 assert_eq!(
571 reader.next_line().await.unwrap(),
572 Some((3, "line3".to_string()))
573 );
574 assert_eq!(
575 reader.next_line().await.unwrap(),
576 Some((4, "line4".to_string()))
577 );
578 }
579
580 #[tokio::test]
581 async fn test_line_number_tracking() {
582 let input = "line1\nline2\nline3";
583 let mut reader = AsyncLineReader::new(Cursor::new(input));
584
585 assert_eq!(reader.line_number(), 0);
586
587 reader.next_line().await.unwrap();
588 assert_eq!(reader.line_number(), 1);
589
590 reader.next_line().await.unwrap();
591 assert_eq!(reader.line_number(), 2);
592
593 reader.next_line().await.unwrap();
594 assert_eq!(reader.line_number(), 3);
595 }
596
597 #[tokio::test]
598 async fn test_peek_multiple_times() {
599 let input = "line1\nline2";
600 let mut reader = AsyncLineReader::new(Cursor::new(input));
601
602 // Peek multiple times should return the same line
603 assert_eq!(
604 reader.peek_line().await.unwrap(),
605 Some(&(1, "line1".to_string()))
606 );
607 assert_eq!(
608 reader.peek_line().await.unwrap(),
609 Some(&(1, "line1".to_string()))
610 );
611 assert_eq!(
612 reader.peek_line().await.unwrap(),
613 Some(&(1, "line1".to_string()))
614 );
615
616 // Consume it
617 reader.next_line().await.unwrap();
618
619 // Next peek should be the second line
620 assert_eq!(
621 reader.peek_line().await.unwrap(),
622 Some(&(2, "line2".to_string()))
623 );
624 }
625
626 #[tokio::test]
627 async fn test_with_capacity() {
628 let input = "line1\nline2";
629 let mut reader = AsyncLineReader::with_capacity(Cursor::new(input), 1024);
630
631 assert_eq!(
632 reader.next_line().await.unwrap(),
633 Some((1, "line1".to_string()))
634 );
635 assert_eq!(
636 reader.next_line().await.unwrap(),
637 Some((2, "line2".to_string()))
638 );
639 }
640
641 #[tokio::test]
642 async fn test_unicode_content() {
643 let input = "你好\n世界\n🎉";
644 let mut reader = AsyncLineReader::new(Cursor::new(input));
645
646 assert_eq!(
647 reader.next_line().await.unwrap(),
648 Some((1, "你好".to_string()))
649 );
650 assert_eq!(
651 reader.next_line().await.unwrap(),
652 Some((2, "世界".to_string()))
653 );
654 assert_eq!(
655 reader.next_line().await.unwrap(),
656 Some((3, "🎉".to_string()))
657 );
658 }
659
660 #[tokio::test]
661 async fn test_long_line() {
662 let long_line = "a".repeat(10000);
663 let mut reader = AsyncLineReader::new(Cursor::new(long_line.clone()));
664 assert_eq!(reader.next_line().await.unwrap(), Some((1, long_line)));
665 }
666
667 #[tokio::test]
668 async fn test_many_lines() {
669 let lines: Vec<String> = (0..1000).map(|i| format!("line{i}")).collect();
670 let input = lines.join("\n");
671 let mut reader = AsyncLineReader::new(Cursor::new(input));
672
673 for (i, expected) in lines.iter().enumerate() {
674 let result = reader.next_line().await.unwrap();
675 assert_eq!(result, Some((i + 1, expected.clone())));
676 }
677 assert_eq!(reader.next_line().await.unwrap(), None);
678 }
679
680 #[tokio::test]
681 async fn test_push_back_overwrites_peek() {
682 let input = "line1\nline2";
683 let mut reader = AsyncLineReader::new(Cursor::new(input));
684
685 reader.peek_line().await.unwrap(); // Peek line1
686 reader.push_back(42, "pushed".to_string());
687
688 let line = reader.next_line().await.unwrap();
689 assert_eq!(line, Some((42, "pushed".to_string())));
690 }
691}