json_array_stream/
json_array_stream.rs1use futures::{stream::Stream, task::Context};
2use std::pin::Pin;
3use std::task::Poll;
4use thiserror::Error;
5
6use super::json_depth_analyzer;
7
8#[derive(Error, Debug)]
9pub enum JsonStreamError {
10 #[error(transparent)]
11 Io(#[from] std::io::Error),
12 #[error("invalid syntax")]
13 Parser(#[from] json_depth_analyzer::ParserError),
14 #[error("invalid json")]
15 Json(#[from] serde_json::error::Error),
16}
17
18pub struct JsonArrayStream<S, B>
19where
20 S: Stream<Item = B>,
21 B: IntoIterator<Item = u8> + Sized,
22{
23 analyzer: json_depth_analyzer::JsonDepthAnalyzer,
24 buffer: Vec<u8>,
25 stream: Pin<Box<S>>,
26 chunk: Option<B::IntoIter>,
27 comma: bool,
28 end: bool,
29}
30
31impl<S, B> Stream for JsonArrayStream<S, B>
32where
33 S: Stream<Item = B>,
34 B: IntoIterator<Item = u8> + Sized,
35{
36 type Item = Result<Vec<u8>, JsonStreamError>;
37
38 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
39 let this = unsafe { self.get_unchecked_mut() };
40 if this.end {
41 return Poll::Ready(None);
42 }
43
44 loop {
45 if let Some(chunk) = this.chunk.as_mut() {
46 for c in chunk {
47 let initial_depth = this.analyzer.depth();
48
49 this.analyzer
50 .process(c)
51 .map_err(|err| JsonStreamError::from(err))?;
52
53 if initial_depth == 0 {
54 continue;
55 }
56
57 let emit = if initial_depth == 1 && c == b',' {
58 this.comma = true;
59 true
60 } else if initial_depth == 1 && (c as char).is_whitespace() {
61 false
62 } else if this.analyzer.depth() == 0 {
63 this.end = true;
64 true
65 } else {
66 this.buffer.push(c);
67 false
68 };
69
70 if emit {
71 if this.buffer.len() == 0 && !this.comma {
72 return Poll::Ready(None);
73 }
74
75 let mut empty = vec![];
76 std::mem::swap(&mut empty, &mut this.buffer);
77 return Poll::Ready(Some(Ok(empty)));
78 }
79 }
80 this.chunk = None;
81 }
82
83 match this.stream.as_mut().poll_next(cx) {
84 Poll::Ready(None) => {
85 return Poll::Ready(Some(Err(JsonStreamError::from(std::io::Error::new(
86 std::io::ErrorKind::UnexpectedEof,
87 "preliminary EOF when parsing json array",
88 )))));
89 }
90 Poll::Pending => {
91 return Poll::Pending;
92 }
93 Poll::Ready(Some(chunk)) => {
94 this.chunk = Some(chunk.into_iter());
95 }
96 }
97 }
98 }
99}
100
101pub fn stream_json_array<S, B>(stream: S) -> JsonArrayStream<S, B>
102where
103 S: Stream<Item = B>,
104 B: IntoIterator<Item = u8> + Sized,
105{
106 JsonArrayStream {
107 stream: Box::pin(stream),
108 analyzer: json_depth_analyzer::JsonDepthAnalyzer::new(),
109 buffer: vec![],
110 chunk: None,
111 comma: false,
112 end: false,
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;
    use std::error::Error;

    /// Feeds each entry of `chunks` to [`stream_json_array`] as a separate stream
    /// item. Decodes every yielded element as UTF-8, stopping at the first error.
    async fn parse_chunks(chunks: Vec<&'static str>) -> Result<Vec<String>, Box<dyn Error>> {
        let stream = futures::stream::iter(chunks.into_iter().map(str::bytes));
        stream_json_array(stream)
            .map_err(|err| Box::new(err) as Box<dyn Error>)
            .and_then(|buffer| {
                future::ready(String::from_utf8(buffer).map_err(|err| Box::new(err).into()))
            })
            .try_collect()
            .await
    }

    /// Convenience wrapper around [`parse_chunks`] for input delivered as one chunk.
    async fn parse(json: &'static str) -> Result<Vec<String>, Box<dyn Error>> {
        parse_chunks(vec![json]).await
    }

    #[tokio::test]
    async fn empty_array() {
        assert_eq!(parse("[]").await.unwrap(), Vec::<&str>::new());
    }

    #[tokio::test]
    async fn whitespace_only_array() {
        assert_eq!(parse("[ \n\t]").await.unwrap(), Vec::<&str>::new());
    }

    #[tokio::test]
    async fn single_value() {
        assert_eq!(parse("[12]").await.unwrap(), vec!["12"]);
    }

    #[tokio::test]
    async fn multiple_values() {
        let json = "[\"blubb\", 42,{\"xxx\":false , \"yyy\":\"abc\"} ] ";
        assert_eq!(
            parse(json).await.unwrap(),
            vec!["\"blubb\"", "42", "{\"xxx\":false , \"yyy\":\"abc\"}"]
        );
    }

    #[tokio::test]
    async fn values_split_across_chunks() {
        let parsed = parse_chunks(vec!["[\"ab", "c\", 1", "2, {\"k\"", ":[3]}]"]).await;
        assert_eq!(parsed.unwrap(), vec!["\"abc\"", "12", "{\"k\":[3]}"]);
    }

    #[tokio::test]
    async fn comma_without_values() {
        assert_eq!(parse("[,]").await.unwrap(), vec!["", ""]);
    }

    #[tokio::test]
    async fn dangling_comma() {
        assert_eq!(parse("[42 , ]").await.unwrap(), vec!["42", ""]);
    }

    #[tokio::test]
    async fn premature_eof() {
        let err = parse("[1, 2").await.unwrap_err();
        assert!(matches!(
            err.downcast_ref::<JsonStreamError>(),
            Some(JsonStreamError::Io(io)) if io.kind() == std::io::ErrorKind::UnexpectedEof
        ));
    }
}