async_jsonl/
value.rs

1use crate::take_n::{TakeNLines, TakeNLinesReverse};
2use crate::{Jsonl, JsonlDeserialize, JsonlValueDeserialize};
3use futures::{Stream, StreamExt};
4use serde::Deserialize;
5use serde_json::Value;
6use tokio::io::AsyncRead;
7
8impl<R: AsyncRead + Unpin> JsonlDeserialize for Jsonl<R> {
9    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
10    where
11        T: for<'a> Deserialize<'a>,
12    {
13        self.map(|result| {
14            result.and_then(|line| {
15                serde_json::from_str::<T>(&line)
16                    .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
17            })
18        })
19    }
20}
21
22impl<R: AsyncRead + Unpin> JsonlValueDeserialize for Jsonl<R> {
23    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
24        self.deserialize::<Value>()
25    }
26}
27
28// Implementations for TakeNLines
29impl<R: AsyncRead + Unpin> JsonlDeserialize for TakeNLines<R> {
30    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
31    where
32        T: for<'a> Deserialize<'a>,
33    {
34        self.map(|result| {
35            result.and_then(|line| {
36                serde_json::from_str::<T>(&line)
37                    .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
38            })
39        })
40    }
41}
42
43impl<R: AsyncRead + Unpin> JsonlValueDeserialize for TakeNLines<R> {
44    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
45        self.deserialize::<Value>()
46    }
47}
48
49// Implementations for TakeNLinesReverse
50impl JsonlDeserialize for TakeNLinesReverse {
51    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
52    where
53        T: for<'a> Deserialize<'a>,
54    {
55        self.map(|result| {
56            result.and_then(|line| {
57                serde_json::from_str::<T>(&line)
58                    .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
59            })
60        })
61    }
62}
63
64impl JsonlValueDeserialize for TakeNLinesReverse {
65    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
66        self.deserialize::<Value>()
67    }
68}