hedl_stream/async_reader.rs
1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async line reader for streaming parser.
19//!
20//! Provides buffered async line-by-line reading with line number tracking, peek support,
21//! and the ability to push back lines for re-parsing.
22//!
23//! This module mirrors the synchronous [`LineReader`](crate::LineReader) but uses
24//! tokio's async I/O primitives for non-blocking operation.
25//!
26//! # Examples
27//!
28//! ## Basic Async Line Reading
29//!
30//! ```rust,no_run
31//! # #[cfg(feature = "async")]
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! use hedl_stream::AsyncLineReader;
34//! use tokio::io::AsyncReadExt;
35//! use std::io::Cursor;
36//!
37//! let input = "line1\nline2\nline3";
38//! let mut reader = AsyncLineReader::new(Cursor::new(input));
39//!
40//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
41//! assert_eq!(reader.next_line().await?, Some((2, "line2".to_string())));
42//! assert_eq!(reader.next_line().await?, Some((3, "line3".to_string())));
43//! assert_eq!(reader.next_line().await?, None);
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ## Peeking and Push Back
49//!
50//! ```rust,no_run
51//! # #[cfg(feature = "async")]
52//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53//! use hedl_stream::AsyncLineReader;
54//! use std::io::Cursor;
55//!
56//! let input = "line1\nline2";
57//! let mut reader = AsyncLineReader::new(Cursor::new(input));
58//!
59//! // Peek without consuming
60//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
61//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
62//!
63//! // Now consume it
64//! let line = reader.next_line().await?.unwrap();
65//! assert_eq!(line, (1, "line1".to_string()));
66//!
67//! // Push it back
68//! reader.push_back(line.0, line.1);
69//!
70//! // Read it again
71//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
72//! # Ok(())
73//! # }
74//! ```
75
76use crate::error::{StreamError, StreamResult};
77use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
78
/// Locates the first byte equal to `needle` within `haystack`.
///
/// Returns the zero-based index of the match, or `None` when `haystack` contains no such
/// byte (which includes the empty slice). A small dependency-free stand-in for a `memchr`.
fn memchr_byte(needle: u8, haystack: &[u8]) -> Option<usize> {
    haystack
        .iter()
        .enumerate()
        .find(|&(_, &byte)| byte == needle)
        .map(|(index, _)| index)
}
83
84/// Buffered async line reader with line number tracking.
85///
86/// Reads input line-by-line asynchronously, automatically handling different line endings
87/// (LF, CRLF) and tracking the current line number for error reporting.
88///
89/// # Performance Characteristics
90///
91/// - **Buffering**: Configurable buffer size (default 64KB) for efficient I/O
92/// - **Zero-Copy**: String allocations only for consumed lines
93/// - **Async**: Non-blocking I/O suitable for high-concurrency scenarios
94///
95/// # When to Use Async vs Sync
96///
97/// **Use Async When:**
98/// - Processing network streams or pipes
99/// - High-concurrency scenarios (many parallel streams)
100/// - Integration with async web servers or frameworks
101/// - Need to process I/O without blocking threads
102///
103/// **Use Sync When:**
104/// - Processing local files
105/// - Single-threaded batch processing
106/// - Simpler code without async complexity
107/// - CPU-bound workloads with minimal I/O wait
108///
109/// # Examples
110///
111/// ## Reading from Async Source
112///
113/// ```rust,no_run
114/// # #[cfg(feature = "async")]
115/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
116/// use hedl_stream::AsyncLineReader;
117/// use tokio::fs::File;
118///
119/// let file = File::open("data.hedl").await?;
120/// let mut reader = AsyncLineReader::new(file);
121///
122/// while let Some((line_num, line)) = reader.next_line().await? {
123/// println!("{}: {}", line_num, line);
124/// }
125/// # Ok(())
126/// # }
127/// ```
128///
129/// ## With Custom Buffer Size
130///
131/// ```rust
132/// # #[cfg(feature = "async")]
133/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
134/// use hedl_stream::AsyncLineReader;
135/// use std::io::Cursor;
136///
137/// let input = "line1\nline2";
138/// let reader = AsyncLineReader::with_capacity(Cursor::new(input), 256 * 1024);
139/// # Ok(())
140/// # }
141/// ```
142pub struct AsyncLineReader<R: AsyncRead + Unpin> {
143 reader: BufReader<R>,
144 line_number: usize,
145 buffer: String,
146 peeked: Option<(usize, String)>,
147 max_line_length: usize,
148}
149
150impl<R: AsyncRead + Unpin> AsyncLineReader<R> {
151 /// Create a new async line reader with default buffer size (64KB) and max line length (1MB).
152 ///
153 /// # Examples
154 ///
155 /// ```rust
156 /// # #[cfg(feature = "async")]
157 /// use hedl_stream::AsyncLineReader;
158 /// use std::io::Cursor;
159 ///
160 /// let input = "line1\nline2";
161 /// let reader = AsyncLineReader::new(Cursor::new(input));
162 /// ```
163 pub fn new(reader: R) -> Self {
164 Self {
165 reader: BufReader::new(reader),
166 line_number: 0,
167 buffer: String::new(),
168 peeked: None,
169 max_line_length: 1_000_000,
170 }
171 }
172
173 /// Create an async line reader with a specific buffer capacity and default max line length (1MB).
174 ///
175 /// # Parameters
176 ///
177 /// - `reader`: The async readable source
178 /// - `capacity`: Buffer size in bytes
179 ///
180 /// # Examples
181 ///
182 /// ```rust
183 /// # #[cfg(feature = "async")]
184 /// use hedl_stream::AsyncLineReader;
185 /// use std::io::Cursor;
186 ///
187 /// // Use a larger buffer for large files
188 /// let reader = AsyncLineReader::with_capacity(
189 /// Cursor::new("data"),
190 /// 256 * 1024 // 256KB
191 /// );
192 /// ```
193 pub fn with_capacity(reader: R, capacity: usize) -> Self {
194 Self {
195 reader: BufReader::with_capacity(capacity, reader),
196 line_number: 0,
197 buffer: String::new(),
198 peeked: None,
199 max_line_length: 1_000_000,
200 }
201 }
202
203 /// Create with a specific max line length.
204 pub fn with_max_length(reader: R, max_line_length: usize) -> Self {
205 Self {
206 reader: BufReader::new(reader),
207 line_number: 0,
208 buffer: String::new(),
209 peeked: None,
210 max_line_length,
211 }
212 }
213
214 /// Create with a specific buffer capacity and max line length.
215 pub fn with_capacity_and_max_length(
216 reader: R,
217 capacity: usize,
218 max_line_length: usize,
219 ) -> Self {
220 Self {
221 reader: BufReader::with_capacity(capacity, reader),
222 line_number: 0,
223 buffer: String::new(),
224 peeked: None,
225 max_line_length,
226 }
227 }
228
229 /// Get the current line number.
230 ///
231 /// Returns 0 before any lines are read, then increments with each line.
232 ///
233 /// # Examples
234 ///
235 /// ```rust
236 /// # #[cfg(feature = "async")]
237 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
238 /// use hedl_stream::AsyncLineReader;
239 /// use std::io::Cursor;
240 ///
241 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
242 ///
243 /// assert_eq!(reader.line_number(), 0);
244 /// reader.next_line().await?;
245 /// assert_eq!(reader.line_number(), 1);
246 /// reader.next_line().await?;
247 /// assert_eq!(reader.line_number(), 2);
248 /// # Ok(())
249 /// # }
250 /// ```
251 #[inline]
252 pub fn line_number(&self) -> usize {
253 self.line_number
254 }
255
256 /// Read the next line asynchronously.
257 ///
258 /// Returns `Ok(Some((line_num, line)))` if a line was read, `Ok(None)` at EOF,
259 /// or `Err` on I/O errors.
260 ///
261 /// Trailing newlines (LF or CRLF) are automatically stripped.
262 ///
263 /// # Performance
264 ///
265 /// This method awaits on I/O and yields to the runtime if data is not available,
266 /// allowing other tasks to run. It does not block the thread.
267 ///
268 /// # Examples
269 ///
270 /// ```rust
271 /// # #[cfg(feature = "async")]
272 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
273 /// use hedl_stream::AsyncLineReader;
274 /// use std::io::Cursor;
275 ///
276 /// let mut reader = AsyncLineReader::new(Cursor::new("hello\nworld"));
277 ///
278 /// let (num, line) = reader.next_line().await?.unwrap();
279 /// assert_eq!(num, 1);
280 /// assert_eq!(line, "hello");
281 ///
282 /// let (num, line) = reader.next_line().await?.unwrap();
283 /// assert_eq!(num, 2);
284 /// assert_eq!(line, "world");
285 ///
286 /// assert_eq!(reader.next_line().await?, None);
287 /// # Ok(())
288 /// # }
289 /// ```
290 pub async fn next_line(&mut self) -> StreamResult<Option<(usize, String)>> {
291 // Return peeked line if available
292 if let Some(peeked) = self.peeked.take() {
293 return Ok(Some(peeked));
294 }
295
296 self.read_line_internal().await
297 }
298
299 /// Peek at the next line without consuming it.
300 ///
301 /// Returns a reference to the next line without advancing the reader.
302 /// Subsequent calls to `peek_line()` return the same line. Call `next_line()`
303 /// to consume it.
304 ///
305 /// # Examples
306 ///
307 /// ```rust
308 /// # #[cfg(feature = "async")]
309 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
310 /// use hedl_stream::AsyncLineReader;
311 /// use std::io::Cursor;
312 ///
313 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
314 ///
315 /// // Peek multiple times
316 /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
317 /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
318 ///
319 /// // Consume
320 /// reader.next_line().await?;
321 ///
322 /// // Next peek is the second line
323 /// assert_eq!(reader.peek_line().await?, Some(&(2, "line2".to_string())));
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub async fn peek_line(&mut self) -> StreamResult<Option<&(usize, String)>> {
328 if self.peeked.is_none() {
329 self.peeked = self.read_line_internal().await?;
330 }
331 Ok(self.peeked.as_ref())
332 }
333
334 /// Push a line back to be read again.
335 ///
336 /// The next call to `next_line()` or `peek_line()` will return this line.
337 /// Only one line can be pushed back at a time; subsequent calls overwrite
338 /// the previously pushed line.
339 ///
340 /// # Examples
341 ///
342 /// ```rust
343 /// # #[cfg(feature = "async")]
344 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
345 /// use hedl_stream::AsyncLineReader;
346 /// use std::io::Cursor;
347 ///
348 /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
349 ///
350 /// let line = reader.next_line().await?.unwrap();
351 /// assert_eq!(line, (1, "line1".to_string()));
352 ///
353 /// // Push it back
354 /// reader.push_back(line.0, line.1);
355 ///
356 /// // Read it again
357 /// let line = reader.next_line().await?.unwrap();
358 /// assert_eq!(line, (1, "line1".to_string()));
359 /// # Ok(())
360 /// # }
361 /// ```
362 #[inline]
363 pub fn push_back(&mut self, line_num: usize, line: String) {
364 self.peeked = Some((line_num, line));
365 }
366
367 async fn read_line_internal(&mut self) -> StreamResult<Option<(usize, String)>> {
368 self.buffer.clear();
369
370 loop {
371 // Read from BufReader's internal buffer (zero-copy)
372 let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
373
374 if available.is_empty() {
375 // EOF
376 if self.buffer.is_empty() {
377 return Ok(None);
378 }
379 // Return partial line (no trailing newline)
380 self.line_number += 1;
381 return Ok(Some((self.line_number, self.buffer.clone())));
382 }
383
384 // Find newline in available data
385 if let Some(newline_pos) = memchr_byte(b'\n', available) {
386 // Check limit BEFORE appending
387 if self.buffer.len() + newline_pos > self.max_line_length {
388 // CRITICAL: Consume the oversized line data to prevent infinite loop
389 // Consume up to and including the newline character
390 self.reader.consume(newline_pos + 1);
391 let total_length = self.buffer.len() + newline_pos;
392 self.line_number += 1;
393 self.buffer.clear();
394 return Err(StreamError::LineTooLong {
395 line: self.line_number,
396 length: total_length,
397 limit: self.max_line_length,
398 });
399 }
400
401 // Append up to newline (excluding the newline itself)
402 let mut line_end = newline_pos;
403
404 // Handle CRLF: if newline is preceded by CR, exclude it too
405 if newline_pos > 0 && available[newline_pos - 1] == b'\r' {
406 line_end = newline_pos - 1;
407 }
408
409 let to_append = &available[..line_end];
410
411 // Validate UTF-8 before appending
412 let line_str =
413 std::str::from_utf8(to_append).map_err(|e| StreamError::InvalidUtf8 {
414 line: self.line_number + 1,
415 error: e,
416 })?;
417
418 self.buffer.push_str(line_str);
419
420 // Consume bytes including newline
421 self.reader.consume(newline_pos + 1);
422
423 self.line_number += 1;
424 return Ok(Some((self.line_number, self.buffer.clone())));
425 } else {
426 // No newline yet, check if adding entire buffer exceeds limit
427 if self.buffer.len() + available.len() > self.max_line_length {
428 // CRITICAL: Consume all available data and skip to end of line
429 // to prevent infinite loop on subsequent reads
430 let accumulated = self.buffer.len() + available.len();
431 let consumed = available.len();
432 self.reader.consume(consumed);
433
434 // Continue reading and discarding until we find the end of line
435 self.skip_to_end_of_line().await?;
436
437 self.line_number += 1;
438 self.buffer.clear();
439 return Err(StreamError::LineTooLong {
440 line: self.line_number,
441 length: accumulated,
442 limit: self.max_line_length,
443 });
444 }
445
446 // Validate UTF-8 before appending
447 let chunk_str =
448 std::str::from_utf8(available).map_err(|e| StreamError::InvalidUtf8 {
449 line: self.line_number + 1,
450 error: e,
451 })?;
452
453 // Append entire buffer and continue reading
454 self.buffer.push_str(chunk_str);
455
456 let len = available.len();
457 self.reader.consume(len);
458 }
459 }
460 }
461
462 /// Skip to end of line when handling oversized line errors.
463 /// Consumes data until a newline is found or EOF is reached.
464 async fn skip_to_end_of_line(&mut self) -> StreamResult<()> {
465 loop {
466 let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
467
468 if available.is_empty() {
469 // EOF reached, line is done
470 return Ok(());
471 }
472
473 if let Some(newline_pos) = memchr_byte(b'\n', available) {
474 // Found newline, consume up to and including it
475 self.reader.consume(newline_pos + 1);
476 return Ok(());
477 } else {
478 // No newline, consume all and continue
479 let len = available.len();
480 self.reader.consume(len);
481 }
482 }
483 }
484}
485
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_read_lines() {
        let input = "line1\nline2\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_peek_and_push_back() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        let peeked = reader.peek_line().await.unwrap().cloned();
        assert_eq!(peeked, Some((1, "line1".to_string())));

        // Should still return the same line
        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((1, "line1".to_string())));

        // Push back
        reader.push_back(1, "line1".to_string());
        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((1, "line1".to_string())));
    }

    #[tokio::test]
    async fn test_empty_input() {
        let input = "";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_single_empty_line() {
        let input = "\n";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, String::new())));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_trailing_newline_has_no_extra_line() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\n"));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_crlf_line_endings() {
        let input = "line1\r\nline2\r\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
    }

    #[tokio::test]
    async fn test_mixed_line_endings() {
        let input = "line1\nline2\r\nline3\nline4";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((4, "line4".to_string()))
        );
    }

    #[tokio::test]
    async fn test_line_number_tracking() {
        let input = "line1\nline2\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(reader.line_number(), 0);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 1);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 2);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 3);
    }

    #[tokio::test]
    async fn test_peek_multiple_times() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        // Peek multiple times should return the same line
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );

        // Consume it
        reader.next_line().await.unwrap();

        // Next peek should be the second line
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(2, "line2".to_string()))
        );
    }

    #[tokio::test]
    async fn test_with_capacity() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::with_capacity(Cursor::new(input), 1024);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
    }

    #[tokio::test]
    async fn test_unicode_content() {
        let input = "你好\n世界\n🎉";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "你好".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "世界".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "🎉".to_string()))
        );
    }

    #[tokio::test]
    async fn test_long_line() {
        let long_line = "a".repeat(10000);
        let mut reader = AsyncLineReader::new(Cursor::new(long_line.clone()));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, long_line)));
    }

    #[tokio::test]
    async fn test_many_lines() {
        let lines: Vec<String> = (0..1000).map(|i| format!("line{i}")).collect();
        let input = lines.join("\n");
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        for (i, expected) in lines.iter().enumerate() {
            let result = reader.next_line().await.unwrap();
            assert_eq!(result, Some((i + 1, expected.clone())));
        }
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_push_back_overwrites_peek() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        reader.peek_line().await.unwrap(); // Peek line1
        reader.push_back(42, "pushed".to_string());

        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((42, "pushed".to_string())));
    }

    #[tokio::test]
    async fn test_line_too_long_with_newline_recovers() {
        // "short" is exactly at the limit and must be accepted; the next line is over it.
        let input = "short\n0123456789\nafter";
        let mut reader = AsyncLineReader::with_max_length(Cursor::new(input), 5);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "short".to_string()))
        );
        match reader.next_line().await {
            Err(StreamError::LineTooLong {
                line,
                length,
                limit,
                ..
            }) => {
                assert_eq!(line, 2);
                assert_eq!(length, 10);
                assert_eq!(limit, 5);
            }
            other => panic!("expected LineTooLong, got {other:?}"),
        }
        // The oversized line was skipped and counted.
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "after".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_line_too_long_across_chunks_recovers() {
        // A 4-byte buffer forces the limit to be exceeded before the newline is seen.
        let input = "abcdefghij\nok";
        let mut reader =
            AsyncLineReader::with_capacity_and_max_length(Cursor::new(input), 4, 6);

        match reader.next_line().await {
            Err(StreamError::LineTooLong { line, limit, .. }) => {
                assert_eq!(line, 1);
                assert_eq!(limit, 6);
            }
            other => panic!("expected LineTooLong, got {other:?}"),
        }
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "ok".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_line_too_long_at_eof() {
        let input = "ok\ntoolong";
        let mut reader = AsyncLineReader::with_max_length(Cursor::new(input), 3);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "ok".to_string()))
        );
        assert!(matches!(
            reader.next_line().await,
            Err(StreamError::LineTooLong { line: 2, .. })
        ));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }
}