1use std::collections::VecDeque;
6use std::str;
7
8use serde::Deserialize;
9
10use serde_json::error::Result as JsonResult;
11
12use crate::as_bytes::AsBytes;
13use crate::config::{EmptyLineHandling, NdjsonConfig};
14
/// Returns the index of the first element in `data` that equals `search`,
/// or `None` if no element matches (including when `data` is empty).
fn index_of<T: Eq>(data: &[T], search: T) -> Option<usize> {
    // `position` is the standard-library form of "enumerate, find, keep index".
    data.iter().position(|item| *item == search)
}
20
/// Byte that terminates a line of input: the ASCII line feed.
const NEW_LINE: u8 = b'\n';
22
23pub struct NdjsonEngine<T> {
29 in_queue: Vec<u8>,
30 out_queue: VecDeque<JsonResult<T>>,
31 config: NdjsonConfig
32}
33
34impl<T> NdjsonEngine<T> {
35
36 pub fn new() -> NdjsonEngine<T> {
39 NdjsonEngine::with_config(NdjsonConfig::default())
40 }
41
42 pub fn with_config(config: NdjsonConfig) -> NdjsonEngine<T> {
45 NdjsonEngine {
46 in_queue: Vec::new(),
47 out_queue: VecDeque::new(),
48 config
49 }
50 }
51
52 pub fn pop(&mut self) -> Option<JsonResult<T>> {
57 self.out_queue.pop_front()
58 }
59}
60
/// Returns `true` if `string` is empty or consists solely of whitespace
/// characters.
fn is_blank(string: &str) -> bool {
    // Trimming strips exactly the characters `char::is_whitespace` accepts,
    // so nothing is left over iff every character is whitespace.
    string.trim().is_empty()
}
64
65fn parse_line<T>(bytes: &[u8], empty_line_handling: EmptyLineHandling) -> Option<JsonResult<T>>
66where
67 for<'deserialize> T: Deserialize<'deserialize>
68{
69 let should_ignore = match empty_line_handling {
70 EmptyLineHandling::ParseAlways => false,
71 EmptyLineHandling::IgnoreEmpty => bytes.is_empty() || bytes == [b'\r'],
72 EmptyLineHandling::IgnoreBlank => str::from_utf8(bytes).is_ok_and(is_blank)
73 };
74
75 if should_ignore {
76 None
77 }
78 else {
79 Some(serde_json::from_slice(bytes))
80 }
81}
82
83impl<T> NdjsonEngine<T>
84where
85 for<'deserialize> T: Deserialize<'deserialize>
86{
87
88 pub fn input(&mut self, data: impl AsBytes) {
92 let mut data = data.as_bytes();
93
94 while let Some(newline_idx) = index_of(data, NEW_LINE) {
95 let data_until_split = &data[..newline_idx];
96
97 let next_item_bytes = if self.in_queue.is_empty() {
98 data_until_split
99 }
100 else {
101 self.in_queue.extend_from_slice(data_until_split);
102 &self.in_queue
103 };
104
105 if let Some(item) = parse_line(next_item_bytes, self.config.empty_line_handling) {
106 self.out_queue.push_back(item);
107 }
108
109 self.in_queue.clear();
110 data = &data[(newline_idx + 1)..];
111 }
112
113 self.in_queue.extend_from_slice(data);
114 }
115
116 pub fn finalize(&mut self) {
133 if self.config.parse_rest {
134 let empty_line_handling = match self.config.empty_line_handling {
135 EmptyLineHandling::ParseAlways => EmptyLineHandling::IgnoreEmpty,
136 empty_line_handling => empty_line_handling
137 };
138
139 if let Some(item) = parse_line(&self.in_queue, empty_line_handling) {
140 self.out_queue.push_back(item);
141 }
142 }
143
144 self.in_queue.clear();
145 }
146}
147
148impl<T> Default for NdjsonEngine<T> {
149 fn default() -> NdjsonEngine<T> {
150 NdjsonEngine::new()
151 }
152}
153
#[cfg(test)]
mod tests {

    use std::borrow::Cow;
    use std::iter;
    use std::rc::Rc;
    use std::sync::Arc;

    use kernal::prelude::*;

    use serde_json::error::Result as JsonResult;

    use crate::config::{EmptyLineHandling, NdjsonConfig};
    use crate::engine::NdjsonEngine;
    use crate::test_util::TestStruct;

    /// Pops results from `engine` until it has none left and returns them in
    /// the order they were popped.
    fn collect_output(mut engine: NdjsonEngine<TestStruct>) -> Vec<JsonResult<TestStruct>> {
        let pending = iter::from_fn(|| engine.pop());
        pending.collect()
    }

    /// Engine for `TestStruct` items created through `NdjsonEngine::new`.
    fn default_engine() -> NdjsonEngine<TestStruct> {
        NdjsonEngine::new()
    }

    #[test]
    fn no_input() {
        let engine = default_engine();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn incomplete_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":3,\"val");

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn single_exact_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 3, value: 4 })
        ));
    }

    #[test]
    fn single_item_split_into_two_inputs() {
        let mut engine = default_engine();

        engine.input("{\"key\":42,");
        engine.input("\"value\":24}\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 42, value: 24 })
        ));
    }

    #[test]
    fn two_items_in_single_input() {
        let mut engine = default_engine();

        engine.input("{\"key\":1,\"value\":1}\n{\"key\":2,\"value\":2}\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 1, value: 1 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 2, value: 2 })
        ));
    }

    #[test]
    fn two_items_in_many_inputs_with_rest() {
        let mut engine = default_engine();

        engine.input("{\"key\":12,\"v");
        engine.input("alue\":3");
        engine.input("4}\n{\"key");
        engine.input("\":56,\"valu");
        engine.input("e\":78}\n{\"key\":");

        // The trailing `{"key":` is never terminated and must not be emitted.
        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 12, value: 34 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 56, value: 78 })
        ));
    }

    #[test]
    fn input_completing_previous_rest_then_multiple_complete_items_and_more_rest() {
        let mut engine = default_engine();

        engine.input("{\"key\":9,\"value\":");
        engine.input("8}\n{\"key\":7,\"value\":6}\n{\"key\":5,\"value\":4}\n{\"key\":");
        engine.input("3,\"value\":2}\n{");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 9, value: 8 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 7, value: 6 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 5, value: 4 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 3, value: 2 })
        ));
    }

    #[test]
    fn carriage_return_handled_gracefully() {
        let mut engine = default_engine();

        engine.input("{\"key\":1,\"value\":2}\r\n{\"key\":3,\"value\":4}\r\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 1, value: 2 }),
            |result| assert_that!(result).contains_value(TestStruct { key: 3, value: 4 })
        ));
    }

    #[test]
    fn whitespace_handled_gracefully() {
        let mut engine = default_engine();

        engine.input("\t{ \"key\":\t13, \"value\": 37 } \r\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 13, value: 37 })
        ));
    }

    #[test]
    fn erroneous_entry_emitted_as_json_error() {
        let mut engine = default_engine();

        engine.input("{\"key\":1}\n{\"key\":1,\"value\":1}\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).is_err(),
            |result| assert_that!(result).is_ok()
        ));
    }

    #[test]
    fn error_from_split_entry() {
        let mut engine = default_engine();

        engine.input("{\"key\":100,\"value\":200}\n{\"key\":");
        engine.input("\"should be a number\",\"value\":0}\n{\"key\":300,\"value\":400}\n");

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 100, value: 200 }),
            |result| assert_that!(result).is_err(),
            |result| assert_that!(result).contains_value(TestStruct { key: 300, value: 400 })
        ));
    }

    #[test]
    fn engine_input_works_for_different_types() {
        // Built through `Default` on purpose, unlike the other tests.
        let mut engine: NdjsonEngine<TestStruct> = NdjsonEngine::default();

        // One item assembled from nine chunks, each of a different input type.
        engine.input(b"{\"k");
        engine.input(b"ey\"".to_vec());
        engine.input(":12".to_string());
        engine.input(&mut ",\"v".to_string());
        engine.input("alu".to_string().into_boxed_str());
        engine.input(b"e\"".to_vec().into_boxed_slice());
        engine.input(Arc::<str>::from(":3"));
        engine.input(Rc::<[u8]>::from(&b"4}"[..]));
        engine.input(Cow::Borrowed(&b"\r\n".to_vec()));

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 12, value: 34 })
        ));
    }

    #[test]
    fn old_data_is_discarded() {
        let mut engine = default_engine();
        let item_count = 20;

        // Every chunk finishes the previous item and begins the next one, so
        // the input buffer is in use at each newline.
        engine.input("{ \"key\": 1, ");

        for _ in 1..item_count {
            engine.input("\"value\": 2 }\r\n{ \"key\": 1, ");
        }

        engine.input("\"value\": 2 }\r\n");

        assert_that!(engine.in_queue).is_empty();
        assert_that!(engine.out_queue).has_length(item_count);
    }

    /// Builds an engine whose configuration is the default one after being
    /// passed through `configure`.
    fn configured_engine(
        configure: impl FnOnce(NdjsonConfig) -> NdjsonConfig,
    ) -> NdjsonEngine<TestStruct> {
        NdjsonEngine::with_config(configure(NdjsonConfig::default()))
    }

    /// Builds an engine that differs from the default configuration only in
    /// its empty-line handling.
    fn engine_with_empty_line_handling(
        empty_line_handling: EmptyLineHandling,
    ) -> NdjsonEngine<TestStruct> {
        configured_engine(|config| config.with_empty_line_handling(empty_line_handling))
    }

    #[test]
    fn raises_error_when_parsing_empty_line_in_parse_always_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::ParseAlways);

        engine.input("{\"key\":1,\"value\":2}\n\n{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).contains_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_empty_line_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input("{\"key\":1,\"value\":2}\n\n{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_empty_line_with_carriage_return_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input("{\"key\":1,\"value\":2}\r\n\r\n{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn raises_error_when_parsing_non_empty_blank_line_in_ignore_empty_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);

        engine.input("{\"key\":1,\"value\":2}\n \t\r\n{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).contains_elements_matching(Result::is_err);
    }

    #[test]
    fn does_not_raise_error_when_parsing_non_empty_blank_line_in_ignore_blank_mode() {
        let mut engine = engine_with_empty_line_handling(EmptyLineHandling::IgnoreBlank);

        engine.input("{\"key\":1,\"value\":2}\n \t\r\n{\"key\":3,\"value\":4}\n");

        let output = collect_output(engine);
        assert_that!(output).does_not_contain_elements_matching(Result::is_err);
    }

    #[test]
    fn finalize_ignores_rest_if_parse_rest_is_false() {
        let mut engine = configured_engine(|config| config.with_parse_rest(false));

        engine.input("{\"key\":1,\"value\":2}");
        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_parses_valid_rest() {
        let all_handlings = [
            EmptyLineHandling::ParseAlways,
            EmptyLineHandling::IgnoreEmpty,
            EmptyLineHandling::IgnoreBlank,
        ];

        for empty_line_handling in all_handlings {
            let mut engine = configured_engine(|config| {
                config
                    .with_empty_line_handling(empty_line_handling)
                    .with_parse_rest(true)
            });

            engine.input("{\"key\":1,\"value\":2}");
            engine.finalize();

            let output = collect_output(engine);
            assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
                |result| assert_that!(result).contains_value(TestStruct { key: 1, value: 2 })
            ));
        }
    }

    #[test]
    fn finalize_raises_error_on_invalid_rest() {
        let mut engine = configured_engine(|config| config.with_parse_rest(true));

        engine.input("invalid json");
        engine.finalize();

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).is_err()
        ));
    }

    #[test]
    fn finalize_ignores_empty_rest_even_if_empty_line_handling_is_parse_always() {
        let mut engine = configured_engine(|config| {
            config
                .with_empty_line_handling(EmptyLineHandling::ParseAlways)
                .with_parse_rest(true)
        });

        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_ignores_empty_rest_if_empty_line_handling_is_ignore_empty() {
        let mut engine = configured_engine(|config| {
            config
                .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty)
                .with_parse_rest(true)
        });

        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_does_not_ignore_non_empty_blank_rest_if_empty_line_handling_is_ignore_empty() {
        let mut engine = configured_engine(|config| {
            config
                .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty)
                .with_parse_rest(true)
        });

        engine.input(" ");
        engine.finalize();

        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).is_err()
        ));
    }

    #[test]
    fn finalize_ignores_non_empty_blank_rest_if_empty_line_handling_is_ignore_blank() {
        let mut engine = configured_engine(|config| {
            config
                .with_empty_line_handling(EmptyLineHandling::IgnoreBlank)
                .with_parse_rest(true)
        });

        engine.input(" ");
        engine.finalize();

        assert_that!(collect_output(engine)).is_empty();
    }

    #[test]
    fn finalize_is_idempotent() {
        let mut engine = configured_engine(|config| config.with_parse_rest(true));

        engine.input("{\"key\":13,\"value\":37}");
        engine.finalize();
        engine.finalize();

        // The second call must not emit the item again.
        let output = collect_output(engine);
        assert_that!(output).satisfies_exactly_in_given_order(dyn_assertions!(
            |result| assert_that!(result).contains_value(TestStruct { key: 13, value: 37 })
        ));
    }
}