async_jsonl/
value.rs

1use crate::Jsonl;
2use futures::{Stream, StreamExt};
3use serde::Deserialize;
4use serde_json::Value;
5use tokio::io::AsyncRead;
6
7/// Extension trait to add deserialization capabilities to JsonlIterator
8pub trait JsonlDeserialize<R> {
9    /// Deserialize JSON lines into the specified type
10    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
11    where
12        T: for<'a> Deserialize<'a>;
13}
14
15impl<R: AsyncRead + Unpin> JsonlDeserialize<R> for Jsonl<R> {
16    fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
17    where
18        T: for<'a> Deserialize<'a>,
19    {
20        self.map(|result| {
21            result.and_then(|line| {
22                serde_json::from_str::<T>(&line)
23                    .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
24            })
25        })
26    }
27}
28
29/// Extension trait specifically for deserializing JSONL to serde_json::Value
30pub trait JsonlValueDeserialize<R> {
31    /// Deserialize JSON lines into serde_json::Value objects
32    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>>;
33}
34
35impl<R: AsyncRead + Unpin> JsonlValueDeserialize<R> for Jsonl<R> {
36    fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
37        self.deserialize::<Value>()
38    }
39}
40
41/// Convenience function to create a JsonlIterator that specifically works with serde_json::Value
42pub fn jsonl_values<R>(reader: R) -> impl Stream<Item = anyhow::Result<Value>>
43where
44    R: AsyncRead + Unpin,
45{
46    Jsonl::new(reader).deserialize_values()
47}