async_jsonl/
async_jsonl.rs

1use futures::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::fs::File;
5use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader, Lines};
6
7/// Iterator to read JSONL file as raw JSON strings
8pub struct Jsonl<R> {
9    pub(crate) lines: Lines<BufReader<R>>,
10}
11
12impl<R: AsyncRead + Unpin> Jsonl<R> {
13    pub fn new(file: R) -> Self {
14        let reader = BufReader::new(file);
15        Self {
16            lines: reader.lines(),
17        }
18    }
19    /// Count lines from any AsyncRead source
20    pub async fn count_lines(mut self) -> anyhow::Result<usize> {
21        let mut count = 0;
22        while let Some(line) = self.lines.next_line().await? {
23            let trimmed = line.trim();
24            if !trimmed.is_empty() {
25                count += 1;
26            }
27        }
28        Ok(count)
29    }
30}
31
32impl Jsonl<File> {
33    /// Create a new Jsonl reader from a file path
34    pub async fn from_path<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
35        let file = File::open(path)
36            .await
37            .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
38        Ok(Self::new(file))
39    }
40}
41
42impl<R: AsyncRead + Unpin> Stream for Jsonl<R> {
43    type Item = anyhow::Result<String>;
44
45    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46        match Pin::new(&mut self.lines).poll_next_line(cx) {
47            Poll::Ready(Ok(Some(line))) => {
48                let line = line.trim();
49                if line.is_empty() {
50                    // Skip empty lines and recursively poll for next
51                    self.poll_next(cx)
52                } else {
53                    Poll::Ready(Some(Ok(line.to_string())))
54                }
55            }
56            Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF
57            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))),
58            Poll::Pending => Poll::Pending,
59        }
60    }
61}