ndjson_stream/
engine.rs

1//! This module contains the low-level NDJSON parsing logic in the form of the [NdjsonEngine]. You
2//! should usually not have to use this directly, but rather access a higher-level interface such as
3//! iterators.
4
5use std::collections::VecDeque;
6use std::str;
7
8use serde::Deserialize;
9
10use serde_json::error::Result as JsonResult;
11
12use crate::as_bytes::AsBytes;
13use crate::config::{EmptyLineHandling, NdjsonConfig};
14
/// Returns the index of the first element in `data` that is equal to `search`, or `None` if no
/// element of `data` is equal to it (which includes the case of `data` being empty).
fn index_of<T: Eq>(data: &[T], search: T) -> Option<usize> {
    data.iter().position(|item| *item == search)
}

/// The byte at which the engine splits its input into individual lines, i.e. the line feed.
const NEW_LINE: u8 = b'\n';
22
23/// The low-level engine parsing NDJSON-data given as byte slices into objects of the type parameter
24/// `T`. Data is supplied in chunks and parsed objects can subsequently be read from a queue.
25///
26/// Users of this crate should usually not have to use this struct but rather a higher-level
27/// interface such as iterators.
28pub struct NdjsonEngine<T> {
29    in_queue: Vec<u8>,
30    out_queue: VecDeque<JsonResult<T>>,
31    config: NdjsonConfig
32}
33
34impl<T> NdjsonEngine<T> {
35
36    /// Creates a new NDJSON-engine for objects of the given type parameter with default
37    /// [NdjsonConfig].
38    pub fn new() -> NdjsonEngine<T> {
39        NdjsonEngine::with_config(NdjsonConfig::default())
40    }
41
42    /// Creates a new NDJSON-engine for objects of the given type parameter with the given
43    /// [NdjsonConfig] to control its behavior. See [NdjsonConfig] for more details.
44    pub fn with_config(config: NdjsonConfig) -> NdjsonEngine<T> {
45        NdjsonEngine {
46            in_queue: Vec::new(),
47            out_queue: VecDeque::new(),
48            config
49        }
50    }
51
52    /// Reads the next element from the queue of parsed items, if sufficient NDJSON-data has been
53    /// supplied previously via [NdjsonEngine::input], that is, a newline character has been
54    /// observed. If the input until the newline is not valid JSON, the parse error is returned. If
55    /// no element is available in the queue, `None` is returned.
56    pub fn pop(&mut self) -> Option<JsonResult<T>> {
57        self.out_queue.pop_front()
58    }
59}
60
/// Indicates whether `string` consists of nothing but whitespace characters. The empty string is
/// considered blank as well.
fn is_blank(string: &str) -> bool {
    // Stripping whitespace from both ends leaves nothing exactly if every character is whitespace
    string.trim().is_empty()
}
64
65fn parse_line<T>(bytes: &[u8], empty_line_handling: EmptyLineHandling) -> Option<JsonResult<T>>
66where
67    for<'deserialize> T: Deserialize<'deserialize>
68{
69    let should_ignore = match empty_line_handling {
70        EmptyLineHandling::ParseAlways => false,
71        EmptyLineHandling::IgnoreEmpty => bytes.is_empty() || bytes == [b'\r'],
72        EmptyLineHandling::IgnoreBlank => str::from_utf8(bytes).is_ok_and(is_blank)
73    };
74
75    if should_ignore {
76        None
77    }
78    else {
79        Some(serde_json::from_slice(bytes))
80    }
81}
82
83impl<T> NdjsonEngine<T>
84where
85    for<'deserialize> T: Deserialize<'deserialize>
86{
87
88    /// Parses the given data as NDJSON. In case the end does not match up with a newline, the rest
89    /// is stored in an internal cache. Consequently, the rest from a previous call to this method
90    /// is prepended to the given data in case a newline is encountered.
91    pub fn input(&mut self, data: impl AsBytes) {
92        let mut data = data.as_bytes();
93
94        while let Some(newline_idx) = index_of(data, NEW_LINE) {
95            let data_until_split = &data[..newline_idx];
96
97            let next_item_bytes = if self.in_queue.is_empty() {
98                data_until_split
99            }
100            else {
101                self.in_queue.extend_from_slice(data_until_split);
102                &self.in_queue
103            };
104
105            if let Some(item) = parse_line(next_item_bytes, self.config.empty_line_handling) {
106                self.out_queue.push_back(item);
107            }
108
109            self.in_queue.clear();
110            data = &data[(newline_idx + 1)..];
111        }
112
113        self.in_queue.extend_from_slice(data);
114    }
115
116    /// Parses the rest leftover from previous calls to [NdjsonEngine::input], i.e. the data after
117    /// the last given newline character, if all of the following conditions are met.
118    ///
119    /// * The engine uses a config with [NdjsonConfig::with_parse_rest] set to `true`.
120    /// * There is non-empty data left to parse. In other words, the previous provided input did not
121    /// end with a newline character.
122    /// * The rest is not considered empty by the handling configured in
123    /// [NdjsonConfig::with_empty_line_handling]. That is, if the rest consists only of whitespace
124    /// and [EmptyLineHandling::IgnoreBlank] is used, the rest is not parsed.
125    ///
126    /// In any case, the rest is discarded from the input buffer. Therefore, this function is
127    /// idempotent.
128    ///
129    /// Note: This function is intended to be called after the input ended, but there is no
130    /// validation in place to check that [NdjsonEngine::input] is not called afterwards. Doing this
131    /// anyway may lead to unexpected behavior, as JSON-lines may be partially discarded.
132    pub fn finalize(&mut self) {
133        if self.config.parse_rest {
134            let empty_line_handling = match self.config.empty_line_handling {
135                EmptyLineHandling::ParseAlways => EmptyLineHandling::IgnoreEmpty,
136                empty_line_handling => empty_line_handling
137            };
138
139            if let Some(item) = parse_line(&self.in_queue, empty_line_handling) {
140                self.out_queue.push_back(item);
141            }
142        }
143
144        self.in_queue.clear();
145    }
146}
147
148impl<T> Default for NdjsonEngine<T> {
149    fn default() -> NdjsonEngine<T> {
150        NdjsonEngine::new()
151    }
152}
153
#[cfg(test)]
mod tests {

    use std::borrow::Cow;
    use std::rc::Rc;
    use std::sync::Arc;

    use kernal::prelude::*;

    use serde_json::error::Result as JsonResult;

    use crate::config::{EmptyLineHandling, NdjsonConfig};
    use crate::engine::NdjsonEngine;
    use crate::test_util::TestStruct;

    /// Two valid entries with an entirely empty line in between.
    const ENTRIES_AROUND_EMPTY_LINE: &str =
        "{\"key\":1,\"value\":2}\n\n{\"key\":3,\"value\":4}\n";

    /// Two valid entries with a line in between that holds whitespace only.
    const ENTRIES_AROUND_BLANK_LINE: &str =
        "{\"key\":1,\"value\":2}\n \t\r\n{\"key\":3,\"value\":4}\n";

    /// Creates an engine with the default configuration.
    fn default_engine() -> NdjsonEngine<TestStruct> {
        NdjsonEngine::new()
    }

    /// Creates an engine whose configuration is the default one passed through `configure`.
    fn configured_engine(configure: impl FnOnce(NdjsonConfig) -> NdjsonConfig)
            -> NdjsonEngine<TestStruct> {
        NdjsonEngine::with_config(configure(NdjsonConfig::default()))
    }

    /// Creates an engine which differs from the default only in its empty-line handling.
    fn engine_with_empty_line_handling(empty_line_handling: EmptyLineHandling)
            -> NdjsonEngine<TestStruct> {
        configured_engine(|config| config.with_empty_line_handling(empty_line_handling))
    }

    /// Creates an engine which differs from the default only in its parse-rest setting.
    fn engine_with_parse_rest(parse_rest: bool) -> NdjsonEngine<TestStruct> {
        configured_engine(|config| config.with_parse_rest(parse_rest))
    }

    /// Creates an engine which parses the rest and uses the given empty-line handling.
    fn rest_parsing_engine(empty_line_handling: EmptyLineHandling) -> NdjsonEngine<TestStruct> {
        configured_engine(|config| config
            .with_empty_line_handling(empty_line_handling)
            .with_parse_rest(true))
    }

    /// Passes the given chunks to the engine one after another, in the given order.
    fn feed(engine: &mut NdjsonEngine<TestStruct>, chunks: &[&'static str]) {
        for &chunk in chunks {
            engine.input(chunk);
        }
    }

    /// Pops results from the engine until none is left and returns them in order.
    fn collect_output(mut engine: NdjsonEngine<TestStruct>)
            -> Vec<JsonResult<TestStruct>> {
        let mut output = Vec::new();

        while let Some(result) = engine.pop() {
            output.push(result);
        }

        output
    }

    #[test]
    fn no_input() {
        let engine = default_engine();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn incomplete_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":3,\"val");

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn single_exact_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":3,\"value\":4}\n");

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 3, value: 4 })
            ));
    }

    #[test]
    fn single_item_split_into_two_inputs() {
        let mut engine = default_engine();

        feed(&mut engine, &[
            "{\"key\":42,",
            "\"value\":24}\n"
        ]);

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 42, value: 24 })
            ));
    }

    #[test]
    fn two_items_in_single_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":1,\"value\":1}\n{\"key\":2,\"value\":2}\n");

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 1 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 2, value: 2 })
            ));
    }

    #[test]
    fn two_items_in_many_inputs_with_rest() {
        let mut engine = default_engine();

        feed(&mut engine, &[
            "{\"key\":12,\"v",
            "alue\":3",
            "4}\n{\"key",
            "\":56,\"valu",
            "e\":78}\n{\"key\":"
        ]);

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 56, value: 78 })
            ));
    }

    #[test]
    fn input_completing_previous_rest_then_multiple_complete_items_and_more_rest() {
        let mut engine = default_engine();

        feed(&mut engine, &[
            "{\"key\":9,\"value\":",
            "8}\n{\"key\":7,\"value\":6}\n{\"key\":5,\"value\":4}\n{\"key\":",
            "3,\"value\":2}\n{"
        ]);

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 9, value: 8 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 7, value: 6 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 5, value: 4 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 3, value: 2 })
            ));
    }

    #[test]
    fn carriage_return_handled_gracefully() {
        let mut engine = default_engine();

        engine.input("{\"key\":1,\"value\":2}\r\n{\"key\":3,\"value\":4}\r\n");

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 3, value: 4 })
            ));
    }

    #[test]
    fn whitespace_handled_gracefully() {
        let mut engine = default_engine();

        engine.input("\t{ \"key\":\t13,  \"value\":   37 } \r\n");

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 13, value: 37 })
            ));
    }

    #[test]
    fn erroneous_entry_emitted_as_json_error() {
        let mut engine = default_engine();

        engine.input("{\"key\":1}\n{\"key\":1,\"value\":1}\n");

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).is_err(),
                |it| assert_that!(it).is_ok()
            ));
    }

    #[test]
    fn error_from_split_entry() {
        let mut engine = default_engine();

        feed(&mut engine, &[
            "{\"key\":100,\"value\":200}\n{\"key\":",
            "\"should be a number\",\"value\":0}\n{\"key\":300,\"value\":400}\n"
        ]);

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 100, value: 200 }),
                |it| assert_that!(it).is_err(),
                |it| assert_that!(it).contains_value(TestStruct { key: 300, value: 400 })
            ));
    }

    #[test]
    fn engine_input_works_for_different_types() {
        // Deliberately constructed via `Default` instead of `new`
        let mut engine: NdjsonEngine<TestStruct> = NdjsonEngine::default();

        // Each call passes the next piece of a single entry as a different input type
        engine.input(b"{\"k");
        engine.input(b"ey\"".to_vec());
        engine.input(":12".to_string());
        engine.input(&mut ",\"v".to_string());
        engine.input("alu".to_string().into_boxed_str());
        engine.input(b"e\"".to_vec().into_boxed_slice());
        engine.input(Arc::<str>::from(":3"));
        engine.input(Rc::<[u8]>::from(&b"4}"[..]));
        engine.input(Cow::Borrowed(&b"\r\n".to_vec()));

        assert_that!(collect_output(engine))
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 })
            ));
    }

    #[test]
    fn old_data_is_discarded() {
        let mut engine = default_engine();
        let count = 20;

        engine.input("{ \"key\": 1, ");

        // Every round completes the pending entry and starts the next one
        (1..count).for_each(|_| engine.input("\"value\": 2 }\r\n{ \"key\": 1, "));

        engine.input("\"value\": 2 }\r\n");

        assert_that!(engine.in_queue).is_empty();
        assert_that!(engine.out_queue).has_length(count);
    }

    #[test]
    fn raises_error_when_parsing_empty_line_in_parse_always_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::ParseAlways);

        engine.input(ENTRIES_AROUND_EMPTY_LINE);

        assert_that!(collect_output(engine)).contains_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_empty_line_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input(ENTRIES_AROUND_EMPTY_LINE);

        assert_that!(collect_output(engine)).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_empty_line_with_carriage_return_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input("{\"key\":1,\"value\":2}\r\n\r\n{\"key\":3,\"value\":4}\n");

        assert_that!(collect_output(engine)).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn raises_error_when_parsing_non_empty_blank_line_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input(ENTRIES_AROUND_BLANK_LINE);

        assert_that!(collect_output(engine)).contains_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_non_empty_blank_line_in_ignore_blank_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreBlank);

        engine.input(ENTRIES_AROUND_BLANK_LINE);

        assert_that!(collect_output(engine)).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn finalize_ignores_rest_if_parse_rest_is_false() {
        let mut engine = engine_with_parse_rest(false);

        engine.input("{\"key\":1,\"value\":2}");
        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_parses_valid_rest() {
        let empty_line_handlings = [
            EmptyLineHandling::ParseAlways,
            EmptyLineHandling::IgnoreEmpty,
            EmptyLineHandling::IgnoreBlank
        ];

        for empty_line_handling in empty_line_handlings {
            let mut engine = rest_parsing_engine(empty_line_handling);

            engine.input("{\"key\":1,\"value\":2}");
            engine.finalize();

            assert_that!(collect_output(engine)).satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 })
            ));
        }
    }

    #[test]
    fn finalize_raises_error_on_invalid_rest() {
        let mut engine = engine_with_parse_rest(true);

        engine.input("invalid json");
        engine.finalize();

        assert_that!(collect_output(engine)).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).is_err()
        ));
    }

    #[test]
    fn finalize_ignores_empty_rest_even_if_empty_line_handling_is_parse_always() {
        let mut engine = rest_parsing_engine(EmptyLineHandling::ParseAlways);

        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_ignores_empty_rest_if_empty_line_handling_is_ignore_empty() {
        let mut engine = rest_parsing_engine(EmptyLineHandling::IgnoreEmpty);

        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_does_not_ignore_non_empty_blank_rest_if_empty_line_handling_is_ignore_empty() {
        let mut engine = rest_parsing_engine(EmptyLineHandling::IgnoreEmpty);

        engine.input(" ");
        engine.finalize();

        assert_that!(collect_output(engine)).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).is_err()
        ));
    }

    #[test]
    fn finalize_ignores_non_empty_blank_rest_if_empty_line_handling_is_ignore_blank() {
        let mut engine = rest_parsing_engine(EmptyLineHandling::IgnoreBlank);

        engine.input(" ");
        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_is_idempotent() {
        let mut engine = engine_with_parse_rest(true);

        engine.input("{\"key\":13,\"value\":37}");

        // The second call must not emit the rest a second time
        engine.finalize();
        engine.finalize();

        assert_that!(collect_output(engine)).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).contains_value(TestStruct { key: 13, value: 37 })
        ));
    }
}