json_array_stream/
json_array_stream.rs

1use futures::{stream::Stream, task::Context};
2use std::pin::Pin;
3use std::task::Poll;
4use thiserror::Error;
5
6use super::json_depth_analyzer;
7
8#[derive(Error, Debug)]
9pub enum JsonStreamError {
10    #[error(transparent)]
11    Io(#[from] std::io::Error),
12    #[error("invalid syntax")]
13    Parser(#[from] json_depth_analyzer::ParserError),
14    #[error("invalid json")]
15    Json(#[from] serde_json::error::Error),
16}
17
18pub struct JsonArrayStream<S, B>
19where
20    S: Stream<Item = B>,
21    B: IntoIterator<Item = u8> + Sized,
22{
23    analyzer: json_depth_analyzer::JsonDepthAnalyzer,
24    buffer: Vec<u8>,
25    stream: Pin<Box<S>>,
26    chunk: Option<B::IntoIter>,
27    comma: bool,
28    end: bool,
29}
30
31impl<S, B> Stream for JsonArrayStream<S, B>
32where
33    S: Stream<Item = B>,
34    B: IntoIterator<Item = u8> + Sized,
35{
36    type Item = Result<Vec<u8>, JsonStreamError>;
37
38    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
39        let this = unsafe { self.get_unchecked_mut() };
40        if this.end {
41            return Poll::Ready(None);
42        }
43
44        loop {
45            if let Some(chunk) = this.chunk.as_mut() {
46                for c in chunk {
47                    let initial_depth = this.analyzer.depth();
48
49                    this.analyzer
50                        .process(c)
51                        .map_err(|err| JsonStreamError::from(err))?;
52
53                    if initial_depth == 0 {
54                        continue;
55                    }
56
57                    let emit = if initial_depth == 1 && c == b',' {
58                        this.comma = true;
59                        true
60                    } else if initial_depth == 1 && (c as char).is_whitespace() {
61                        false
62                    } else if this.analyzer.depth() == 0 {
63                        this.end = true;
64                        true
65                    } else {
66                        this.buffer.push(c);
67                        false
68                    };
69
70                    if emit {
71                        if this.buffer.len() == 0 && !this.comma {
72                            return Poll::Ready(None);
73                        }
74
75                        let mut empty = vec![];
76                        std::mem::swap(&mut empty, &mut this.buffer);
77                        return Poll::Ready(Some(Ok(empty)));
78                    }
79                }
80                this.chunk = None;
81            }
82
83            match this.stream.as_mut().poll_next(cx) {
84                Poll::Ready(None) => {
85                    return Poll::Ready(Some(Err(JsonStreamError::from(std::io::Error::new(
86                        std::io::ErrorKind::UnexpectedEof,
87                        "preliminary EOF when parsing json array",
88                    )))));
89                }
90                Poll::Pending => {
91                    return Poll::Pending;
92                }
93                Poll::Ready(Some(chunk)) => {
94                    this.chunk = Some(chunk.into_iter());
95                }
96            }
97        }
98    }
99}
100
101pub fn stream_json_array<S, B>(stream: S) -> JsonArrayStream<S, B>
102where
103    S: Stream<Item = B>,
104    B: IntoIterator<Item = u8> + Sized,
105{
106    JsonArrayStream {
107        stream: Box::pin(stream),
108        analyzer: json_depth_analyzer::JsonDepthAnalyzer::new(),
109        buffer: vec![],
110        chunk: None,
111        comma: false,
112        end: false,
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;
    use std::error::Error;

    /// Feeds `chunks` to [`stream_json_array`] as consecutive byte chunks and collects
    /// every element as a UTF-8 string, stopping at the first error.
    async fn parse_chunks(chunks: &[&str]) -> Result<Vec<String>, Box<dyn Error>> {
        let stream = futures::stream::iter(chunks.iter().map(|chunk| chunk.bytes()));
        stream_json_array(stream)
            .map_err(|err| Box::new(err) as Box<dyn Error>)
            .and_then(|buffer| {
                future::ready(String::from_utf8(buffer).map_err(|err| Box::new(err).into()))
            })
            .try_collect()
            .await
    }

    /// Like [`parse_chunks`], with the whole document delivered as a single chunk.
    async fn parse(json: &str) -> Result<Vec<String>, Box<dyn Error>> {
        parse_chunks(&[json]).await
    }

    #[tokio::test]
    async fn empty_array() {
        assert_eq!(parse("[]").await.unwrap(), vec![] as Vec<&str>);
    }

    #[tokio::test]
    async fn single_value() {
        assert_eq!(parse("[12]").await.unwrap(), vec!["12"]);
    }

    #[tokio::test]
    async fn multiple_values() {
        let json = "[\"blubb\", 42,{\"xxx\":false , \"yyy\":\"abc\"} ] ";
        assert_eq!(
            parse(json).await.unwrap(),
            vec!["\"blubb\"", "42", "{\"xxx\":false , \"yyy\":\"abc\"}"]
        );
    }

    #[tokio::test]
    async fn comma_without_values() {
        assert_eq!(parse("[,]").await.unwrap(), vec!["", ""]);
    }

    #[tokio::test]
    async fn dangling_comma() {
        assert_eq!(parse("[42 , ]").await.unwrap(), vec!["42", ""]);
    }

    #[tokio::test]
    async fn elements_split_across_chunks() {
        assert_eq!(
            parse_chunks(&["[1", "2, 3", "]"]).await.unwrap(),
            vec!["12", "3"]
        );
    }

    #[tokio::test]
    async fn truncated_array_is_an_error() {
        assert!(parse("[1, 2").await.is_err());
    }
}