Skip to main content

hedl_stream/
async_reader.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Async line reader for streaming parser.
19//!
20//! Provides buffered async line-by-line reading with line number tracking, peek support,
21//! and the ability to push back lines for re-parsing.
22//!
23//! This module mirrors the synchronous [`LineReader`](crate::LineReader) but uses
24//! tokio's async I/O primitives for non-blocking operation.
25//!
26//! # Examples
27//!
28//! ## Basic Async Line Reading
29//!
30//! ```rust,no_run
31//! # #[cfg(feature = "async")]
32//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
33//! use hedl_stream::AsyncLineReader;
34//! use tokio::io::AsyncReadExt;
35//! use std::io::Cursor;
36//!
37//! let input = "line1\nline2\nline3";
38//! let mut reader = AsyncLineReader::new(Cursor::new(input));
39//!
40//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
41//! assert_eq!(reader.next_line().await?, Some((2, "line2".to_string())));
42//! assert_eq!(reader.next_line().await?, Some((3, "line3".to_string())));
43//! assert_eq!(reader.next_line().await?, None);
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! ## Peeking and Push Back
49//!
50//! ```rust,no_run
51//! # #[cfg(feature = "async")]
52//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
53//! use hedl_stream::AsyncLineReader;
54//! use std::io::Cursor;
55//!
56//! let input = "line1\nline2";
57//! let mut reader = AsyncLineReader::new(Cursor::new(input));
58//!
59//! // Peek without consuming
60//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
61//! assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
62//!
63//! // Now consume it
64//! let line = reader.next_line().await?.unwrap();
65//! assert_eq!(line, (1, "line1".to_string()));
66//!
67//! // Push it back
68//! reader.push_back(line.0, line.1);
69//!
70//! // Read it again
71//! assert_eq!(reader.next_line().await?, Some((1, "line1".to_string())));
72//! # Ok(())
73//! # }
74//! ```
75
76use crate::error::{StreamError, StreamResult};
77use memchr::memchr;
78use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
79
80/// Buffered async line reader with line number tracking.
81///
82/// Reads input line-by-line asynchronously, automatically handling different line endings
83/// (LF, CRLF) and tracking the current line number for error reporting.
84///
85/// # Performance Characteristics
86///
87/// - **Buffering**: Configurable buffer size (default 64KB) for efficient I/O
88/// - **Zero-Copy**: String allocations only for consumed lines
89/// - **Async**: Non-blocking I/O suitable for high-concurrency scenarios
90///
91/// # When to Use Async vs Sync
92///
93/// **Use Async When:**
94/// - Processing network streams or pipes
95/// - High-concurrency scenarios (many parallel streams)
96/// - Integration with async web servers or frameworks
97/// - Need to process I/O without blocking threads
98///
99/// **Use Sync When:**
100/// - Processing local files
101/// - Single-threaded batch processing
102/// - Simpler code without async complexity
103/// - CPU-bound workloads with minimal I/O wait
104///
105/// # Examples
106///
107/// ## Reading from Async Source
108///
109/// ```rust,no_run
110/// # #[cfg(feature = "async")]
111/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
112/// use hedl_stream::AsyncLineReader;
113/// use tokio::fs::File;
114///
115/// let file = File::open("data.hedl").await?;
116/// let mut reader = AsyncLineReader::new(file);
117///
118/// while let Some((line_num, line)) = reader.next_line().await? {
119///     println!("{}: {}", line_num, line);
120/// }
121/// # Ok(())
122/// # }
123/// ```
124///
125/// ## With Custom Buffer Size
126///
127/// ```rust
128/// # #[cfg(feature = "async")]
129/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
130/// use hedl_stream::AsyncLineReader;
131/// use std::io::Cursor;
132///
133/// let input = "line1\nline2";
134/// let reader = AsyncLineReader::with_capacity(Cursor::new(input), 256 * 1024);
135/// # Ok(())
136/// # }
137/// ```
138pub struct AsyncLineReader<R: AsyncRead + Unpin> {
139    reader: BufReader<R>,
140    line_number: usize,
141    buffer: String,
142    peeked: Option<(usize, String)>,
143    max_line_length: usize,
144}
145
146impl<R: AsyncRead + Unpin> AsyncLineReader<R> {
147    /// Create a new async line reader with default buffer size (64KB) and max line length (1MB).
148    ///
149    /// # Examples
150    ///
151    /// ```rust
152    /// # #[cfg(feature = "async")]
153    /// use hedl_stream::AsyncLineReader;
154    /// use std::io::Cursor;
155    ///
156    /// let input = "line1\nline2";
157    /// let reader = AsyncLineReader::new(Cursor::new(input));
158    /// ```
159    pub fn new(reader: R) -> Self {
160        Self {
161            reader: BufReader::new(reader),
162            line_number: 0,
163            buffer: String::new(),
164            peeked: None,
165            max_line_length: 1_000_000,
166        }
167    }
168
169    /// Create an async line reader with a specific buffer capacity and default max line length (1MB).
170    ///
171    /// # Parameters
172    ///
173    /// - `reader`: The async readable source
174    /// - `capacity`: Buffer size in bytes
175    ///
176    /// # Examples
177    ///
178    /// ```rust
179    /// # #[cfg(feature = "async")]
180    /// use hedl_stream::AsyncLineReader;
181    /// use std::io::Cursor;
182    ///
183    /// // Use a larger buffer for large files
184    /// let reader = AsyncLineReader::with_capacity(
185    ///     Cursor::new("data"),
186    ///     256 * 1024  // 256KB
187    /// );
188    /// ```
189    pub fn with_capacity(reader: R, capacity: usize) -> Self {
190        Self {
191            reader: BufReader::with_capacity(capacity, reader),
192            line_number: 0,
193            buffer: String::new(),
194            peeked: None,
195            max_line_length: 1_000_000,
196        }
197    }
198
199    /// Create with a specific max line length.
200    pub fn with_max_length(reader: R, max_line_length: usize) -> Self {
201        Self {
202            reader: BufReader::new(reader),
203            line_number: 0,
204            buffer: String::new(),
205            peeked: None,
206            max_line_length,
207        }
208    }
209
210    /// Create with a specific buffer capacity and max line length.
211    pub fn with_capacity_and_max_length(
212        reader: R,
213        capacity: usize,
214        max_line_length: usize,
215    ) -> Self {
216        Self {
217            reader: BufReader::with_capacity(capacity, reader),
218            line_number: 0,
219            buffer: String::new(),
220            peeked: None,
221            max_line_length,
222        }
223    }
224
225    /// Get the current line number.
226    ///
227    /// Returns 0 before any lines are read, then increments with each line.
228    ///
229    /// # Examples
230    ///
231    /// ```rust
232    /// # #[cfg(feature = "async")]
233    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
234    /// use hedl_stream::AsyncLineReader;
235    /// use std::io::Cursor;
236    ///
237    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
238    ///
239    /// assert_eq!(reader.line_number(), 0);
240    /// reader.next_line().await?;
241    /// assert_eq!(reader.line_number(), 1);
242    /// reader.next_line().await?;
243    /// assert_eq!(reader.line_number(), 2);
244    /// # Ok(())
245    /// # }
246    /// ```
247    #[inline]
248    pub fn line_number(&self) -> usize {
249        self.line_number
250    }
251
252    /// Read the next line asynchronously.
253    ///
254    /// Returns `Ok(Some((line_num, line)))` if a line was read, `Ok(None)` at EOF,
255    /// or `Err` on I/O errors.
256    ///
257    /// Trailing newlines (LF or CRLF) are automatically stripped.
258    ///
259    /// # Performance
260    ///
261    /// This method awaits on I/O and yields to the runtime if data is not available,
262    /// allowing other tasks to run. It does not block the thread.
263    ///
264    /// # Examples
265    ///
266    /// ```rust
267    /// # #[cfg(feature = "async")]
268    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
269    /// use hedl_stream::AsyncLineReader;
270    /// use std::io::Cursor;
271    ///
272    /// let mut reader = AsyncLineReader::new(Cursor::new("hello\nworld"));
273    ///
274    /// let (num, line) = reader.next_line().await?.unwrap();
275    /// assert_eq!(num, 1);
276    /// assert_eq!(line, "hello");
277    ///
278    /// let (num, line) = reader.next_line().await?.unwrap();
279    /// assert_eq!(num, 2);
280    /// assert_eq!(line, "world");
281    ///
282    /// assert_eq!(reader.next_line().await?, None);
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn next_line(&mut self) -> StreamResult<Option<(usize, String)>> {
287        // Return peeked line if available
288        if let Some(peeked) = self.peeked.take() {
289            return Ok(Some(peeked));
290        }
291
292        self.read_line_internal().await
293    }
294
295    /// Peek at the next line without consuming it.
296    ///
297    /// Returns a reference to the next line without advancing the reader.
298    /// Subsequent calls to `peek_line()` return the same line. Call `next_line()`
299    /// to consume it.
300    ///
301    /// # Examples
302    ///
303    /// ```rust
304    /// # #[cfg(feature = "async")]
305    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
306    /// use hedl_stream::AsyncLineReader;
307    /// use std::io::Cursor;
308    ///
309    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
310    ///
311    /// // Peek multiple times
312    /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
313    /// assert_eq!(reader.peek_line().await?, Some(&(1, "line1".to_string())));
314    ///
315    /// // Consume
316    /// reader.next_line().await?;
317    ///
318    /// // Next peek is the second line
319    /// assert_eq!(reader.peek_line().await?, Some(&(2, "line2".to_string())));
320    /// # Ok(())
321    /// # }
322    /// ```
323    pub async fn peek_line(&mut self) -> StreamResult<Option<&(usize, String)>> {
324        if self.peeked.is_none() {
325            self.peeked = self.read_line_internal().await?;
326        }
327        Ok(self.peeked.as_ref())
328    }
329
330    /// Push a line back to be read again.
331    ///
332    /// The next call to `next_line()` or `peek_line()` will return this line.
333    /// Only one line can be pushed back at a time; subsequent calls overwrite
334    /// the previously pushed line.
335    ///
336    /// # Examples
337    ///
338    /// ```rust
339    /// # #[cfg(feature = "async")]
340    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
341    /// use hedl_stream::AsyncLineReader;
342    /// use std::io::Cursor;
343    ///
344    /// let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
345    ///
346    /// let line = reader.next_line().await?.unwrap();
347    /// assert_eq!(line, (1, "line1".to_string()));
348    ///
349    /// // Push it back
350    /// reader.push_back(line.0, line.1);
351    ///
352    /// // Read it again
353    /// let line = reader.next_line().await?.unwrap();
354    /// assert_eq!(line, (1, "line1".to_string()));
355    /// # Ok(())
356    /// # }
357    /// ```
358    #[inline]
359    pub fn push_back(&mut self, line_num: usize, line: String) {
360        self.peeked = Some((line_num, line));
361    }
362
363    async fn read_line_internal(&mut self) -> StreamResult<Option<(usize, String)>> {
364        self.buffer.clear();
365
366        loop {
367            // Read from BufReader's internal buffer (zero-copy)
368            let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
369
370            if available.is_empty() {
371                // EOF
372                if self.buffer.is_empty() {
373                    return Ok(None);
374                }
375                // Return partial line (no trailing newline)
376                self.line_number += 1;
377                return Ok(Some((self.line_number, self.buffer.clone())));
378            }
379
380            // Find newline in available data
381            if let Some(newline_pos) = memchr(b'\n', available) {
382                // Check limit BEFORE appending
383                if self.buffer.len() + newline_pos > self.max_line_length {
384                    // CRITICAL: Consume the oversized line data to prevent infinite loop
385                    // Consume up to and including the newline character
386                    self.reader.consume(newline_pos + 1);
387                    let total_length = self.buffer.len() + newline_pos;
388                    self.line_number += 1;
389                    self.buffer.clear();
390                    return Err(StreamError::LineTooLong {
391                        line: self.line_number,
392                        length: total_length,
393                        limit: self.max_line_length,
394                    });
395                }
396
397                // Append up to newline (excluding the newline itself)
398                let mut line_end = newline_pos;
399
400                // Handle CRLF: if newline is preceded by CR, exclude it too
401                if newline_pos > 0 && available[newline_pos - 1] == b'\r' {
402                    line_end = newline_pos - 1;
403                }
404
405                let to_append = &available[..line_end];
406
407                // Validate UTF-8 before appending
408                let line_str =
409                    std::str::from_utf8(to_append).map_err(|e| StreamError::InvalidUtf8 {
410                        line: self.line_number + 1,
411                        error: e,
412                    })?;
413
414                self.buffer.push_str(line_str);
415
416                // Consume bytes including newline
417                self.reader.consume(newline_pos + 1);
418
419                self.line_number += 1;
420                return Ok(Some((self.line_number, self.buffer.clone())));
421            } else {
422                // No newline yet, check if adding entire buffer exceeds limit
423                if self.buffer.len() + available.len() > self.max_line_length {
424                    // CRITICAL: Consume all available data and skip to end of line
425                    // to prevent infinite loop on subsequent reads
426                    let accumulated = self.buffer.len() + available.len();
427                    let consumed = available.len();
428                    self.reader.consume(consumed);
429
430                    // Continue reading and discarding until we find the end of line
431                    self.skip_to_end_of_line().await?;
432
433                    self.line_number += 1;
434                    self.buffer.clear();
435                    return Err(StreamError::LineTooLong {
436                        line: self.line_number,
437                        length: accumulated,
438                        limit: self.max_line_length,
439                    });
440                }
441
442                // Validate UTF-8 before appending
443                let chunk_str =
444                    std::str::from_utf8(available).map_err(|e| StreamError::InvalidUtf8 {
445                        line: self.line_number + 1,
446                        error: e,
447                    })?;
448
449                // Append entire buffer and continue reading
450                self.buffer.push_str(chunk_str);
451
452                let len = available.len();
453                self.reader.consume(len);
454            }
455        }
456    }
457
458    /// Skip to end of line when handling oversized line errors.
459    /// Consumes data until a newline is found or EOF is reached.
460    async fn skip_to_end_of_line(&mut self) -> StreamResult<()> {
461        loop {
462            let available = self.reader.fill_buf().await.map_err(StreamError::Io)?;
463
464            if available.is_empty() {
465                // EOF reached, line is done
466                return Ok(());
467            }
468
469            if let Some(newline_pos) = memchr(b'\n', available) {
470                // Found newline, consume up to and including it
471                self.reader.consume(newline_pos + 1);
472                return Ok(());
473            } else {
474                // No newline, consume all and continue
475                let len = available.len();
476                self.reader.consume(len);
477            }
478        }
479    }
480}
481
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Build the `(line_number, text)` pair the reader is expected to yield.
    fn numbered(line_num: usize, text: &str) -> (usize, String) {
        (line_num, text.to_owned())
    }

    /// Assert that the reader yields `expected`, numbered consecutively from `first`.
    async fn assert_next_lines<R>(reader: &mut AsyncLineReader<R>, first: usize, expected: &[&str])
    where
        R: tokio::io::AsyncRead + Unpin,
    {
        for (offset, text) in expected.iter().enumerate() {
            let got = reader.next_line().await.unwrap();
            assert_eq!(got, Some(numbered(first + offset, text)));
        }
    }

    #[tokio::test]
    async fn test_read_lines() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2\nline3"));

        assert_next_lines(&mut reader, 1, &["line1", "line2", "line3"]).await;
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_peek_and_push_back() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));

        let peeked = reader.peek_line().await.unwrap().cloned();
        assert_eq!(peeked, Some(numbered(1, "line1")));

        // Consuming after a peek yields the peeked line
        assert_next_lines(&mut reader, 1, &["line1"]).await;

        // A pushed-back line is yielded once more
        reader.push_back(1, "line1".to_string());
        assert_next_lines(&mut reader, 1, &["line1"]).await;
    }

    #[tokio::test]
    async fn test_empty_input() {
        let mut reader = AsyncLineReader::new(Cursor::new(""));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_single_empty_line() {
        let mut reader = AsyncLineReader::new(Cursor::new("\n"));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, String::new())));
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_crlf_line_endings() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\r\nline2\r\nline3"));
        assert_next_lines(&mut reader, 1, &["line1", "line2", "line3"]).await;
    }

    #[tokio::test]
    async fn test_mixed_line_endings() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2\r\nline3\nline4"));
        assert_next_lines(&mut reader, 1, &["line1", "line2", "line3", "line4"]).await;
    }

    #[tokio::test]
    async fn test_line_number_tracking() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2\nline3"));

        assert_eq!(reader.line_number(), 0);

        for expected in 1..=3 {
            reader.next_line().await.unwrap();
            assert_eq!(reader.line_number(), expected);
        }
    }

    #[tokio::test]
    async fn test_peek_multiple_times() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));
        let first = numbered(1, "line1");
        let second = numbered(2, "line2");

        // Repeated peeks keep returning the same line
        for _ in 0..3 {
            assert_eq!(reader.peek_line().await.unwrap(), Some(&first));
        }

        // Consume it
        reader.next_line().await.unwrap();

        // The following peek sees the second line
        assert_eq!(reader.peek_line().await.unwrap(), Some(&second));
    }

    #[tokio::test]
    async fn test_with_capacity() {
        let mut reader = AsyncLineReader::with_capacity(Cursor::new("line1\nline2"), 1024);
        assert_next_lines(&mut reader, 1, &["line1", "line2"]).await;
    }

    #[tokio::test]
    async fn test_unicode_content() {
        let mut reader = AsyncLineReader::new(Cursor::new("你好\n世界\n🎉"));
        assert_next_lines(&mut reader, 1, &["你好", "世界", "🎉"]).await;
    }

    #[tokio::test]
    async fn test_long_line() {
        let long_line = "a".repeat(10000);
        let mut reader = AsyncLineReader::new(Cursor::new(long_line.clone()));
        assert_eq!(reader.next_line().await.unwrap(), Some((1, long_line)));
    }

    #[tokio::test]
    async fn test_many_lines() {
        let lines: Vec<String> = (0..1000).map(|i| format!("line{i}")).collect();
        let mut reader = AsyncLineReader::new(Cursor::new(lines.join("\n")));

        let mut expected_number = 0;
        for expected in &lines {
            expected_number += 1;
            let result = reader.next_line().await.unwrap();
            assert_eq!(result, Some((expected_number, expected.clone())));
        }
        assert_eq!(reader.next_line().await.unwrap(), None);
    }

    #[tokio::test]
    async fn test_push_back_overwrites_peek() {
        let mut reader = AsyncLineReader::new(Cursor::new("line1\nline2"));

        // Peek line1, then replace it with a pushed-back line
        reader.peek_line().await.unwrap();
        reader.push_back(42, "pushed".to_string());

        assert_next_lines(&mut reader, 42, &["pushed"]).await;
    }
}