json_array_stream/
lib.rs

1use futures::{stream::Stream, task::Context};
2use std::marker::PhantomData;
3use std::pin::Pin;
4use std::task::Poll;
5
6mod json_array_stream;
7mod json_depth_analyzer;
8
9use json_array_stream::JsonArrayStream;
10pub use json_array_stream::{stream_json_array, JsonStreamError};
11
12pub struct ParsedStream<T, S, B>
13where
14    S: Stream<Item = B>,
15    B: IntoIterator<Item = u8> + Sized,
16{
17    stream: JsonArrayStream<S, B>,
18    _t: PhantomData<T>,
19}
20
21impl<'de, S, B> JsonArrayStream<S, B>
22where
23    S: Stream<Item = B>,
24    B: IntoIterator<Item = u8> + Sized,
25{
26    pub fn parsed<T>(self) -> ParsedStream<T, S, B>
27    where
28        T: serde::de::Deserialize<'de>,
29    {
30        return ParsedStream {
31            stream: self,
32            _t: PhantomData::<T>,
33        };
34    }
35}
36
37impl<T, S, B> Stream for ParsedStream<T, S, B>
38where
39    S: Stream<Item = B>,
40    B: IntoIterator<Item = u8> + Sized,
41    T: for<'de> serde::de::Deserialize<'de>,
42{
43    type Item = Result<T, JsonStreamError>;
44
45    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
46        let this = unsafe { self.get_unchecked_mut() };
47        match unsafe { Pin::new_unchecked(&mut this.stream) }.poll_next(cx) {
48            Poll::Pending => Poll::Pending,
49            Poll::Ready(opt) => Poll::Ready(opt.map(|res| {
50                res.and_then(|buffer| {
51                    serde_json::from_slice(&buffer).map_err(|err| JsonStreamError::from(err))
52                })
53            })),
54        }
55    }
56}
57
58#[cfg(test)]
59mod tests {
60    use super::*;
61    use futures::stream::TryStreamExt;
62
63    #[tokio::test]
64    async fn owned_array() {
65        let json = "[-12,11.1,0]";
66        let stream = futures::stream::once(async { json.bytes() });
67        let parsed: Result<Vec<_>, _> = stream_json_array(stream)
68            .parsed::<f64>()
69            .try_collect()
70            .await;
71
72        assert_eq!(parsed.unwrap(), vec![-12., 11.1, 0.]);
73    }
74}