async_jsonl/
jsonl_reader.rs1use crate::take_n::{TakeNLines, TakeNLinesReverse};
2use crate::{Jsonl, JsonlReader};
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::fs::File;
7use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncSeek, BufReader};
8
9#[async_trait::async_trait]
10impl<R: AsyncRead + AsyncSeek + Unpin + Sync + Send> JsonlReader for Jsonl<R> {
11 type NLines = TakeNLines<R>;
12 type NLinesRev = TakeNLinesReverse;
13
14 async fn first_n(self, n: usize) -> anyhow::Result<Self::NLines> {
15 Ok(self.get_n(n))
16 }
17
18 async fn last_n(self, n: usize) -> anyhow::Result<Self::NLinesRev> {
19 self.get_rev_n(n).await
20 }
21
22 async fn count(self) -> usize {
23 StreamExt::count(self).await
24 }
25}
26
27impl<R: AsyncRead + Unpin> Jsonl<R> {
28 pub fn new(file: R) -> Self {
29 let reader = BufReader::new(file);
30 Self {
31 lines: reader.lines(),
32 }
33 }
34
35 pub(crate) fn get_n(self, n: usize) -> TakeNLines<R> {
37 let reader = self.lines.into_inner().into_inner();
38 TakeNLines::new(reader, n)
39 }
40}
41
42impl<R: AsyncRead + AsyncSeek + Unpin> Jsonl<R> {
43 pub(crate) async fn get_rev_n(self, n: usize) -> anyhow::Result<TakeNLinesReverse> {
45 let reader = self.lines.into_inner().into_inner();
46 TakeNLinesReverse::new(reader, n).await
47 }
48}
49
50impl Jsonl<File> {
51 pub async fn from_path<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
53 let file = File::open(path)
54 .await
55 .map_err(|e| anyhow::anyhow!("Failed to open file: {}", e))?;
56 Ok(Self::new(file))
57 }
58}
59
60impl<R: AsyncRead + Unpin> Stream for Jsonl<R> {
61 type Item = anyhow::Result<String>;
62
63 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
64 match Pin::new(&mut self.lines).poll_next_line(cx) {
65 Poll::Ready(Ok(Some(line))) => {
66 let line = line.trim();
67 if line.is_empty() {
68 self.poll_next(cx)
70 } else {
71 Poll::Ready(Some(Ok(line.to_string())))
72 }
73 }
74 Poll::Ready(Ok(None)) => Poll::Ready(None), Poll::Ready(Err(e)) => Poll::Ready(Some(Err(anyhow::anyhow!("IO error: {}", e)))),
76 Poll::Pending => Poll::Pending,
77 }
78 }
79}