1use crate::as_bytes::AsBytes;
2use crate::config::NdjsonConfig;
3use crate::engine::NdjsonEngine;
4use crate::fallible::{FallibleNdjsonError, FallibleNdjsonResult};
5
6use std::convert::Infallible;
7use std::iter::Fuse;
8
9use serde::Deserialize;
10
11use serde_json::error::Result as JsonResult;
12
13struct MapResultInfallible<I> {
14 inner: I
15}
16
17impl<I> MapResultInfallible<I> {
18 fn new(inner: I) -> MapResultInfallible<I> {
19 MapResultInfallible {
20 inner
21 }
22 }
23}
24
25impl<I> Iterator for MapResultInfallible<I>
26where
27 I: Iterator
28{
29 type Item = Result<I::Item, Infallible>;
30
31 fn next(&mut self) -> Option<Result<I::Item, Infallible>> {
32 self.inner.next().map(Ok)
33 }
34
35 fn size_hint(&self) -> (usize, Option<usize>) {
36 self.inner.size_hint()
37 }
38}
39
40pub struct NdjsonIter<T, I> {
44 inner: FallibleNdjsonIter<T, MapResultInfallible<I>>
45}
46
47impl<T, I> NdjsonIter<T, I>
48where
49 I: Iterator
50{
51
52 pub fn new(bytes_iterator: I) -> NdjsonIter<T, I> {
55 let inner_bytes_iterator = MapResultInfallible::new(bytes_iterator);
56
57 NdjsonIter {
58 inner: FallibleNdjsonIter::new(inner_bytes_iterator)
59 }
60 }
61
62 pub fn with_config(bytes_iterator: I, config: NdjsonConfig) -> NdjsonIter<T, I> {
65 let inner_bytes_iterator = MapResultInfallible::new(bytes_iterator);
66
67 NdjsonIter {
68 inner: FallibleNdjsonIter::with_config(inner_bytes_iterator, config)
69 }
70 }
71}
72
73impl<T, I> Iterator for NdjsonIter<T, I>
74where
75 for<'deserialize> T: Deserialize<'deserialize>,
76 I: Iterator,
77 I::Item: AsBytes
78{
79 type Item = JsonResult<T>;
80
81 fn next(&mut self) -> Option<JsonResult<T>> {
82 Some(self.inner.next()?.map_err(FallibleNdjsonError::unwrap_json_error))
83 }
84}
85
86pub fn from_iter<T, I>(into_iter: I) -> NdjsonIter<T, I::IntoIter>
107where
108 I: IntoIterator
109{
110 NdjsonIter::new(into_iter.into_iter())
111}
112
113pub fn from_iter_with_config<T, I>(into_iter: I, config: NdjsonConfig) -> NdjsonIter<T, I::IntoIter>
137where
138 I: IntoIterator
139{
140 NdjsonIter::with_config(into_iter.into_iter(), config)
141}
142
143pub struct FallibleNdjsonIter<T, I> {
148 engine: NdjsonEngine<T>,
149 bytes_iterator: Fuse<I>
150}
151
152impl<T, I> FallibleNdjsonIter<T, I>
153where
154 I: Iterator
155{
156
157 pub fn new(bytes_iterator: I) -> FallibleNdjsonIter<T, I> {
160 FallibleNdjsonIter {
161 engine: NdjsonEngine::new(),
162 bytes_iterator: bytes_iterator.fuse()
163 }
164 }
165
166 pub fn with_config(bytes_iterator: I, config: NdjsonConfig) -> FallibleNdjsonIter<T, I> {
169 FallibleNdjsonIter {
170 engine: NdjsonEngine::with_config(config),
171 bytes_iterator: bytes_iterator.fuse()
172 }
173 }
174}
175
176impl<T, I, B, E> Iterator for FallibleNdjsonIter<T, I>
177where
178 for<'deserialize> T: Deserialize<'deserialize>,
179 I: Iterator<Item = Result<B, E>>,
180 B: AsBytes
181{
182 type Item = FallibleNdjsonResult<T, E>;
183
184 fn next(&mut self) -> Option<FallibleNdjsonResult<T, E>> {
185 loop {
186 if let Some(result) = self.engine.pop() {
187 return match result {
188 Ok(value) => Some(Ok(value)),
189 Err(error) => Some(Err(FallibleNdjsonError::JsonError(error)))
190 }
191 }
192
193 match self.bytes_iterator.next() {
194 Some(Ok(bytes)) => self.engine.input(bytes),
195 Some(Err(error)) => return Some(Err(FallibleNdjsonError::InputError(error))),
196 None => {
197 self.engine.finalize();
198 return self.engine.pop()
199 .map(|res| res.map_err(FallibleNdjsonError::JsonError));
200 }
201 }
202 }
203 }
204}
205
206pub fn from_fallible_iter<T, I>(into_iter: I) -> FallibleNdjsonIter<T, I::IntoIter>
232where
233 I: IntoIterator
234{
235 FallibleNdjsonIter::new(into_iter.into_iter())
236}
237
238pub fn from_fallible_iter_with_config<T, I>(into_iter: I, config: NdjsonConfig)
267 -> FallibleNdjsonIter<T, I::IntoIter>
268where
269 I: IntoIterator
270{
271 FallibleNdjsonIter::with_config(into_iter.into_iter(), config)
272}
273
274#[cfg(test)]
275mod tests {
276
277 use super::*;
278
279 use kernal::prelude::*;
280
281 use std::iter;
282
283 use crate::config::EmptyLineHandling;
284 use crate::test_util::{FallibleNdjsonResultAssertions, SingleThenPanicIter, TestStruct};
285
286 fn collect<I>(into_iter: I) -> Vec<JsonResult<TestStruct>>
287 where
288 I: IntoIterator,
289 I::Item: AsBytes
290 {
291 from_iter(into_iter).collect::<Vec<_>>()
292 }
293
294 #[test]
295 fn empty_iter_results_in_empty_results() {
296 assert_that!(collect::<_>(iter::empty::<&[u8]>())).is_empty();
297 }
298
299 #[test]
300 fn singleton_iter_with_single_json_line() {
301 let iter = iter::once("{\"key\":1,\"value\":2}\n");
302
303 assert_that!(collect(iter)).satisfies_exactly_in_given_order(dyn_assertions!(
304 |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 })
305 ));
306 }
307
308 #[test]
309 fn multiple_iter_items_compose_single_json_line() {
310 let vec = vec!["{\"key\"", ":12,", "\"value\"", ":34}\n"];
311
312 assert_that!(collect(vec)).satisfies_exactly_in_given_order(dyn_assertions!(
313 |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 })
314 ));
315 }
316
317 #[test]
318 fn wrapped_iter_not_queried_while_sufficient_data_remains() {
319 let iter = SingleThenPanicIter {
320 data: Some("{\"key\":1,\"value\":2}\n{\"key\":3,\"value\":4}\n".to_owned())
321 };
322 let mut ndjson_iter = NdjsonIter::<TestStruct, _>::new(iter);
323
324 assert_that!(ndjson_iter.next()).is_some();
325 assert_that!(ndjson_iter.next()).is_some();
326 }
327
328 #[test]
329 fn iter_with_parse_always_config_respects_config() {
330 let iter = iter::once("{\"key\":1,\"value\":2}\n\n");
331 let config = NdjsonConfig::default()
332 .with_empty_line_handling(EmptyLineHandling::ParseAlways);
333 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
334
335 assert_that!(ndjson_iter.next()).to_value().is_ok();
336 assert_that!(ndjson_iter.next()).to_value().is_err();
337 }
338
339 #[test]
340 fn iter_with_ignore_empty_config_respects_config() {
341 let iter = iter::once("{\"key\":1,\"value\":2}\n\n");
342 let config = NdjsonConfig::default()
343 .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);
344 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
345
346 assert_that!(ndjson_iter.next()).to_value().is_ok();
347 assert_that!(ndjson_iter.next()).is_none();
348 }
349
350 #[test]
351 fn iter_with_parse_rest_handles_valid_finalization() {
352 let iter = iter::once("{\"key\":1,\"value\":2}");
353 let config = NdjsonConfig::default().with_parse_rest(true);
354 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
355
356 assert_that!(ndjson_iter.next()).to_value().contains_value(TestStruct { key: 1, value: 2 });
357 assert_that!(ndjson_iter.next()).is_none();
358 }
359
360 #[test]
361 fn iter_with_parse_rest_handles_invalid_finalization() {
362 let iter = iter::once("{\"key\":1,");
363 let config = NdjsonConfig::default().with_parse_rest(true);
364 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
365
366 assert_that!(ndjson_iter.next()).to_value().is_err();
367 assert_that!(ndjson_iter.next()).is_none();
368 }
369
370 #[test]
371 fn iter_without_parse_rest_does_not_handle_finalization() {
372 let iter = iter::once("some text");
373 let config = NdjsonConfig::default().with_parse_rest(false);
374 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
375
376 assert_that!(ndjson_iter.next()).is_none();
377 }
378
379 #[test]
380 fn iter_fuses_bytes_iter() {
381 #[derive(Default)]
382 struct NoneThenPanicIter {
383 returned_none: bool
384 }
385
386 impl Iterator for NoneThenPanicIter {
387 type Item = Vec<u8>;
388
389 fn next(&mut self) -> Option<Self::Item> {
390 if self.returned_none {
391 panic!("iterator queried twice");
392 }
393
394 self.returned_none = true;
395 None
396 }
397 }
398
399 let iter = NoneThenPanicIter::default();
400 let config = NdjsonConfig::default().with_parse_rest(true);
401 let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(iter, config);
402
403 assert_that!(ndjson_iter.next()).is_none();
404 assert_that!(ndjson_iter.next()).is_none();
405 }
406
407 #[test]
408 fn fallible_iter_correctly_forwards_json_error() {
409 let iter = iter::once::<Result<&str, &str>>(Ok("\n"));
410 let mut fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> = from_fallible_iter(iter);
411
412 assert_that!(fallible_ndjson_iter.next()).to_value().is_json_error();
413 }
414
415 #[test]
416 fn fallible_iter_correctly_forwards_input_error() {
417 let iter = iter::once::<Result<&str, &str>>(Err("test message"));
418 let mut fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> = from_fallible_iter(iter);
419
420 assert_that!(fallible_ndjson_iter.next()).to_value().is_input_error("test message");
421 }
422
423 #[test]
424 fn fallible_iter_operates_correctly_with_interspersed_errors() {
425 let data_vec = vec![
426 Ok("{\"key\":42,\"val"),
427 Err("test message 1"),
428 Ok("ue\":24}\n{\"key\":21,\"value\":12}\ninvalid json\n"),
429 Err("test message 2"),
430 Ok("{\"key\":63,\"value\":36}\n")
431 ];
432 let fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> =
433 from_fallible_iter(data_vec);
434
435 assert_that!(fallible_ndjson_iter.collect::<Vec<_>>())
436 .satisfies_exactly_in_given_order(dyn_assertions!(
437 |it| assert_that!(it).is_input_error("test message 1"),
438 |it| assert_that!(it).contains_value(TestStruct { key: 42, value: 24 }),
439 |it| assert_that!(it).contains_value(TestStruct { key: 21, value: 12 }),
440 |it| assert_that!(it).is_json_error(),
441 |it| assert_that!(it).is_input_error("test message 2"),
442 |it| assert_that!(it).contains_value(TestStruct { key: 63, value: 36 })
443 ));
444 }
445}