ndjson_stream/driver/
iter.rs

1use crate::as_bytes::AsBytes;
2use crate::config::NdjsonConfig;
3use crate::engine::NdjsonEngine;
4use crate::fallible::{FallibleNdjsonError, FallibleNdjsonResult};
5
6use std::convert::Infallible;
7use std::iter::Fuse;
8
9use serde::Deserialize;
10
11use serde_json::error::Result as JsonResult;
12
/// Adapter which turns an iterator over plain items into an iterator over [Result]s that are
/// always `Ok`, using [Infallible] as the error type.
struct MapResultInfallible<I> {
    inner: I,
}

impl<I> MapResultInfallible<I> {
    /// Wraps the given iterator.
    fn new(inner: I) -> Self {
        Self { inner }
    }
}

impl<I> Iterator for MapResultInfallible<I>
where
    I: Iterator,
{
    type Item = Result<I::Item, Infallible>;

    fn next(&mut self) -> Option<Self::Item> {
        // Every item of the wrapped iterator is reported as a success.
        let item = self.inner.next()?;
        Some(Ok(item))
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Exactly one output is produced per input, so the bounds carry over unchanged.
        self.inner.size_hint()
    }
}
39
40/// Wraps an iterator of data blocks, i.e. types implementing [AsBytes], and offers an [Iterator]
41/// implementation over parsed NDJSON-records according to [Deserialize]. See [from_iter] and
42/// [from_iter_with_config] for more details.
43pub struct NdjsonIter<T, I> {
44    inner: FallibleNdjsonIter<T, MapResultInfallible<I>>
45}
46
47impl<T, I> NdjsonIter<T, I>
48where
49    I: Iterator
50{
51
52    /// Creates a new NDJSON-iterator wrapping the given `bytes_iterator` with default
53    /// [NdjsonConfig].
54    pub fn new(bytes_iterator: I) -> NdjsonIter<T, I> {
55        let inner_bytes_iterator = MapResultInfallible::new(bytes_iterator);
56
57        NdjsonIter {
58            inner: FallibleNdjsonIter::new(inner_bytes_iterator)
59        }
60    }
61
62    /// Creates a new NDJSON-iterator wrapping the given `bytes_iterator` with the given
63    /// [NdjsonConfig] to control its behavior. See [NdjsonConfig] for more details.
64    pub fn with_config(bytes_iterator: I, config: NdjsonConfig) -> NdjsonIter<T, I> {
65        let inner_bytes_iterator = MapResultInfallible::new(bytes_iterator);
66
67        NdjsonIter {
68            inner: FallibleNdjsonIter::with_config(inner_bytes_iterator, config)
69        }
70    }
71}
72
73impl<T, I> Iterator for NdjsonIter<T, I>
74where
75    for<'deserialize> T: Deserialize<'deserialize>,
76    I: Iterator,
77    I::Item: AsBytes
78{
79    type Item = JsonResult<T>;
80
81    fn next(&mut self) -> Option<JsonResult<T>> {
82        Some(self.inner.next()?.map_err(FallibleNdjsonError::unwrap_json_error))
83    }
84}
85
86/// Wraps an iterator of data blocks, i.e. types implementing [AsBytes], obtained by
87/// [IntoIterator::into_iter] on `into_iter` and offers an [Iterator] implementation over parsed
88/// NDJSON-records according to [Deserialize]. The parser is configured with the default
89/// [NdjsonConfig].
90///
91/// # Example
92///
93/// ```
94/// let data_blocks = vec![
95///     "123\n",
96///     "456\n789\n"
97/// ];
98///
99/// let mut ndjson_iter = ndjson_stream::from_iter::<u32, _>(data_blocks);
100///
101/// assert!(matches!(ndjson_iter.next(), Some(Ok(123))));
102/// assert!(matches!(ndjson_iter.next(), Some(Ok(456))));
103/// assert!(matches!(ndjson_iter.next(), Some(Ok(789))));
104/// assert!(ndjson_iter.next().is_none());
105/// ```
106pub fn from_iter<T, I>(into_iter: I) -> NdjsonIter<T, I::IntoIter>
107where
108    I: IntoIterator
109{
110    NdjsonIter::new(into_iter.into_iter())
111}
112
113/// Wraps an iterator of data blocks, i.e. types implementing [AsBytes], obtained by
114/// [IntoIterator::into_iter] on `into_iter` and offers an [Iterator] implementation over parsed
115/// NDJSON-records according to [Deserialize]. The parser is configured with the given
116/// [NdjsonConfig].
117///
118/// # Example
119///
120/// ```
121/// use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
122///
123/// let data_blocks = vec![
124///     "123\n",
125///     "456\n   \n789\n"
126/// ];
127/// let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
128///
129/// let mut ndjson_iter = ndjson_stream::from_iter_with_config::<u32, _>(data_blocks, config);
130///
131/// assert!(matches!(ndjson_iter.next(), Some(Ok(123))));
132/// assert!(matches!(ndjson_iter.next(), Some(Ok(456))));
133/// assert!(matches!(ndjson_iter.next(), Some(Ok(789))));
134/// assert!(ndjson_iter.next().is_none());
135/// ```
136pub fn from_iter_with_config<T, I>(into_iter: I, config: NdjsonConfig) -> NdjsonIter<T, I::IntoIter>
137where
138    I: IntoIterator
139{
140    NdjsonIter::with_config(into_iter.into_iter(), config)
141}
142
143/// Wraps an iterator over [Result]s of data blocks, i.e. types implementing [AsBytes], and offers
144/// an [Iterator] implementation over parsed NDJSON-records according to [Deserialize], forwarding
145/// potential errors returned by the wrapped iterator. See [from_fallible_iter] and
146/// [from_fallible_iter_with_config] for more details.
147pub struct FallibleNdjsonIter<T, I> {
148    engine: NdjsonEngine<T>,
149    bytes_iterator: Fuse<I>
150}
151
152impl<T, I> FallibleNdjsonIter<T, I>
153where
154    I: Iterator
155{
156
157    /// Creates a new fallible NDJSON-iterator wrapping the given `bytes_iterator` with default
158    /// [NdjsonConfig].
159    pub fn new(bytes_iterator: I) -> FallibleNdjsonIter<T, I> {
160        FallibleNdjsonIter {
161            engine: NdjsonEngine::new(),
162            bytes_iterator: bytes_iterator.fuse()
163        }
164    }
165
166    /// Creates a new fallible NDJSON-iterator wrapping the given `bytes_iterator` with the given
167    /// [NdjsonConfig] to control its behavior. See [NdjsonConfig] for more details.
168    pub fn with_config(bytes_iterator: I, config: NdjsonConfig) -> FallibleNdjsonIter<T, I> {
169        FallibleNdjsonIter {
170            engine: NdjsonEngine::with_config(config),
171            bytes_iterator: bytes_iterator.fuse()
172        }
173    }
174}
175
176impl<T, I, B, E> Iterator for FallibleNdjsonIter<T, I>
177where
178    for<'deserialize> T: Deserialize<'deserialize>,
179    I: Iterator<Item = Result<B, E>>,
180    B: AsBytes
181{
182    type Item = FallibleNdjsonResult<T, E>;
183
184    fn next(&mut self) -> Option<FallibleNdjsonResult<T, E>> {
185        loop {
186            if let Some(result) = self.engine.pop() {
187                return match result {
188                    Ok(value) => Some(Ok(value)),
189                    Err(error) => Some(Err(FallibleNdjsonError::JsonError(error)))
190                }
191            }
192
193            match self.bytes_iterator.next() {
194                Some(Ok(bytes)) => self.engine.input(bytes),
195                Some(Err(error)) => return Some(Err(FallibleNdjsonError::InputError(error))),
196                None => {
197                    self.engine.finalize();
198                    return self.engine.pop()
199                        .map(|res| res.map_err(FallibleNdjsonError::JsonError));
200                }
201            }
202        }
203    }
204}
205
206/// Wraps an iterator of [Result]s of data blocks, i.e. types implementing [AsBytes], obtained by
207/// [IntoIterator::into_iter] on `into_iter` and offers an [Iterator] implementation over parsed
208/// NDJSON-records according to [Deserialize]. Errors in the wrapped iterator are forwarded via
209/// [FallibleNdjsonError::InputError], while parsing errors are indicated via
210/// [FallibleNdjsonError::JsonError]. The parser is configured with the default [NdjsonConfig].
211///
212/// # Example
213///
214/// ```
215/// use ndjson_stream::fallible::FallibleNdjsonError;
216///
217/// let data_block_results = vec![
218///     Ok("123\n"),
219///     Err("some error"),
220///     Ok("456\n789\n")
221/// ];
222///
223/// let mut ndjson_iter = ndjson_stream::from_fallible_iter::<u32, _>(data_block_results);
224///
225/// assert!(matches!(ndjson_iter.next(), Some(Ok(123))));
226/// assert!(matches!(ndjson_iter.next(), Some(Err(FallibleNdjsonError::InputError("some error")))));
227/// assert!(matches!(ndjson_iter.next(), Some(Ok(456))));
228/// assert!(matches!(ndjson_iter.next(), Some(Ok(789))));
229/// assert!(ndjson_iter.next().is_none());
230/// ```
231pub fn from_fallible_iter<T, I>(into_iter: I) -> FallibleNdjsonIter<T, I::IntoIter>
232where
233    I: IntoIterator
234{
235    FallibleNdjsonIter::new(into_iter.into_iter())
236}
237
238/// Wraps an iterator of [Result]s of data blocks, i.e. types implementing [AsBytes], obtained by
239/// [IntoIterator::into_iter] on `into_iter` and offers an [Iterator] implementation over parsed
240/// NDJSON-records according to [Deserialize]. Errors in the wrapped iterator are forwarded via
241/// [FallibleNdjsonError::InputError], while parsing errors are indicated via
242/// [FallibleNdjsonError::JsonError]. The parser is configured with the given [NdjsonConfig].
243///
244/// # Example
245///
246/// ```
247/// use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
248/// use ndjson_stream::fallible::FallibleNdjsonError;
249///
250/// let data_block_results = vec![
251///     Ok("123\n"),
252///     Err("some error"),
253///     Ok("456\n   \n789\n")
254/// ];
255/// let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
256///
257/// let mut ndjson_iter =
258///     ndjson_stream::from_fallible_iter_with_config::<u32, _>(data_block_results, config);
259///
260/// assert!(matches!(ndjson_iter.next(), Some(Ok(123))));
261/// assert!(matches!(ndjson_iter.next(), Some(Err(FallibleNdjsonError::InputError("some error")))));
262/// assert!(matches!(ndjson_iter.next(), Some(Ok(456))));
263/// assert!(matches!(ndjson_iter.next(), Some(Ok(789))));
264/// assert!(ndjson_iter.next().is_none());
265/// ```
266pub fn from_fallible_iter_with_config<T, I>(into_iter: I, config: NdjsonConfig)
267    -> FallibleNdjsonIter<T, I::IntoIter>
268where
269    I: IntoIterator
270{
271    FallibleNdjsonIter::with_config(into_iter.into_iter(), config)
272}
273
#[cfg(test)]
mod tests {

    use super::*;

    use kernal::prelude::*;

    use std::iter;

    use crate::config::EmptyLineHandling;
    use crate::test_util::{FallibleNdjsonResultAssertions, SingleThenPanicIter, TestStruct};

    /// Runs [from_iter] over the given data blocks and gathers all results into a [Vec].
    fn collect<I>(into_iter: I) -> Vec<JsonResult<TestStruct>>
    where
        I: IntoIterator,
        I::Item: AsBytes,
    {
        let ndjson_iter: NdjsonIter<TestStruct, _> = from_iter(into_iter);

        ndjson_iter.collect()
    }

    #[test]
    fn empty_iter_results_in_empty_results() {
        let results = collect(iter::empty::<&[u8]>());

        assert_that!(results).is_empty();
    }

    #[test]
    fn singleton_iter_with_single_json_line() {
        let results = collect(iter::once("{\"key\":1,\"value\":2}\n"));

        assert_that!(results).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 })
        ));
    }

    #[test]
    fn multiple_iter_items_compose_single_json_line() {
        // A single record is split across four data blocks.
        let fragments = vec!["{\"key\"", ":12,", "\"value\"", ":34}\n"];
        let results = collect(fragments);

        assert_that!(results).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 })
        ));
    }

    #[test]
    fn wrapped_iter_not_queried_while_sufficient_data_remains() {
        // The only data block already contains two complete records.
        let two_records = "{\"key\":1,\"value\":2}\n{\"key\":3,\"value\":4}\n".to_owned();
        let source = SingleThenPanicIter {
            data: Some(two_records)
        };
        let mut ndjson_iter = NdjsonIter::<TestStruct, _>::new(source);

        assert_that!(ndjson_iter.next()).is_some();
        assert_that!(ndjson_iter.next()).is_some();
    }

    #[test]
    fn iter_with_parse_always_config_respects_config() {
        let config = NdjsonConfig::default()
            .with_empty_line_handling(EmptyLineHandling::ParseAlways);
        let blocks = iter::once("{\"key\":1,\"value\":2}\n\n");
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(blocks, config);

        let first = ndjson_iter.next();
        let second = ndjson_iter.next();

        assert_that!(first).to_value().is_ok();
        assert_that!(second).to_value().is_err();
    }

    #[test]
    fn iter_with_ignore_empty_config_respects_config() {
        let config = NdjsonConfig::default()
            .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);
        let blocks = iter::once("{\"key\":1,\"value\":2}\n\n");
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(blocks, config);

        let first = ndjson_iter.next();
        let second = ndjson_iter.next();

        assert_that!(first).to_value().is_ok();
        assert_that!(second).is_none();
    }

    #[test]
    fn iter_with_parse_rest_handles_valid_finalization() {
        // The record is complete but lacks the terminating newline.
        let config = NdjsonConfig::default().with_parse_rest(true);
        let blocks = iter::once("{\"key\":1,\"value\":2}");
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(blocks, config);

        let first = ndjson_iter.next();
        let second = ndjson_iter.next();

        assert_that!(first).to_value().contains_value(TestStruct { key: 1, value: 2 });
        assert_that!(second).is_none();
    }

    #[test]
    fn iter_with_parse_rest_handles_invalid_finalization() {
        // The input ends in the middle of a record.
        let config = NdjsonConfig::default().with_parse_rest(true);
        let blocks = iter::once("{\"key\":1,");
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(blocks, config);

        let first = ndjson_iter.next();
        let second = ndjson_iter.next();

        assert_that!(first).to_value().is_err();
        assert_that!(second).is_none();
    }

    #[test]
    fn iter_without_parse_rest_does_not_handle_finalization() {
        let config = NdjsonConfig::default().with_parse_rest(false);
        let blocks = iter::once("some text");
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(blocks, config);

        assert_that!(ndjson_iter.next()).is_none();
    }

    #[test]
    fn iter_fuses_bytes_iter() {
        /// Returns `None` on the first query and panics on every later one.
        struct NoneThenPanicIter {
            returned_none: bool
        }

        impl Iterator for NoneThenPanicIter {
            type Item = Vec<u8>;

            fn next(&mut self) -> Option<Self::Item> {
                assert!(!self.returned_none, "iterator queried twice");

                self.returned_none = true;
                None
            }
        }

        let source = NoneThenPanicIter { returned_none: false };
        let config = NdjsonConfig::default().with_parse_rest(true);
        let mut ndjson_iter: NdjsonIter<TestStruct, _> = from_iter_with_config(source, config);

        // The second call only passes if the wrapped iterator is not queried again.
        assert_that!(ndjson_iter.next()).is_none();
        assert_that!(ndjson_iter.next()).is_none();
    }

    #[test]
    fn fallible_iter_correctly_forwards_json_error() {
        let input: Result<&str, &str> = Ok("\n");
        let mut fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> =
            from_fallible_iter(iter::once(input));

        assert_that!(fallible_ndjson_iter.next()).to_value().is_json_error();
    }

    #[test]
    fn fallible_iter_correctly_forwards_input_error() {
        let input: Result<&str, &str> = Err("test message");
        let mut fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> =
            from_fallible_iter(iter::once(input));

        assert_that!(fallible_ndjson_iter.next()).to_value().is_input_error("test message");
    }

    #[test]
    fn fallible_iter_operates_correctly_with_interspersed_errors() {
        // The first record is interrupted by an input error and completed by the following block.
        let block_results = vec![
            Ok("{\"key\":42,\"val"),
            Err("test message 1"),
            Ok("ue\":24}\n{\"key\":21,\"value\":12}\ninvalid json\n"),
            Err("test message 2"),
            Ok("{\"key\":63,\"value\":36}\n")
        ];
        let fallible_ndjson_iter: FallibleNdjsonIter<TestStruct, _> =
            from_fallible_iter(block_results);
        let results = fallible_ndjson_iter.collect::<Vec<_>>();

        assert_that!(results)
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).is_input_error("test message 1"),
                |it| assert_that!(it).contains_value(TestStruct { key: 42, value: 24 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 21, value: 12 }),
                |it| assert_that!(it).is_json_error(),
                |it| assert_that!(it).is_input_error("test message 2"),
                |it| assert_that!(it).contains_value(TestStruct { key: 63, value: 36 })
            ));
    }
}
445}