1use crate::take_n::{TakeNLines, TakeNLinesReverse};
2use crate::{Jsonl, JsonlDeserialize, JsonlValueDeserialize};
3use futures::{Stream, StreamExt};
4use serde::Deserialize;
5use serde_json::Value;
6use tokio::io::AsyncRead;
7
8impl<R: AsyncRead + Unpin> JsonlDeserialize for Jsonl<R> {
9 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
10 where
11 T: for<'a> Deserialize<'a>,
12 {
13 self.map(|result| {
14 result.and_then(|line| {
15 serde_json::from_str::<T>(&line)
16 .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
17 })
18 })
19 }
20}
21
22impl<R: AsyncRead + Unpin> JsonlValueDeserialize for Jsonl<R> {
23 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
24 self.deserialize::<Value>()
25 }
26}
27
28impl<R: AsyncRead + Unpin> JsonlDeserialize for TakeNLines<R> {
30 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
31 where
32 T: for<'a> Deserialize<'a>,
33 {
34 self.map(|result| {
35 result.and_then(|line| {
36 serde_json::from_str::<T>(&line)
37 .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
38 })
39 })
40 }
41}
42
43impl<R: AsyncRead + Unpin> JsonlValueDeserialize for TakeNLines<R> {
44 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
45 self.deserialize::<Value>()
46 }
47}
48
49impl JsonlDeserialize for TakeNLinesReverse {
51 fn deserialize<T>(self) -> impl Stream<Item = anyhow::Result<T>>
52 where
53 T: for<'a> Deserialize<'a>,
54 {
55 self.map(|result| {
56 result.and_then(|line| {
57 serde_json::from_str::<T>(&line)
58 .map_err(|e| anyhow::anyhow!("Failed to parse JSON line: {}", e))
59 })
60 })
61 }
62}
63
64impl JsonlValueDeserialize for TakeNLinesReverse {
65 fn deserialize_values(self) -> impl Stream<Item = anyhow::Result<Value>> {
66 self.deserialize::<Value>()
67 }
68}