1use crate::Jsonl;
2use futures::{Stream, StreamExt};
3use serde::Deserialize;
4use serde_json::Value;
5use tokio::io::AsyncRead;
6
7pub trait JsonlDeserialize<R> {
9 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
11 where
12 T: for<'a> Deserialize<'a>;
13}
14
15impl<R: AsyncRead + Unpin> JsonlDeserialize<R> for Jsonl<R> {
16 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
17 where
18 T: for<'a> Deserialize<'a>,
19 {
20 self.map(|result| {
21 result.and_then(|line| {
22 serde_json::from_str::<T>(&line)
23 .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
24 })
25 })
26 }
27}
28
29pub trait JsonlValueDeserialize<R> {
31 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>>;
33}
34
35impl<R: AsyncRead + Unpin> JsonlValueDeserialize<R> for Jsonl<R> {
36 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
37 self.deserialize::<Value>()
38 }
39}
40
41pub fn jsonl_values<R>(reader: R) -> impl Stream<Item = anyhow::Result<Value>>
43where
44 R: AsyncRead + Unpin,
45{
46 Jsonl::new(reader).deserialize_values()
47}