1use futures::{stream::Stream, task::Context};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::Poll;
5
6mod json_array_stream;
7mod json_depth_analyzer;
8
9use json_array_stream::JsonArrayStream;
10pub use json_array_stream::{stream_json_array, JsonStreamError};
11
12pub struct ParsedStream<T, S, B>
13where
14 S: Stream<Item = B>,
15 B: IntoIterator<Item = u8> + Sized,
16{
17 stream: JsonArrayStream<S, B>,
18 _t: PhantomData<T>,
19}
20
21impl<'de, S, B> JsonArrayStream<S, B>
22where
23 S: Stream<Item = B>,
24 B: IntoIterator<Item = u8> + Sized,
25{
26 pub fn parsed<T>(self) -> ParsedStream<T, S, B>
27 where
28 T: serde::de::Deserialize<'de>,
29 {
30 return ParsedStream {
31 stream: self,
32 _t: PhantomData::<T>,
33 };
34 }
35}
36
37impl<T, S, B> Stream for ParsedStream<T, S, B>
38where
39 S: Stream<Item = B>,
40 B: IntoIterator<Item = u8> + Sized,
41 T: for<'de> serde::de::Deserialize<'de>,
42{
43 type Item = Result<T, JsonStreamError>;
44
45 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
46 let this = unsafe { self.get_unchecked_mut() };
47 match unsafe { Pin::new_unchecked(&mut this.stream) }.poll_next(cx) {
48 Poll::Pending => Poll::Pending,
49 Poll::Ready(opt) => Poll::Ready(opt.map(|res| {
50 res.and_then(|buffer| {
51 serde_json::from_slice(&buffer).map_err(|err| JsonStreamError::from(err))
52 })
53 })),
54 }
55 }
56}
57
58#[cfg(test)]
59mod tests {
60 use super::*;
61 use futures::stream::TryStreamExt;
62
63 #[tokio::test]
64 async fn owned_array() {
65 let json = "[-12,11.1,0]";
66 let stream = futures::stream::once(async { json.bytes() });
67 let parsed: Result<Vec<_>, _> = stream_json_array(stream)
68 .parsed::<f64>()
69 .try_collect()
70 .await;
71
72 assert_eq!(parsed.unwrap(), vec![-12., 11.1, 0.]);
73 }
74}