hedl_stream/
async_reader.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async line reader for streaming parser.
19//!
20//! Provides buffered async line-by-line reading with line number tracking, peek support,
21//! and the ability to push back lines for re-parsing.
22//!
23//! This module mirrors the synchronous [`LineReader`](crate::LineReader) but uses
24//! tokio's async I/O primitives for non-blocking operation.
25//!
26//! # Examples
27//!
28//! ## Basic Async Line Reading
29//!
30//! ```rust,no_run
31//! # #[cfg(feature = "async")]
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! use hedl_stream::AsyncLineReader;
34//! use tokio::io::AsyncReadExt;
35//! use std::io::Cursor;
36//!
37//! let input = "line1\nline2\nline3";
38//! let mut reader = AsyncLineReader::new(Cursor::new(input));
39//!
40//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
41//! assert_eq!(reader.next_line().await?, Some((2, "line2".to_string())));
42//! assert_eq!(reader.next_line().await?, Some((3, "line3".to_string())));
43//! assert_eq!(reader.next_line().await?, None);
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ## Peeking and Push Back
49//!
50//! ```rust,no_run
51//! # #[cfg(feature = "async")]
52//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53//! use hedl_stream::AsyncLineReader;
54//! use std::io::Cursor;
55//!
56//! let input = "line1\nline2";
57//! let mut reader = AsyncLineReader::new(Cursor::new(input));
58//!
59//! // Peek without consuming
60//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
61//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
62//!
63//! // Now consume it
64//! let line = reader.next_line().await?.unwrap();
65//! assert_eq!(line, (1, "line1".to_string()));
66//!
67//! // Push it back
68//! reader.push_back(line.0, line.1);
69//!
70//! // Read it again
71//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
72//! # Ok(())
73//! # }
74//! ```
75
76use crate::error::{StreamError, StreamResult};
77use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
78
/// Locates the first occurrence of `needle` in `haystack`.
///
/// Returns the zero-based index of the matching byte, or `None` when the byte
/// does not appear. A small, dependency-free equivalent of `memchr`.
fn memchr_byte(needle: u8, haystack: &[u8]) -> Option<usize> {
    for (index, &byte) in haystack.iter().enumerate() {
        if byte == needle {
            return Some(index);
        }
    }
    None
}
83
84/// Buffered async line reader with line number tracking.
85///
86/// Reads input line-by-line asynchronously, automatically handling different line endings
87/// (LF, CRLF) and tracking the current line number for error reporting.
88///
89/// # Performance Characteristics
90///
91/// - **Buffering**: Configurable buffer size (default 64KB) for efficient I/O
92/// - **Zero-Copy**: String allocations only for consumed lines
93/// - **Async**: Non-blocking I/O suitable for high-concurrency scenarios
94///
95/// # When to Use Async vs Sync
96///
97/// **Use Async When:**
98/// - Processing network streams or pipes
99/// - High-concurrency scenarios (many parallel streams)
100/// - Integration with async web servers or frameworks
101/// - Need to process I/O without blocking threads
102///
103/// **Use Sync When:**
104/// - Processing local files
105/// - Single-threaded batch processing
106/// - Simpler code without async complexity
107/// - CPU-bound workloads with minimal I/O wait
108///
109/// # Examples
110///
111/// ## Reading from Async Source
112///
113/// ```rust,no_run
114/// # #[cfg(feature = "async")]
115/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
116/// use hedl_stream::AsyncLineReader;
117/// use tokio::fs::File;
118///
119/// let file = File::open("data.hedl").await?;
120/// let mut reader = AsyncLineReader::new(file);
121///
122/// while let Some((line_num, line)) = reader.next_line().await? {
123///     println!("{}: {}", line_num, line);
124/// }
125/// # Ok(())
126/// # }
127/// ```
128///
129/// ## With Custom Buffer Size
130///
131/// ```rust
132/// # #[cfg(feature = "async")]
133/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
134/// use hedl_stream::AsyncLineReader;
135/// use std::io::Cursor;
136///
137/// let input = "line1\nline2";
138/// let reader = AsyncLineReader::with_capacity(Cursor::new(input), 256 * 1024);
139/// # Ok(())
140/// # }
141/// ```
142pub struct AsyncLineReader<R: AsyncRead + Unpin> {
143    reader: BufReader<R>,
144    line_number: usize,
145    buffer: String,
146    peeked: Option<(usize, String)>,
147    max_line_length: usize,
148}
149
150impl<R: AsyncRead + Unpin> AsyncLineReader<R> {
151    /// Create a new async line reader with default buffer size (64KB) and max line length (1MB).
152    ///
153    /// # Examples
154    ///
155    /// ```rust
156    /// # #[cfg(feature = "async")]
157    /// use hedl_stream::AsyncLineReader;
158    /// use std::io::Cursor;
159    ///
160    /// let input = "line1\nline2";
161    /// let reader = AsyncLineReader::new(Cursor::new(input));
162    /// ```
163    pub fn new(reader: R) -> Self {
164        Self {
165            reader: BufReader::new(reader),
166            line_number: 0,
167            buffer: String::new(),
168            peeked: None,
169            max_line_length: 1_000_000,
170        }
171    }
172
173    /// Create an async line reader with a specific buffer capacity and default max line length (1MB).
174    ///
175    /// # Parameters
176    ///
177    /// - `reader`: The async readable source
178    /// - `capacity`: Buffer size in bytes
179    ///
180    /// # Examples
181    ///
182    /// ```rust
183    /// # #[cfg(feature = "async")]
184    /// use hedl_stream::AsyncLineReader;
185    /// use std::io::Cursor;
186    ///
187    /// // Use a larger buffer for large files
188    /// let reader = AsyncLineReader::with_capacity(
189    ///     Cursor::new("data"),
190    ///     256 * 1024  // 256KB
191    /// );
192    /// ```
193    pub fn with_capacity(reader: R, capacity: usize) -> Self {
194        Self {
195            reader: BufReader::with_capacity(capacity, reader),
196            line_number: 0,
197            buffer: String::new(),
198            peeked: None,
199            max_line_length: 1_000_000,
200        }
201    }
202
203    /// Create with a specific max line length.
204    pub fn with_max_length(reader: R, max_line_length: usize) -> Self {
205        Self {
206            reader: BufReader::new(reader),
207            line_number: 0,
208            buffer: String::new(),
209            peeked: None,
210            max_line_length,
211        }
212    }
213
214    /// Create with a specific buffer capacity and max line length.
215    pub fn with_capacity_and_max_length(
216        reader: R,
217        capacity: usize,
218        max_line_length: usize,
219    ) -> Self {
220        Self {
221            reader: BufReader::with_capacity(capacity, reader),
222            line_number: 0,
223            buffer: String::new(),
224            peeked: None,
225            max_line_length,
226        }
227    }
228
229    /// Get the current line number.
230    ///
231    /// Returns 0 before any lines are read, then increments with each line.
232    ///
233    /// # Examples
234    ///
235    /// ```rust
236    /// # #[cfg(feature = "async")]
237    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
238    /// use hedl_stream::AsyncLineReader;
239    /// use std::io::Cursor;
240    ///
241    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
242    ///
243    /// assert_eq!(reader.line_number(), 0);
244    /// reader.next_line().await?;
245    /// assert_eq!(reader.line_number(), 1);
246    /// reader.next_line().await?;
247    /// assert_eq!(reader.line_number(), 2);
248    /// # Ok(())
249    /// # }
250    /// ```
251    #[inline]
252    pub fn line_number(&self) -> usize {
253        self.line_number
254    }
255
256    /// Read the next line asynchronously.
257    ///
258    /// Returns `Ok(Some((line_num, line)))` if a line was read, `Ok(None)` at EOF,
259    /// or `Err` on I/O errors.
260    ///
261    /// Trailing newlines (LF or CRLF) are automatically stripped.
262    ///
263    /// # Performance
264    ///
265    /// This method awaits on I/O and yields to the runtime if data is not available,
266    /// allowing other tasks to run. It does not block the thread.
267    ///
268    /// # Examples
269    ///
270    /// ```rust
271    /// # #[cfg(feature = "async")]
272    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
273    /// use hedl_stream::AsyncLineReader;
274    /// use std::io::Cursor;
275    ///
276    /// let mut reader = AsyncLineReader::new(Cursor::new("hello\nworld"));
277    ///
278    /// let (num, line) = reader.next_line().await?.unwrap();
279    /// assert_eq!(num, 1);
280    /// assert_eq!(line, "hello");
281    ///
282    /// let (num, line) = reader.next_line().await?.unwrap();
283    /// assert_eq!(num, 2);
284    /// assert_eq!(line, "world");
285    ///
286    /// assert_eq!(reader.next_line().await?, None);
287    /// # Ok(())
288    /// # }
289    /// ```
290    pub async fn next_line(&mut self) -> StreamResult<Option<(usize, String)>> {
291        // Return peeked line if available
292        if let Some(peeked) = self.peeked.take() {
293            return Ok(Some(peeked));
294        }
295
296        self.read_line_internal().await
297    }
298
299    /// Peek at the next line without consuming it.
300    ///
301    /// Returns a reference to the next line without advancing the reader.
302    /// Subsequent calls to `peek_line()` return the same line. Call `next_line()`
303    /// to consume it.
304    ///
305    /// # Examples
306    ///
307    /// ```rust
308    /// # #[cfg(feature = "async")]
309    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
310    /// use hedl_stream::AsyncLineReader;
311    /// use std::io::Cursor;
312    ///
313    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
314    ///
315    /// // Peek multiple times
316    /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
317    /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
318    ///
319    /// // Consume
320    /// reader.next_line().await?;
321    ///
322    /// // Next peek is the second line
323    /// assert_eq!(reader.peek_line().await?, Some(&(2, "line2".to_string())));
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub async fn peek_line(&mut self) -> StreamResult<Option<&(usize, String)>> {
328        if self.peeked.is_none() {
329            self.peeked = self.read_line_internal().await?;
330        }
331        Ok(self.peeked.as_ref())
332    }
333
334    /// Push a line back to be read again.
335    ///
336    /// The next call to `next_line()` or `peek_line()` will return this line.
337    /// Only one line can be pushed back at a time; subsequent calls overwrite
338    /// the previously pushed line.
339    ///
340    /// # Examples
341    ///
342    /// ```rust
343    /// # #[cfg(feature = "async")]
344    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
345    /// use hedl_stream::AsyncLineReader;
346    /// use std::io::Cursor;
347    ///
348    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
349    ///
350    /// let line = reader.next_line().await?.unwrap();
351    /// assert_eq!(line, (1, "line1".to_string()));
352    ///
353    /// // Push it back
354    /// reader.push_back(line.0, line.1);
355    ///
356    /// // Read it again
357    /// let line = reader.next_line().await?.unwrap();
358    /// assert_eq!(line, (1, "line1".to_string()));
359    /// # Ok(())
360    /// # }
361    /// ```
362    #[inline]
363    pub fn push_back(&mut self, line_num: usize, line: String) {
364        self.peeked = Some((line_num, line));
365    }
366
367    async fn read_line_internal(&mut self) -> StreamResult<Option<(usize, String)>> {
368        self.buffer.clear();
369
370        loop {
371            // Read from BufReader's internal buffer (zero-copy)
372            let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
373
374            if available.is_empty() {
375                // EOF
376                if self.buffer.is_empty() {
377                    return Ok(None);
378                }
379                // Return partial line (no trailing newline)
380                self.line_number += 1;
381                return Ok(Some((self.line_number, self.buffer.clone())));
382            }
383
384            // Find newline in available data
385            if let Some(newline_pos) = memchr_byte(b'\n', available) {
386                // Check limit BEFORE appending
387                if self.buffer.len() + newline_pos > self.max_line_length {
388                    // CRITICAL: Consume the oversized line data to prevent infinite loop
389                    // Consume up to and including the newline character
390                    self.reader.consume(newline_pos + 1);
391                    let total_length = self.buffer.len() + newline_pos;
392                    self.line_number += 1;
393                    self.buffer.clear();
394                    return Err(StreamError::LineTooLong {
395                        line: self.line_number,
396                        length: total_length,
397                        limit: self.max_line_length,
398                    });
399                }
400
401                // Append up to newline (excluding the newline itself)
402                let mut line_end = newline_pos;
403
404                // Handle CRLF: if newline is preceded by CR, exclude it too
405                if newline_pos > 0 && available[newline_pos - 1] == b'\r' {
406                    line_end = newline_pos - 1;
407                }
408
409                let to_append = &available[..line_end];
410
411                // Validate UTF-8 before appending
412                let line_str =
413                    std::str::from_utf8(to_append).map_err(|e| StreamError::InvalidUtf8 {
414                        line: self.line_number + 1,
415                        error: e,
416                    })?;
417
418                self.buffer.push_str(line_str);
419
420                // Consume bytes including newline
421                self.reader.consume(newline_pos + 1);
422
423                self.line_number += 1;
424                return Ok(Some((self.line_number, self.buffer.clone())));
425            } else {
426                // No newline yet, check if adding entire buffer exceeds limit
427                if self.buffer.len() + available.len() > self.max_line_length {
428                    // CRITICAL: Consume all available data and skip to end of line
429                    // to prevent infinite loop on subsequent reads
430                    let accumulated = self.buffer.len() + available.len();
431                    let consumed = available.len();
432                    self.reader.consume(consumed);
433
434                    // Continue reading and discarding until we find the end of line
435                    self.skip_to_end_of_line().await?;
436
437                    self.line_number += 1;
438                    self.buffer.clear();
439                    return Err(StreamError::LineTooLong {
440                        line: self.line_number,
441                        length: accumulated,
442                        limit: self.max_line_length,
443                    });
444                }
445
446                // Validate UTF-8 before appending
447                let chunk_str =
448                    std::str::from_utf8(available).map_err(|e| StreamError::InvalidUtf8 {
449                        line: self.line_number + 1,
450                        error: e,
451                    })?;
452
453                // Append entire buffer and continue reading
454                self.buffer.push_str(chunk_str);
455
456                let len = available.len();
457                self.reader.consume(len);
458            }
459        }
460    }
461
462    /// Skip to end of line when handling oversized line errors.
463    /// Consumes data until a newline is found or EOF is reached.
464    async fn skip_to_end_of_line(&mut self) -> StreamResult<()> {
465        loop {
466            let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
467
468            if available.is_empty() {
469                // EOF reached, line is done
470                return Ok(());
471            }
472
473            if let Some(newline_pos) = memchr_byte(b'\n', available) {
474                // Found newline, consume up to and including it
475                self.reader.consume(newline_pos + 1);
476                return Ok(());
477            } else {
478                // No newline, consume all and continue
479                let len = available.len();
480                self.reader.consume(len);
481            }
482        }
483    }
484}
485
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use std::io::Cursor;

    #[tokio::test]
    async fn test_read_lines() {
        let input = "line1\nline2\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_peek_and_push_back() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        let peeked = reader.peek_line().await.unwrap().cloned();
        assert_eq!(peeked, Some((1, "line1".to_string())));

        // Should still return the same line
        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((1, "line1".to_string())));

        // Push back
        reader.push_back(1, "line1".to_string());
        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((1, "line1".to_string())));
    }

    #[tokio::test]
    async fn test_empty_input() {
        let input = "";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_single_empty_line() {
        let input = "\n";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, String::new())));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_crlf_line_endings() {
        let input = "line1\r\nline2\r\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
    }

    #[tokio::test]
    async fn test_mixed_line_endings() {
        let input = "line1\nline2\r\nline3\nline4";
        let mut reader = AsyncLineReader::new(Cursor::new(input));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "line3".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((4, "line4".to_string()))
        );
    }

    #[tokio::test]
    async fn test_line_number_tracking() {
        let input = "line1\nline2\nline3";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(reader.line_number(), 0);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 1);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 2);

        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 3);
    }

    #[tokio::test]
    async fn test_peek_advances_line_number() {
        let mut reader = AsyncLineReader::new(Cursor::new("a\nb"));

        // Peeking reads the line from the source, so the counter advances.
        reader.peek_line().await.unwrap();
        assert_eq!(reader.line_number(), 1);

        // Consuming the peeked line does not read from the source again.
        reader.next_line().await.unwrap();
        assert_eq!(reader.line_number(), 1);
    }

    #[tokio::test]
    async fn test_peek_multiple_times() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        // Peek multiple times should return the same line
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(1, "line1".to_string()))
        );

        // Consume it
        reader.next_line().await.unwrap();

        // Next peek should be the second line
        assert_eq!(
            reader.peek_line().await.unwrap(),
            Some(&(2, "line2".to_string()))
        );
    }

    #[tokio::test]
    async fn test_with_capacity() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::with_capacity(Cursor::new(input), 1024);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "line1".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "line2".to_string()))
        );
    }

    #[tokio::test]
    async fn test_small_buffer_reassembles_lines() {
        // A 4-byte buffer forces each line to be assembled from several chunks.
        let input = "hello world\nbye";
        let mut reader = AsyncLineReader::with_capacity(Cursor::new(input), 4);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "hello world".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "bye".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_unicode_content() {
        let input = "你好\n世界\n🎉";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "你好".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "世界".to_string()))
        );
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "🎉".to_string()))
        );
    }

    #[tokio::test]
    async fn test_long_line() {
        let long_line = "a".repeat(10000);
        let mut reader = AsyncLineReader::new(Cursor::new(long_line.clone()));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, long_line)));
    }

    #[tokio::test]
    async fn test_line_at_max_length_is_accepted() {
        let line = "y".repeat(10);
        let mut reader = AsyncLineReader::with_max_length(Cursor::new(format!("{line}\n")), 10);

        assert_eq!(reader.next_line().await.unwrap(), Some((1, line)));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_line_too_long_is_reported_and_skipped() {
        let input = format!("short\n{}\nafter", "x".repeat(20));
        let mut reader = AsyncLineReader::with_max_length(Cursor::new(input), 10);

        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((1, "short".to_string()))
        );
        assert!(matches!(
            reader.next_line().await,
            Err(StreamError::LineTooLong {
                line: 2,
                limit: 10,
                ..
            })
        ));
        // The oversized line was consumed; reading resumes on the next line.
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((3, "after".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_line_too_long_across_chunks_is_skipped() {
        // With a 4-byte buffer the oversized line spans several chunks, so the
        // limit is exceeded before the line's newline has been seen.
        let input = format!("{}\nok", "x".repeat(20));
        let mut reader =
            AsyncLineReader::with_capacity_and_max_length(Cursor::new(input), 4, 10);

        assert!(matches!(
            reader.next_line().await,
            Err(StreamError::LineTooLong {
                line: 1,
                limit: 10,
                ..
            })
        ));
        assert_eq!(
            reader.next_line().await.unwrap(),
            Some((2, "ok".to_string()))
        );
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_many_lines() {
        let lines: Vec<String> = (0..1000).map(|i| format!("line{i}")).collect();
        let input = lines.join("\n");
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        for (i, expected) in lines.iter().enumerate() {
            let result = reader.next_line().await.unwrap();
            assert_eq!(result, Some((i + 1, expected.clone())));
        }
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_push_back_overwrites_peek() {
        let input = "line1\nline2";
        let mut reader = AsyncLineReader::new(Cursor::new(input));

        reader.peek_line().await.unwrap(); // Peek line1
        reader.push_back(42, "pushed".to_string());

        let line = reader.next_line().await.unwrap();
        assert_eq!(line, Some((42, "pushed".to_string())));
    }
}