http_json_stream/
partial_json.rs

1use serde::de::DeserializeOwned;
2use std::collections::VecDeque;
3use std::fmt;
4use std::io::{Cursor, Read, Write};
5use std::marker::{PhantomData, Unpin};
6
/// A filter for [`PartialJson`] to determine which items to deserialize.
///
/// Usually built with [`JsonPart::level`] and the chained builder methods, but the fields are
/// public and may also be set directly. The [`Default`] value equals `JsonPart::level(0)`:
/// the whole document is deserialized as a single value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct JsonPart {
    /// Nesting depth whose entries are deserialized; see [`JsonPart::level`].
    pub level: u32,
    /// When set, only entries of the group with this index are deserialized. Groups are
    /// counted from `0` among the groups at `level` whose kind is not filtered out.
    pub group: Option<u32>,
    /// Also deserialize the values of objects (keys are ignored), not only list entries.
    pub parse_object_values: bool,
    /// Skip groups that are lists. [`JsonPart::ignore_list_entries`] also enables
    /// `parse_object_values`; with that flag left off, nothing is deserialized at all.
    pub ignore_list_entries: bool,
}
15
16impl JsonPart {
17    /// The level determines at which depth to deserialize data:
18    /// - `0` - the full JSON is parsed as a single value once [`done`](PartialJson::done) is called.
19    /// - `1` - all entries inside of the root group are deserialized individually.
20    /// - `2` - only entries inside of a group found directly within the root.
21    /// - `3` - ...
22    ///
23    /// A group is an object or a list, both will increase the depth, but entries may be parsed from either or both.
24    pub fn level(level: u32) -> Self {
25        JsonPart {
26            level,
27            group: None,
28            parse_object_values: false,
29            ignore_list_entries: false,
30        }
31    }
32    /// Deserialize only entries found within the group with the given index.
33    ///
34    /// To deserialize only entries inside of the first list found within the root object:
35    /// ```
36    /// # use http_json_stream::JsonPart;
37    /// JsonPart::level(2).group(0);
38    /// ```
39    pub fn group(mut self, index: u32) -> Self {
40        self.group = Some(index);
41        self
42    }
43    /// Parse object values as if they were list entries, ignoring the keys.
44    ///
45    /// This influences the index counted by [`group`](JsonPart::group), now also counting objects.
46    /// ```
47    /// # use http_json_stream::JsonPart;
48    /// JsonPart::level(2).parse_object_values();
49    /// ```
50    pub fn parse_object_values(mut self) -> Self {
51        self.parse_object_values = true;
52        self
53    }
54    /// Do not parse list entries and use [`parse_object_values`](JsonPart::parse_object_values) instead.
55    ///
56    /// This influences the index counted by [`group`](JsonPart::group), now only counting objects.
57    /// ```
58    /// # use http_json_stream::JsonPart;
59    /// JsonPart::level(2).ignore_list_entries();
60    /// ```
61    pub fn ignore_list_entries(mut self) -> Self {
62        self.ignore_list_entries = true;
63        self.parse_object_values = true;
64        self
65    }
66}
67
68/// A [`Write`] or [`Extend`] interface to push [`JSON`](serde_json) data.
69///
70/// An [`Iterator`] interface to pull [`Deserialized`](serde) data.
71///
72/// ```
73/// # use std::io::Write;
74/// # use std::marker::Unpin;
75/// # use futures_core::Stream;
76/// # use futures_util::StreamExt;
77/// # use serde::de::DeserializeOwned;
78/// # use http_json_stream::{JsonPart, PartialJson};
79/// # fn process_data<T>(_: T) {}
80/// async fn parse_json_stream<S, T>(mut stream: S)
81/// where
82///     S: Stream<Item = Vec<u8>> + Unpin,
83///     T: DeserializeOwned,
84/// {
85///     let mut json = PartialJson::<T>::new(JsonPart::level(1).group(0));
86///     while let Some(chunk) = stream.next().await {
87///         json.extend(&chunk);
88///         while let Some(item) = json.next() {
89///             process_data(item);
90///         }
91///     }
92///     if let Some(item) = json.done() {
93///         process_data(item);
94///     }
95/// }
96/// ```
97pub struct PartialJson<T> {
98    buffer: VecDeque<u8>,
99    part: JsonPart,
100    level: u32,
101    group: u32,
102    item_char: char,
103    last_char: char,
104    in_string: bool,
105    i: usize,
106    phantom: PhantomData<T>,
107}
108
109impl<T> PartialJson<T> {
110    /// Creates a new JSON parser which can deserialize data as it is being written.
111    ///
112    /// The data to be deserialized is determined by the [`JsonPart`] filter.
113    pub fn new(part: JsonPart) -> Self {
114        PartialJson {
115            buffer: VecDeque::new(),
116            part,
117            level: 0,
118            group: 0,
119            item_char: '\0',
120            last_char: '\0',
121            in_string: false,
122            i: 0,
123            phantom: PhantomData,
124        }
125    }
126}
127
128impl<T> Clone for PartialJson<T> {
129    fn clone(&self) -> Self {
130        PartialJson {
131            buffer: self.buffer.clone(),
132            part: self.part,
133            level: self.level,
134            group: self.group,
135            item_char: self.item_char,
136            last_char: self.last_char,
137            in_string: self.in_string,
138            i: self.i,
139            phantom: self.phantom,
140        }
141    }
142    fn clone_from(&mut self, source: &Self) {
143        self.buffer.clone_from(&source.buffer);
144        self.part = source.part;
145        self.level = source.level;
146        self.group = source.group;
147        self.item_char = source.item_char;
148        self.last_char = source.last_char;
149        self.in_string = source.in_string;
150        self.i = source.i;
151        self.phantom = source.phantom;
152    }
153}
154
// SAFETY: `PartialJson` never stores a value of type `T`. Its fields are a byte buffer,
// the `Copy` filter and plain scanner state, and `T` only appears in the `PhantomData`
// marker. Moving the parser to, or sharing it with, another thread therefore cannot move
// or alias any `T`, so both impls hold for every `T`.
unsafe impl<T> Send for PartialJson<T> {}
unsafe impl<T> Sync for PartialJson<T> {}
// `Unpin` is a safe marker trait. Implementing it for every `T` lets the parser be moved
// out of a `Pin` even when `T: !Unpin`, which is fine because no `T` is stored inline.
impl<T> Unpin for PartialJson<T> {}
158
159impl<T> fmt::Debug for PartialJson<T> {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        f.debug_tuple("PartialJson").field(&self.buffer).finish()
162    }
163}
164
165impl<T> Write for PartialJson<T> {
166    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
167        self.buffer.extend(buf);
168        Ok(buf.len())
169    }
170    fn flush(&mut self) -> std::io::Result<()> {
171        Ok(())
172    }
173}
174
175impl<T> Extend<u8> for PartialJson<T> {
176    fn extend<I: IntoIterator<Item = u8>>(&mut self, iter: I) {
177        self.buffer.extend(iter);
178    }
179}
180
181impl<'a, T> Extend<&'a u8> for PartialJson<T> {
182    fn extend<I: IntoIterator<Item = &'a u8>>(&mut self, iter: I) {
183        self.buffer.extend(iter);
184    }
185}
186
187impl<T: DeserializeOwned> Iterator for PartialJson<T> {
188    type Item = crate::Result<T>;
189    fn next(&mut self) -> Option<Self::Item> {
190        loop {
191            if self.i == self.buffer.len() {
192                if !self.buffer.is_empty() {
193                    let skipping = self.level < self.part.level;
194                    if skipping || self.ignored() {
195                        self.drain();
196                    }
197                }
198                return None;
199            }
200            let char = self.buffer[self.i] as char;
201            self.i += 1;
202            if self.in_string {
203                if self.last_char == '\\' {
204                    self.last_char = '\0';
205                } else {
206                    if char == '"' {
207                        self.in_string = false;
208                    }
209                    self.last_char = char;
210                }
211            } else if !char.is_ascii_whitespace() {
212                if let '"' = char {
213                    self.in_string = true;
214                } else if let '[' | '{' = char {
215                    self.level += 1;
216                    if self.grounded() {
217                        self.item_char = char;
218                    }
219                } else if let ',' = char {
220                    if self.grounded() && !self.ignored() {
221                        return Some(self.parse(self.i - 1));
222                    }
223                } else if let ']' | '}' = char {
224                    let grounded = self.grounded();
225                    let ignored_index = self.ignored_index();
226                    let ignored_group = self.ignored_group();
227                    self.level = self.level.saturating_sub(1);
228                    if grounded {
229                        if !ignored_group {
230                            self.group += 1;
231                        }
232                        let empty = matches!(self.last_char, '[' | '{');
233                        if !ignored_index && !ignored_group && !empty {
234                            return Some(self.parse(self.i - 1));
235                        }
236                    }
237                }
238                if let '[' | ':' = char {
239                    if self.grounded() {
240                        self.drain();
241                    }
242                }
243                self.last_char = char;
244            }
245        }
246    }
247}
248
249impl<T: DeserializeOwned> PartialJson<T> {
250    /// Does nothing and returns `None` if not currently on the correct [`level`](JsonPart::level).
251    ///
252    /// Otherwise, clears the full buffer and attempts to deserialize an item.
253    ///
254    /// Intended for use once writing is finished and the level might be zero.
255    pub fn done(&mut self) -> Option<crate::Result<T>> {
256        if self.grounded() && !self.ignored() {
257            Some(self.parse(self.buffer.len()))
258        } else {
259            None
260        }
261    }
262}
263
264impl<T: DeserializeOwned> PartialJson<T> {
265    fn parse(&mut self, j: usize) -> crate::Result<T> {
266        let (one, two) = self.buffer.as_slices();
267        let res = {
268            if one.len() < j {
269                let j = j - one.len();
270                serde_json::from_reader(Cursor::new(one).chain(Cursor::new(&two[..j])))
271            } else {
272                serde_json::from_slice(&one[..j])
273            }
274        };
275        let result = res.map_err(|err| {
276            let buf = self.buffer.iter().take(j).copied().collect::<Vec<_>>();
277            crate::Error::Json(err, String::from_utf8(buf))
278        });
279        self.drain();
280        result
281    }
282    #[inline]
283    fn grounded(&self) -> bool {
284        self.level == self.part.level
285    }
286    #[inline]
287    fn ignored(&self) -> bool {
288        self.ignored_index() || self.ignored_group()
289    }
290    #[inline]
291    fn ignored_index(&self) -> bool {
292        self.part.group.is_some_and(|group| group != self.group)
293    }
294    #[inline]
295    fn ignored_group(&self) -> bool {
296        let ignored_object = !self.part.parse_object_values && self.item_char == '{';
297        let ignored_list = self.part.ignore_list_entries && self.item_char == '[';
298        ignored_object || ignored_list
299    }
300    #[inline]
301    fn drain(&mut self) {
302        self.buffer.drain(..self.i);
303        self.i = 0;
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use serde::de::DeserializeOwned;
    use serde::Deserialize;
    use std::fmt::Debug;

    /// Filter at depth `lvl` that deserializes both list entries and object values.
    fn lvl(lvl: u32) -> JsonPart {
        JsonPart::level(lvl).parse_object_values()
    }

    /// Feeds `json` to a fresh parser in chunks of every size from one byte up to the whole
    /// text, and checks that every run yields exactly `obj`.
    #[track_caller]
    fn run<T>(part: JsonPart, json: &str, obj: &[T])
    where
        T: Debug + PartialEq + Eq + DeserializeOwned,
    {
        for len in 1..=json.len() {
            let mut res = Vec::new();
            let mut parser = PartialJson::<T>::new(part);
            for chunk in json.as_bytes().chunks(len) {
                parser.extend(chunk);
                // `while let` instead of `by_ref()`: `Write::by_ref` would be ambiguous here.
                while let Some(item) = parser.next() {
                    res.push(item.unwrap());
                }
            }
            if let Some(item) = parser.done() {
                res.push(item.unwrap());
            }
            assert_eq!(res, obj, "chunk size {len}");
        }
    }

    #[test]
    fn empty() {
        #[derive(Debug, Deserialize, PartialEq, Eq)]
        struct Empty {}
        run::<()>(lvl(1), "{\n }", &[]);
        run::<()>(lvl(1), "[ \n]", &[]);
        run::<Empty>(lvl(0), "{ \n}", &[Empty {}]);
        run::<Vec<()>>(lvl(0), "[\n ]", &[vec![]]);
        run::<Vec<()>>(lvl(1).group(0), "{ \"\": [ \n] }", &[vec![]]);
        run::<Vec<()>>(JsonPart::level(1), "{ \"\": [ \n] }", &[]);
        run::<Vec<()>>(lvl(1).group(1), "{ \"\": [ \n] }", &[]);
        run::<()>(lvl(2), "{ \"\": [ \n] }", &[]);
    }

    #[test]
    fn simple() {
        let list = "[[ ],[1,2, 3], [], [4,5], []]";
        let map = r#"{ "\"":[1,2],"[":[3, 4,5]}"#;
        run(lvl(2), list, &[1, 2, 3, 4, 5]);
        run(lvl(2), map, &[1, 2, 3, 4, 5]);
        run(
            JsonPart::level(1),
            list,
            &[vec![], vec![1, 2, 3], vec![], vec![4, 5], vec![]],
        );
        run(
            JsonPart::level(1).ignore_list_entries(),
            map,
            &[vec![1, 2], vec![3, 4, 5]],
        );
        run(lvl(2).group(3), list, &[4, 5]);
        run(lvl(2).group(1), map, &[3, 4, 5]);
        run::<()>(JsonPart::level(2).ignore_list_entries(), list, &[]);
    }

    #[test]
    fn objects() {
        #[derive(Debug, Deserialize, PartialEq, Eq, Clone)]
        struct Item {
            a: String,
            b: Vec<u32>,
        }
        let json = r#"[
            {"list": [
                { "b": [3, 4], "a": "test2"},
                { "a": "test", "b": [1, 2]}
            ]},
            {"one": {
                "a": { "b": [1, 2], "a": "test"}
            },
            "two": [
                { "a": "test2", "b": [3, 4]}
            ]},
            {"more": {
                "b": { "b": [3, 4], "a": "test2"},
                "c": { "a": "test", "b": [1, 2]}
            }}
        ]"#;
        let one = Item {
            a: "test".into(),
            b: vec![1, 2],
        };
        let two = Item {
            a: "test2".into(),
            b: vec![3, 4],
        };
        run(
            lvl(3),
            json,
            &[
                two.clone(),
                one.clone(),
                one.clone(),
                two.clone(),
                two.clone(),
                one.clone(),
            ],
        );
        run(
            JsonPart::level(3),
            json,
            &[two.clone(), one.clone(), two.clone()],
        );
        run(
            JsonPart::level(3).ignore_list_entries(),
            json,
            &[one.clone(), two.clone(), one.clone()],
        );
        run(
            JsonPart::level(3).ignore_list_entries().group(1),
            json,
            &[two.clone(), one.clone()],
        );
        run(JsonPart::level(3).group(1), json, &[two.clone()]);
        run(lvl(3).group(0), json, &[two.clone(), one.clone()]);
        run(lvl(3).group(1), json, &[one.clone()]);
        run(lvl(3).group(2), json, &[two.clone()]);
        run(lvl(3).group(3), json, &[two.clone(), one.clone()]);
        run::<Item>(lvl(3).group(4), json, &[]);
        let none = JsonPart {
            level: 3,
            group: None,
            ignore_list_entries: true,
            parse_object_values: false,
        };
        run::<Item>(none, json, &[]);
    }

    #[test]
    fn escape() {
        run(lvl(1), r#"["\\"]"#, &["\\".to_string()]);
        run(
            lvl(1),
            r#"["\\\", ", "\\"]"#,
            &["\\\", ".to_string(), "\\".to_string()],
        );
        run(
            lvl(1),
            r#"["\\\n", ", \\", "\\n"]"#,
            &["\\\n".to_string(), ", \\".to_string(), "\\n".to_string()],
        );
    }

    #[test]
    fn utf8() {
        run(
            lvl(2),
            r#"{
                "ⓟH񠭇򓊦򤄛⅃ތfNjͨ񣧺r抄ݼ춓\"ĠQ𭌔\󛞼ܫ􂷳쯪􁺬ڹދ􅽘͹៦9ު": [
                    "󸈣ظȗ򷇪\"𐏴駭󳷈맪ى퇄<}i񩇦򭍁ި𥏡p;0+􆨧ͅд򔦜j솞U_.ߤb",
                    ":˚�梔񂌄֪Ͻv񴷤Ẑˆ򲬧D񔰾衸򊻤LY�\"䋪ω`񱰲󞰅怟񤹝zw󐭄#ͦ"
                ],
                "𬻈;:䓄$ۺ䰁%୯NYꊠ|㦴⢗$X㴩߉񫤵؊񇢏\"9׳󯠀ߊvƉA񾩲ҁ֋": "릪𖲨ѯ⦤͛𞊎͐4륛𪮫Θ񈐦𐃇\"򕘾򚂼澀񿡬ꖆߐͤwߥ󉙥뉧ƕ󈀁[ɟϏ叓D񧿾",
                "瑔샂,󂦆򤍟򕤕͘꯯ϒނ\"𖄪𽞲𼤊ب𔽢ʌ鋂BϑÂ򗴎󹜦瑜񅃼󁲫봗Lk󊣧ԣ旧": [
                    "Z̩ťMɲ髲򒝪񠛱ቱ̗桒\"𾤄W𴊖􀕡򰅼돭_񒐽񻔪䍙Ғѿ򛁬B6ݍ𤦐鄎^Eˍ",
                    "E넰׻𨆣{2ܗ鞬識򋫄󶁇촗'㟳󴧁)#Ş\"ߟ൴򘽍󾥘ڋ⸏ŗ𣻡𝝙ݡi斏ᾝ՞"
                ]
            }"#,
            &[
                "󸈣ظȗ򷇪\"𐏴駭󳷈맪ى퇄<}i񩇦򭍁ި𥏡p;0+􆨧ͅд򔦜j솞U_.ߤb".to_string(),
                ":˚�梔񂌄֪Ͻv񴷤Ẑˆ򲬧D񔰾衸򊻤LY�\"䋪ω`񱰲󞰅怟񤹝zw󐭄#ͦ".to_string(),
                "Z̩ťMɲ髲򒝪񠛱ቱ̗桒\"𾤄W𴊖􀕡򰅼돭_񒐽񻔪䍙Ғѿ򛁬B6ݍ𤦐鄎^Eˍ".to_string(),
                "E넰׻𨆣{2ܗ鞬識򋫄󶁇촗'㟳󴧁)#Ş\"ߟ൴򘽍󾥘ڋ⸏ŗ𣻡𝝙ݡi斏ᾝ՞".to_string(),
            ],
        )
    }
}