async_jsonl/
jsonl_reader.rs

1use crate::take_n::{TakeNLines, TakeNLinesReverse};
2use crate::{Jsonl, JsonlReader};
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::fs::File;
7use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader};
8
9#[async_trait::async_trait]
10impl<R: AsyncRead + AsyncSeek + Unpin + Sync + Send> JsonlReader for Jsonl<R> {
11    type NLines = TakeNLines<R>;
12    type NLinesRev = TakeNLinesReverse;
13
14    async fn first_n(self, n: usize) -> anyhow::Result<Self::NLines> {
15        Ok(self.get_n(n))
16    }
17
18    async fn last_n(self, n: usize) -> anyhow::Result<Self::NLinesRev> {
19        self.get_rev_n(n).await
20    }
21
22    async fn count(self) -> usize {
23        StreamExt::count(self).await
24    }
25}
26
27impl<R: AsyncRead + Unpin> Jsonl<R> {
28    pub fn new(file: R) -> Self {
29        let reader = BufReader::new(file);
30        Self {
31            lines: reader.lines(),
32        }
33    }
34
35    /// Get the first n lines from the beginning of the file
36    pub(crate) fn get_n(self, n: usize) -> TakeNLines<R> {
37        let reader = self.lines.into_inner().into_inner();
38        TakeNLines::new(reader, n)
39    }
40}
41
42impl<R: AsyncRead + AsyncSeek + Unpin> Jsonl<R> {
43    /// Get the last n lines from the end of the file (like tail)
44    pub(crate) async fn get_rev_n(self, n: usize) -> anyhow::Result<TakeNLinesReverse> {
45        let reader = self.lines.into_inner().into_inner();
46        TakeNLinesReverse::new(reader, n).await
47    }
48}
49
50impl Jsonl<File> {
51    /// Create a new Jsonl reader from a file path
52    pub async fn from_path<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
53        let file = File::open(path)
54            .await
55            .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
56        Ok(Self::new(file))
57    }
58}
59
60impl<R: AsyncRead + Unpin> Stream for Jsonl<R> {
61    type Item = anyhow::Result<String>;
62
63    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
64        match Pin::new(&mut self.lines).poll_next_line(cx) {
65            Poll::Ready(Ok(Some(line))) => {
66                let line = line.trim();
67                if line.is_empty() {
68                    // Skip empty lines and recursively poll for next
69                    self.poll_next(cx)
70                } else {
71                    Poll::Ready(Some(Ok(line.to_string())))
72                }
73            }
74            Poll::Ready(Ok(None)) => Poll::Ready(None), // EOF
75            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))),
76            Poll::Pending => Poll::Pending,
77        }
78    }
79}