async_jsonl/
async_jsonl.rs1use futures::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::fs::File;
5use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader, Lines};
6
7pub struct Jsonl<R> {
9 pub(crate) lines: Lines<BufReader<R>>,
10}
11
12impl<R: AsyncRead + Unpin> Jsonl<R> {
13 pub fn new(file: R) -> Self {
14 let reader = BufReader::new(file);
15 Self {
16 lines: reader.lines(),
17 }
18 }
19 pub async fn count_lines(mut self) -> anyhow::Result<usize> {
21 let mut count = 0;
22 while let Some(line) = self.lines.next_line().await? {
23 let trimmed = line.trim();
24 if !trimmed.is_empty() {
25 count += 1;
26 }
27 }
28 Ok(count)
29 }
30}
31
32impl Jsonl<File> {
33 pub async fn from_path<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
35 let file = File::open(path)
36 .await
37 .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
38 Ok(Self::new(file))
39 }
40}
41
42impl<R: AsyncRead + Unpin> Stream for Jsonl<R> {
43 type Item = anyhow::Result<String>;
44
45 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
46 match Pin::new(&mut self.lines).poll_next_line(cx) {
47 Poll::Ready(Ok(Some(line))) => {
48 let line = line.trim();
49 if line.is_empty() {
50 self.poll_next(cx)
52 } else {
53 Poll::Ready(Some(Ok(line.to_string())))
54 }
55 }
56 Poll::Ready(Ok(None)) => Poll::Ready(None), Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))),
58 Poll::Pending => Poll::Pending,
59 }
60 }
61}