// http_json_stream/partial_json.rs

1use serde::de::DeserializeOwned;
2use std::collections::VecDeque;
3use std::fmt;
4use std::io::{Cursor, Read, Write};
5use std::marker::{PhantomData, Unpin};
6
/// A filter for [`PartialJson`] to determine which items to deserialize.
///
/// Start with [`JsonPart::level`] and refine the filter with the other builder methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct JsonPart {
    /// Depth whose entries are deserialized, see [`JsonPart::level`].
    pub level: u32,
    /// If set, only entries of the group with this index are deserialized, see [`JsonPart::group`].
    pub group: Option<u32>,
    /// Whether object values are deserialized (keys are skipped), see [`JsonPart::parse_object_values`].
    pub parse_object_values: bool,
    /// Whether list entries are skipped, see [`JsonPart::ignore_list_entries`].
    pub ignore_list_entries: bool,
}

impl JsonPart {
    /// The level determines at which depth to deserialize data:
    /// - `0` - the full JSON is parsed as a single value once [`done`](PartialJson::done) is called.
    /// - `1` - all entries inside of the root group are deserialized individually.
    /// - `2` - only entries inside of a group found directly within the root.
    /// - `3` - ...
    ///
    /// A group is an object or a list. Both increase the depth, and entries may be parsed
    /// from either or both.
    ///
    /// By default only list entries are deserialized, from every group at that depth.
    #[must_use]
    pub fn level(level: u32) -> Self {
        JsonPart {
            level,
            group: None,
            parse_object_values: false,
            ignore_list_entries: false,
        }
    }

    /// Deserialize only entries found within the group with the given index.
    ///
    /// The index is zero-based. It counts every group at this depth whose kind is not
    /// filtered out, including empty groups.
    ///
    /// To deserialize only entries inside of the first list found within the root object:
    /// ```
    /// # use http_json_stream::JsonPart;
    /// let part = JsonPart::level(2).group(0);
    /// assert_eq!(part.group, Some(0));
    /// ```
    #[must_use]
    pub fn group(mut self, index: u32) -> Self {
        self.group = Some(index);
        self
    }

    /// Parse object values as if they were list entries, ignoring the keys.
    ///
    /// This influences the index counted by [`JsonPart::group`], which now also counts objects.
    /// ```
    /// # use http_json_stream::JsonPart;
    /// let part = JsonPart::level(2).parse_object_values();
    /// assert!(part.parse_object_values);
    /// ```
    #[must_use]
    pub fn parse_object_values(mut self) -> Self {
        self.parse_object_values = true;
        self
    }

    /// Do not parse list entries and use [`JsonPart::parse_object_values`] instead.
    ///
    /// This influences the index counted by [`JsonPart::group`], which now only counts objects.
    /// ```
    /// # use http_json_stream::JsonPart;
    /// let part = JsonPart::level(2).ignore_list_entries();
    /// assert!(part.ignore_list_entries && part.parse_object_values);
    /// ```
    #[must_use]
    pub fn ignore_list_entries(mut self) -> Self {
        self.ignore_list_entries = true;
        // Skipping lists only makes sense if objects are parsed instead.
        self.parse_object_values = true;
        self
    }
}
67
68/// A [`Write`] or [`Extend`] interface to push [`JSON`](serde_json) data.
69///
70/// An [`Iterator`] interface to pull [`Deserialized`](serde) data.
71///
72/// ```
73/// # use std::io::Write;
74/// # use std::marker::Unpin;
75/// # use futures_core::Stream;
76/// # use futures_util::StreamExt;
77/// # use serde::de::DeserializeOwned;
78/// # use http_json_stream::{JsonPart, PartialJson};
79/// # fn process_data<T>(_: T) {}
80/// async fn parse_json_stream<S, T>(mut stream: S)
81/// where
82///     S: Stream<Item = Vec<u8>> + Unpin,
83///     T: DeserializeOwned,
84/// {
85///     let mut json = PartialJson::<T>::new(JsonPart::level(1).group(0));
86///     while let Some(chunk) = stream.next().await {
87///         json.extend(&chunk);
88///         while let Some(item) = json.next() {
89///             process_data(item);
90///         }
91///     }
92///     if let Some(item) = json.done() {
93///         process_data(item);
94///     }
95/// }
96/// ```
97pub struct PartialJson<T> {
98    buffer: VecDeque<u8>,
99    part: JsonPart,
100    level: u32,
101    group: u32,
102    item_char: char,
103    last_char: char,
104    in_string: bool,
105    i: usize,
106    phantom: PhantomData<T>,
107}
108
109impl<T> PartialJson<T> {
110    /// Creates a new JSON parser which can deserialize data as it is being written.
111    ///
112    /// The data to be deserialized is determined by the [`JsonPart`] filter.
113    pub fn new(part: JsonPart) -> Self {
114        PartialJson {
115            buffer: VecDeque::new(),
116            part,
117            level: 0,
118            group: 0,
119            item_char: '\0',
120            last_char: '\0',
121            in_string: false,
122            i: 0,
123            phantom: PhantomData,
124        }
125    }
126}
127
128impl<T> Clone for PartialJson<T> {
129    fn clone(&self) -> Self {
130        PartialJson {
131            buffer: self.buffer.clone(),
132            part: self.part,
133            level: self.level,
134            group: self.group,
135            item_char: self.item_char,
136            last_char: self.last_char,
137            in_string: self.in_string,
138            i: self.i,
139            phantom: self.phantom,
140        }
141    }
142    fn clone_from(&mut self, source: &Self) {
143        self.buffer.clone_from(&source.buffer);
144        self.part = source.part;
145        self.level = source.level;
146        self.group = source.group;
147        self.item_char = source.item_char;
148        self.last_char = source.last_char;
149        self.in_string = source.in_string;
150        self.i = source.i;
151        self.phantom = source.phantom;
152    }
153}
154
155unsafe impl<T> Send for PartialJson<T> {}
156unsafe impl<T> Sync for PartialJson<T> {}
157impl<T> Unpin for PartialJson<T> {}
158
159impl<T> fmt::Debug for PartialJson<T> {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        f.debug_tuple("PartialJson").field(&self.buffer).finish()
162    }
163}
164
165impl<T> Write for PartialJson<T> {
166    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
167        self.buffer.extend(buf);
168        Ok(buf.len())
169    }
170    fn flush(&mut self) -> std::io::Result<()> {
171        Ok(())
172    }
173}
174
175impl<T> Extend<u8> for PartialJson<T> {
176    fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
177        self.buffer.extend(iter);
178    }
179}
180
181impl<'a, T> Extend<&'a u8> for PartialJson<T> {
182    fn extend<I: IntoIterator<Item = &'a u8>>(&mut self, iter: I) {
183        self.buffer.extend(iter);
184    }
185}
186
187impl<T: DeserializeOwned> Iterator for PartialJson<T> {
188    type Item = crate::Result<T>;
189    fn next(&mut self) -> Option<Self::Item> {
190        loop {
191            if self.i == self.buffer.len() {
192                if !self.buffer.is_empty() {
193                    let skipping = self.level < self.part.level;
194                    if skipping || self.ignored() {
195                        self.drain();
196                    }
197                }
198                return None;
199            }
200            let char = self.buffer[self.i] as char;
201            self.i += 1;
202            if self.in_string {
203                if char == '"' && self.last_char != '\\' {
204                    self.in_string = false;
205                }
206            } else if char.is_whitespace() {
207                continue;
208            } else {
209                if let '"' = char {
210                    self.in_string = true;
211                } else if let '[' | '{' = char {
212                    self.level += 1;
213                    if self.grounded() {
214                        self.item_char = char;
215                    }
216                } else if let ',' = char {
217                    if self.grounded() && !self.ignored() {
218                        return Some(self.parse(self.i - 1));
219                    }
220                } else if let ']' | '}' = char {
221                    let grounded = self.grounded();
222                    let ignored_index = self.ignored_index();
223                    let ignored_group = self.ignored_group();
224                    self.level = self.level.saturating_sub(1);
225                    if grounded {
226                        if !ignored_group {
227                            self.group += 1;
228                        }
229                        let empty = matches!(self.last_char, '[' | '{');
230                        if !ignored_index && !ignored_group && !empty {
231                            return Some(self.parse(self.i - 1));
232                        }
233                    }
234                }
235                if let '[' | ':' = char {
236                    if self.grounded() {
237                        self.drain();
238                    }
239                }
240            }
241            self.last_char = char;
242        }
243    }
244}
245
246impl<T: DeserializeOwned> PartialJson<T> {
247    /// Does nothing and returns `None` if not currently on the correct [`level`](JsonPart::level).
248    ///
249    /// Otherwise, clears the full buffer and attempts to deserialize an item.
250    ///
251    /// Intended for use once writing is finished and the level might be zero.
252    pub fn done(&mut self) -> Option<crate::Result<T>> {
253        if self.grounded() && !self.ignored() {
254            Some(self.parse(self.buffer.len()))
255        } else {
256            None
257        }
258    }
259}
260
261impl<T: DeserializeOwned> PartialJson<T> {
262    fn parse(&mut self, j: usize) -> crate::Result<T> {
263        let (one, two) = self.buffer.as_slices();
264        let res = {
265            if one.len() < j {
266                let j = j - one.len();
267                serde_json::from_reader(Cursor::new(one).chain(Cursor::new(&two[..j])))
268            } else {
269                serde_json::from_slice(&one[..j])
270            }
271        };
272        let result = res.map_err(|err| {
273            let buf = self.buffer.iter().take(j).copied().collect::<Vec<_>>();
274            crate::Error::Json(err, String::from_utf8(buf))
275        });
276        self.drain();
277        result
278    }
279    #[inline]
280    fn grounded(&self) -> bool {
281        self.level == self.part.level
282    }
283    #[inline]
284    fn ignored(&self) -> bool {
285        self.ignored_index() || self.ignored_group()
286    }
287    #[inline]
288    fn ignored_index(&self) -> bool {
289        self.part.group.is_some_and(|group| group != self.group)
290    }
291    #[inline]
292    fn ignored_group(&self) -> bool {
293        let ignored_object = !self.part.parse_object_values && self.item_char == '{';
294        let ignored_list = self.part.ignore_list_entries && self.item_char == '[';
295        ignored_object || ignored_list
296    }
297    #[inline]
298    fn drain(&mut self) {
299        self.buffer.drain(..self.i);
300        self.i = 0;
301    }
302}
303
#[cfg(test)]
mod tests {
    use super::*;
    use serde::de::DeserializeOwned;
    use serde::Deserialize;
    use std::fmt::Debug;

    /// Filter at depth `lvl` that deserializes both list entries and object values.
    fn lvl(lvl: u32) -> JsonPart {
        JsonPart::level(lvl).parse_object_values()
    }

    /// Feeds `str` to a fresh parser in chunks of every possible size and asserts that
    /// exactly `obj` is produced each time. Results must not depend on chunk boundaries.
    #[track_caller]
    fn run<T>(part: JsonPart, str: &str, obj: &[T])
    where
        T: Debug + PartialEq + Eq + DeserializeOwned,
    {
        for len in 1..=str.len() {
            let mut res = Vec::new();
            let mut json = PartialJson::<T>::new(part);
            for chunk in str.as_bytes().chunks(len) {
                json.extend(chunk);
                while let Some(item) = json.next() {
                    res.push(item.unwrap());
                }
            }
            if let Some(item) = json.done() {
                res.push(item.unwrap());
            }
            assert_eq!(res, obj);
        }
    }

    #[test]
    fn empty() {
        #[derive(Debug, Deserialize, PartialEq, Eq)]
        struct Empty {}
        run::<()>(lvl(1), "{\n }", &[]);
        run::<()>(lvl(1), "[ \n]", &[]);
        run::<Empty>(lvl(0), "{ \n}", &[Empty {}]);
        run::<Vec<()>>(lvl(0), "[\n ]", &[vec![]]);
        run::<Vec<()>>(lvl(1).group(0), "{ \"\": [ \n] }", &[vec![]]);
        run::<Vec<()>>(JsonPart::level(1), "{ \"\": [ \n] }", &[]);
        run::<Vec<()>>(lvl(1).group(1), "{ \"\": [ \n] }", &[]);
        run::<()>(lvl(2), "{ \"\": [ \n] }", &[]);
    }

    #[test]
    fn simple() {
        let list = "[[ ],[1,2, 3], [], [4,5], []]";
        let map = r#"{ "\"":[1,2],"[":[3, 4,5]}"#;
        run(lvl(2), list, &[1, 2, 3, 4, 5]);
        run(lvl(2), map, &[1, 2, 3, 4, 5]);
        run(
            JsonPart::level(1),
            list,
            &[vec![], vec![1, 2, 3], vec![], vec![4, 5], vec![]],
        );
        run(
            JsonPart::level(1).ignore_list_entries(),
            map,
            &[vec![1, 2], vec![3, 4, 5]],
        );
        run(lvl(2).group(3), list, &[4, 5]);
        run(lvl(2).group(1), map, &[3, 4, 5]);
        run::<()>(JsonPart::level(2).ignore_list_entries(), list, &[]);
    }

    #[test]
    fn objects() {
        #[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
        struct Item {
            a: String,
            b: Vec<u32>,
        }
        // At depth 3 there are four groups, in order: a list (two, one), an object (one),
        // a list (two) and an object (two, one).
        let json = r#"[
            {"list": [
                { "b": [3, 4], "a": "test2"},
                { "a": "test", "b": [1, 2]}
            ]},
            {"one": {
                "a": { "b": [1, 2], "a": "test"}
            },
            "two": [
                { "a": "test2", "b": [3, 4]}
            ]},
            {"more": {
                "b": { "b": [3, 4], "a": "test2"},
                "c": { "a": "test", "b": [1, 2]}
            }}
        ]"#;
        let one = Item {
            a: "test".into(),
            b: vec![1, 2],
        };
        let two = Item {
            a: "test2".into(),
            b: vec![3, 4],
        };
        run(
            lvl(3),
            json,
            &[
                two.clone(),
                one.clone(),
                one.clone(),
                two.clone(),
                two.clone(),
                one.clone(),
            ],
        );
        run(
            JsonPart::level(3),
            json,
            &[two.clone(), one.clone(), two.clone()],
        );
        run(
            JsonPart::level(3).ignore_list_entries(),
            json,
            &[one.clone(), two.clone(), one.clone()],
        );
        run(
            JsonPart::level(3).ignore_list_entries().group(1),
            json,
            &[two.clone(), one.clone()],
        );
        run(JsonPart::level(3).group(1), json, &[two.clone()]);
        run(lvl(3).group(0), json, &[two.clone(), one.clone()]);
        run(lvl(3).group(1), json, &[one.clone()]);
        run(lvl(3).group(2), json, &[two.clone()]);
        run(lvl(3).group(3), json, &[two.clone(), one.clone()]);
        run::<Item>(lvl(3).group(4), json, &[]);
        let none = JsonPart {
            level: 3,
            group: None,
            ignore_list_entries: true,
            parse_object_values: false,
        };
        run::<Item>(none, json, &[]);
    }

    #[test]
    fn utf8() {
        run(
            lvl(2),
            r#"{
                "ⓟH񠭇򓊦򤄛⅃ތfNjͨ񣧺r抄ݼ춓\"ĠQ𭌔\󛞼ܫ􂷳쯪􁺬ڹދ􅽘͹៦9ު": [
                    "󸈣ظȗ򷇪\"𐏴駭󳷈맪ى퇄<}i񩇦򭍁ި𥏡p;0+􆨧ͅд򔦜j솞U_.ߤb",
                    ":˚�梔񂌄֪Ͻv񴷤Ẑˆ򲬧D񔰾衸򊻤LY�\"䋪ω`񱰲󞰅怟񤹝zw󐭄#ͦ"
                ],
                "𬻈;:䓄$ۺ䰁%୯NYꊠ|㦴⢗$X㴩߉񫤵؊񇢏\"9׳󯠀ߊvƉA񾩲ҁ֋": "릪𖲨ѯ⦤͛𞊎͐4륛𪮫Θ񈐦𐃇\"򕘾򚂼澀񿡬ꖆߐͤwߥ󉙥뉧ƕ󈀁[ɟϏ叓D񧿾",
                "瑔샂,󂦆򤍟򕤕͘꯯ϒނ\"𖄪𽞲𼤊ب𔽢ʌ鋂BϑÂ򗴎󹜦瑜񅃼󁲫봗Lk󊣧ԣ旧": [
                    "Z̩ťMɲ髲򒝪񠛱ቱ̗桒\"𾤄W𴊖􀕡򰅼돭_񒐽񻔪䍙Ғѿ򛁬B6ݍ𤦐鄎^Eˍ",
                    "E넰׻𨆣{2ܗ鞬識򋫄󶁇촗'㟳󴧁)#Ş\"ߟ൴򘽍󾥘ڋ⸏ŗ𣻡𝝙ݡi斏ᾝ՞"
                ]
            }"#,
            &[
                "󸈣ظȗ򷇪\"𐏴駭󳷈맪ى퇄<}i񩇦򭍁ި𥏡p;0+􆨧ͅд򔦜j솞U_.ߤb".to_string(),
                ":˚�梔񂌄֪Ͻv񴷤Ẑˆ򲬧D񔰾衸򊻤LY�\"䋪ω`񱰲󞰅怟񤹝zw󐭄#ͦ".to_string(),
                "Z̩ťMɲ髲򒝪񠛱ቱ̗桒\"𾤄W𴊖􀕡򰅼돭_񒐽񻔪䍙Ғѿ򛁬B6ݍ𤦐鄎^Eˍ".to_string(),
                "E넰׻𨆣{2ܗ鞬識򋫄󶁇촗'㟳󴧁)#Ş\"ߟ൴򘽍󾥘ڋ⸏ŗ𣻡𝝙ݡi斏ᾝ՞".to_string(),
            ],
        )
    }
}