ndjson_stream/driver/
stream.rs

1use std::convert::Infallible;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::{ready, Stream};
6use pin_project_lite::pin_project;
7use serde::Deserialize;
8use serde_json::error::Result as JsonResult;
9
10use crate::as_bytes::AsBytes;
11use crate::config::NdjsonConfig;
12use crate::engine::NdjsonEngine;
13use crate::fallible::{FallibleNdjsonError, FallibleNdjsonResult};
14
15pin_project! {
16    struct MapResultInfallible<S> {
17        #[pin]
18        inner: S
19    }
20}
21
22impl<S> MapResultInfallible<S> {
23    fn new(inner: S) -> MapResultInfallible<S> {
24        MapResultInfallible {
25            inner
26        }
27    }
28}
29
30impl<S> Stream for MapResultInfallible<S>
31where
32    S: Stream
33{
34    type Item = Result<S::Item, Infallible>;
35
36    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        let mut this = self.project();
38        let res = ready!(this.inner.as_mut().poll_next(cx));
39        Poll::Ready(res.map(Ok))
40    }
41
42    fn size_hint(&self) -> (usize, Option<usize>) {
43        self.inner.size_hint()
44    }
45}
46
47pin_project! {
48    /// Wraps a [Stream] of data blocks, i.e. types implementing [AsBytes], and offers a [Stream]
49    /// implementation over parsed NDJSON-records according to [Deserialize]. See [from_stream] and
50    /// [from_stream_with_config] for more details.
51    pub struct NdjsonStream<T, S> {
52        #[pin]
53        inner: FallibleNdjsonStream<T, MapResultInfallible<S>>
54    }
55}
56
57impl<T, S> NdjsonStream<T, S> {
58
59    /// Creates a new NDJSON-stream wrapping the given `bytes_stream` with default [NdjsonConfig].
60    pub fn new(bytes_stream: S) -> NdjsonStream<T, S> {
61        let inner_bytes_stream = MapResultInfallible::new(bytes_stream);
62
63        NdjsonStream {
64            inner: FallibleNdjsonStream::new(inner_bytes_stream)
65        }
66    }
67
68    /// Creates a new NDJSON-stream wrapping the given `bytes_stream` with the given [NdjsonConfig]
69    /// to control its behavior. See [NdjsonConfig] for more details.
70    pub fn with_config(bytes_stream: S, config: NdjsonConfig) -> NdjsonStream<T, S> {
71        let inner_bytes_stream = MapResultInfallible::new(bytes_stream);
72
73        NdjsonStream {
74            inner: FallibleNdjsonStream::with_config(inner_bytes_stream, config)
75        }
76    }
77}
78
79impl<T, S> Stream for NdjsonStream<T, S>
80where
81    for<'deserialize> T: Deserialize<'deserialize>,
82    S: Stream,
83    S::Item: AsBytes
84{
85    type Item = JsonResult<T>;
86
87    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<JsonResult<T>>> {
88        let mut this = self.project();
89        let inner_next = ready!(this.inner.as_mut().poll_next(cx));
90        let next = inner_next
91            .map(|fallible_res| fallible_res.map_err(FallibleNdjsonError::unwrap_json_error));
92
93        Poll::Ready(next)
94    }
95}
96
97/// Wraps a [Stream] of data blocks, i.e. types implementing [AsBytes], and offers a [Stream]
98/// implementation over parsed NDJSON-records according to [Deserialize]. The parser is configured
99/// with the default [NdjsonConfig].
100///
101/// # Example
102///
103/// ```
104/// use futures::stream::{self, StreamExt};
105///
106/// let data_blocks = vec![
107///     "123\n",
108///     "456\n789\n"
109/// ];
110///
111/// let mut ndjson_stream = ndjson_stream::from_stream::<u32, _>(stream::iter(data_blocks));
112///
113/// tokio_test::block_on(async {
114///     assert!(matches!(ndjson_stream.next().await, Some(Ok(123))));
115///     assert!(matches!(ndjson_stream.next().await, Some(Ok(456))));
116///     assert!(matches!(ndjson_stream.next().await, Some(Ok(789))));
117///     assert!(ndjson_stream.next().await.is_none());
118/// });
119/// ```
120pub fn from_stream<T, S>(bytes_stream: S) -> NdjsonStream<T, S> {
121    NdjsonStream::new(bytes_stream)
122}
123
124/// Wraps a [Stream] of data blocks, i.e. types implementing [AsBytes], and offers a [Stream]
125/// implementation over parsed NDJSON-records according to [Deserialize]. The parser is configured
126/// with the given [NdjsonConfig].
127///
128/// # Example
129///
130/// ```
131/// use futures::stream::{self, StreamExt};
132/// use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
133///
134/// let data_blocks = vec![
135///     "123\n",
136///     "456\n   \n789\n"
137/// ];
138/// let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
139///
140/// let mut ndjson_stream =
141///     ndjson_stream::from_stream_with_config::<u32, _>(stream::iter(data_blocks), config);
142///
143/// tokio_test::block_on(async {
144///     assert!(matches!(ndjson_stream.next().await, Some(Ok(123))));
145///     assert!(matches!(ndjson_stream.next().await, Some(Ok(456))));
146///     assert!(matches!(ndjson_stream.next().await, Some(Ok(789))));
147///     assert!(ndjson_stream.next().await.is_none());
148/// });
149/// ```
150pub fn from_stream_with_config<T, S>(bytes_stream: S, config: NdjsonConfig) -> NdjsonStream<T, S> {
151    NdjsonStream::with_config(bytes_stream, config)
152}
153
154pin_project! {
155    /// Wraps a [Stream] of [Result]s of data blocks, i.e. types implementing [AsBytes], and offers
156    /// a [Stream] mplementation over parsed NDJSON-records according to [Deserialize], forwarding
157    /// potential errors returned by the wrapped iterator. See [from_fallible_stream] and
158    /// [from_fallible_stream_with_config] for more details.
159    pub struct FallibleNdjsonStream<T, S> {
160        engine: NdjsonEngine<T>,
161        #[pin]
162        bytes_stream: S
163    }
164}
165
166impl<T, S> FallibleNdjsonStream<T, S> {
167
168    /// Creates a new fallible NDJSON-stream wrapping the given `bytes_stream` with default
169    /// [NdjsonConfig].
170    pub fn new(bytes_stream: S) -> FallibleNdjsonStream<T, S> {
171        FallibleNdjsonStream {
172            engine: NdjsonEngine::new(),
173            bytes_stream
174        }
175    }
176
177    /// Creates a new fallible NDJSON-stream wrapping the given `bytes_stream` with the given
178    /// [NdjsonConfig] to control its behavior. See [NdjsonConfig] for more details.
179    pub fn with_config(bytes_stream: S, config: NdjsonConfig) -> FallibleNdjsonStream<T, S> {
180        FallibleNdjsonStream {
181            engine: NdjsonEngine::with_config(config),
182            bytes_stream
183        }
184    }
185}
186
187impl<T, S, B, E> Stream for FallibleNdjsonStream<T, S>
188where
189    for<'deserialize> T: Deserialize<'deserialize>,
190    S: Stream<Item = Result<B, E>>,
191    B: AsBytes
192{
193    type Item = FallibleNdjsonResult<T, E>;
194
195    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196        let mut this = self.project();
197
198        loop {
199            if let Some(result) = this.engine.pop() {
200                return match result {
201                    Ok(value) => Poll::Ready(Some(Ok(value))),
202                    Err(error) => Poll::Ready(Some(Err(FallibleNdjsonError::JsonError(error))))
203                }
204            }
205
206            let bytes = ready!(this.bytes_stream.as_mut().poll_next(cx));
207
208            match bytes {
209                Some(Ok(bytes)) => this.engine.input(bytes),
210                Some(Err(error)) =>
211                    return Poll::Ready(Some(Err(FallibleNdjsonError::InputError(error)))),
212                None => {
213                    this.engine.finalize();
214                    return Poll::Ready(this.engine.pop()
215                        .map(|res| res.map_err(FallibleNdjsonError::JsonError)));
216                }
217            }
218        }
219    }
220}
221
222/// Wraps a [Stream] of [Result]s of data blocks, i.e. types implementing [AsBytes], and offers a
223/// [Stream] implementation over parsed NDJSON-records according to [Deserialize]. Errors in the
224/// wrapped iterator are forwarded via [FallibleNdjsonError::InputError] , while parsing errors are
225/// indicated via [FallibleNdjsonError::JsonError]. The parser is configured with the default
226/// [NdjsonConfig].
227///
228/// # Example
229///
230/// ```
231/// use futures::stream::{self, StreamExt};
232/// use ndjson_stream::fallible::FallibleNdjsonError;
233///
234/// let data_block_results = vec![
235///     Ok("123\n"),
236///     Err("some error"),
237///     Ok("456\n789\n")
238/// ];
239/// let data_stream = stream::iter(data_block_results);
240///
241/// let mut ndjson_stream = ndjson_stream::from_fallible_stream::<u32, _>(data_stream);
242///
243/// tokio_test::block_on(async {
244///     assert!(matches!(ndjson_stream.next().await, Some(Ok(123))));
245///     assert!(matches!(ndjson_stream.next().await,
246///         Some(Err(FallibleNdjsonError::InputError("some error")))));
247///     assert!(matches!(ndjson_stream.next().await, Some(Ok(456))));
248///     assert!(matches!(ndjson_stream.next().await, Some(Ok(789))));
249///     assert!(ndjson_stream.next().await.is_none());
250/// });
251/// ```
252pub fn from_fallible_stream<T, S>(bytes_stream: S) -> FallibleNdjsonStream<T, S> {
253    FallibleNdjsonStream::new(bytes_stream)
254}
255
256/// Wraps a [Stream] of [Result]s of data blocks, i.e. types implementing [AsBytes], and offers a
257/// [Stream] implementation over parsed NDJSON-records according to [Deserialize]. Errors in the
258/// wrapped iterator are forwarded via [FallibleNdjsonError::InputError], while parsing errors are
259/// indicated via [FallibleNdjsonError::JsonError]. The parser is configured with the given
260/// [NdjsonConfig].
261///
262/// # Example
263///
264/// ```
265/// use futures::stream::{self, StreamExt};
266/// use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
267/// use ndjson_stream::fallible::FallibleNdjsonError;
268///
269/// let data_block_results = vec![
270///     Ok("123\n"),
271///     Err("some error"),
272///     Ok("456\n   \n789\n")
273/// ];
274/// let data_stream = stream::iter(data_block_results);
275/// let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
276///
277/// let mut ndjson_stream =
278///     ndjson_stream::from_fallible_stream_with_config::<u32, _>(data_stream, config);
279///
280/// tokio_test::block_on(async {
281///     assert!(matches!(ndjson_stream.next().await, Some(Ok(123))));
282///     assert!(matches!(ndjson_stream.next().await,
283///         Some(Err(FallibleNdjsonError::InputError("some error")))));
284///     assert!(matches!(ndjson_stream.next().await, Some(Ok(456))));
285///     assert!(matches!(ndjson_stream.next().await, Some(Ok(789))));
286///     assert!(ndjson_stream.next().await.is_none());
287/// });
288/// ```
289pub fn from_fallible_stream_with_config<T, S>(bytes_stream: S, config: NdjsonConfig)
290        -> FallibleNdjsonStream<T, S> {
291    FallibleNdjsonStream::with_config(bytes_stream, config)
292}
293
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use futures::stream;
    use futures::{Stream, StreamExt};
    use kernal::prelude::*;
    use tokio_test::assert_pending;
    use tokio_test::task;

    use crate::as_bytes::AsBytes;
    use crate::config::EmptyLineHandling;
    use crate::test_util::{FallibleNdjsonResultAssertions, SingleThenPanicIter, TestStruct};

    use super::*;

    /// Parses the given stream of data blocks with the default configuration and gathers all
    /// yielded results in a vector.
    async fn collect<S>(bytes_stream: S) -> Vec<JsonResult<TestStruct>>
    where
        S: Stream,
        S::Item: AsBytes
    {
        from_stream::<TestStruct, _>(bytes_stream)
            .collect::<Vec<_>>()
            .await
    }

    /// Test helper to obtain the next item of a stream synchronously.
    trait NextBlocking: Stream {
        fn next_blocking(&mut self) -> Option<Self::Item>;
    }

    impl<S> NextBlocking for S
    where
        S: Stream + Unpin
    {
        fn next_blocking(&mut self) -> Option<Self::Item> {
            let next_item = self.next();
            tokio_test::block_on(next_item)
        }
    }

    #[test]
    fn pending_stream_results_in_pending_item() {
        let never_ready = stream::pending::<&str>();
        let mut records = from_stream::<TestStruct, _>(never_ready);

        let mut next_task = task::spawn(records.next());

        assert_pending!(next_task.poll());
    }

    #[test]
    fn empty_stream_results_in_empty_results() {
        let no_blocks = stream::empty::<&[u8]>();
        let results = tokio_test::block_on(collect(no_blocks));

        assert_that!(results).is_empty();
    }

    #[test]
    fn singleton_iter_with_single_json_line() {
        let single_block = stream::once(async { "{\"key\":1,\"value\":2}\n" });
        let results = tokio_test::block_on(collect(single_block));

        assert_that!(results).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 })
        ));
    }

    #[test]
    fn multiple_iter_items_compose_single_json_line() {
        let fragments = vec!["{\"key\"", ":12,", "\"value\"", ":34}\n"];
        let results = tokio_test::block_on(collect(stream::iter(fragments)));

        assert_that!(results).satisfies_exactly_in_given_order(dyn_assertions!(
            |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 })
        ));
    }

    #[test]
    fn wrapped_stream_not_queried_while_sufficient_data_remains() {
        // The single data block already contains two complete records.
        let two_records = "{\"key\":0,\"value\":0}\n{\"key\":0,\"value\":0}\n".to_owned();
        let iter = SingleThenPanicIter {
            data: Some(two_records)
        };
        let mut records = from_stream::<TestStruct, _>(stream::iter(iter));

        assert_that!(records.next_blocking()).is_some();
        assert_that!(records.next_blocking()).is_some();
    }

    #[test]
    fn stream_with_parse_always_config_respects_config() {
        let config = NdjsonConfig::default()
            .with_empty_line_handling(EmptyLineHandling::ParseAlways);
        let input = stream::once(async { "{\"key\":1,\"value\":2}\n\n" });
        let mut records = pin!(from_stream_with_config::<TestStruct, _>(input, config));

        assert_that!(records.next_blocking()).to_value().is_ok();
        assert_that!(records.next_blocking()).to_value().is_err();
    }

    #[test]
    fn stream_with_ignore_empty_config_respects_config() {
        let config = NdjsonConfig::default()
            .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);
        let input = stream::once(async { "{\"key\":1,\"value\":2}\n\n" });
        let mut records = pin!(from_stream_with_config::<TestStruct, _>(input, config));

        assert_that!(records.next_blocking()).to_value().is_ok();
        assert_that!(records.next_blocking()).is_none();
    }

    #[test]
    fn stream_with_parse_rest_handles_valid_finalization() {
        let config = NdjsonConfig::default().with_parse_rest(true);
        let input = stream::once(async { "{\"key\":1,\"value\":2}" });
        let mut records = pin!(from_stream_with_config::<TestStruct, _>(input, config));

        assert_that!(records.next_blocking())
            .to_value()
            .contains_value(TestStruct { key: 1, value: 2 });
        assert_that!(records.next_blocking()).is_none();
    }

    #[test]
    fn stream_with_parse_rest_handles_invalid_finalization() {
        let config = NdjsonConfig::default().with_parse_rest(true);
        let input = stream::once(async { "{\"key\":1," });
        let mut records = pin!(from_stream_with_config::<TestStruct, _>(input, config));

        assert_that!(records.next_blocking()).to_value().is_err();
        assert_that!(records.next_blocking()).is_none();
    }

    #[test]
    fn stream_without_parse_rest_does_not_handle_finalization() {
        let config = NdjsonConfig::default().with_parse_rest(false);
        let input = stream::once(async { "some text" });
        let mut records = pin!(from_stream_with_config::<TestStruct, _>(input, config));

        assert_that!(records.next_blocking()).is_none();
    }

    #[test]
    fn fallible_stream_correctly_forwards_json_error() {
        let input = stream::once(async { Ok::<&str, &str>("\n") });
        let mut fallible_records = pin!(from_fallible_stream::<TestStruct, _>(input));

        assert_that!(fallible_records.next_blocking()).to_value().is_json_error();
    }

    #[test]
    fn fallible_stream_correctly_forwards_input_error() {
        let input = stream::once(async { Err::<&str, &str>("test message") });
        let mut fallible_records = pin!(from_fallible_stream::<TestStruct, _>(input));

        assert_that!(fallible_records.next_blocking())
            .to_value()
            .is_input_error("test message");
    }

    #[test]
    fn fallible_stream_operates_correctly_with_interspersed_errors() {
        let block_results = vec![
            Err("test message 1"),
            Ok("invalid json\n{\"key\":11,\"val"),
            Ok("ue\":22}\n{\"key\":33,\"value\":44}\ninvalid json\n"),
            Err("test message 2"),
            Ok("{\"key\":55,\"value\":66}\n")
        ];
        let fallible_records =
            from_fallible_stream::<TestStruct, _>(stream::iter(block_results));

        let results = tokio_test::block_on(fallible_records.collect::<Vec<_>>());

        assert_that!(results)
            .satisfies_exactly_in_given_order(dyn_assertions!(
                |it| assert_that!(it).is_input_error("test message 1"),
                |it| assert_that!(it).is_json_error(),
                |it| assert_that!(it).contains_value(TestStruct { key: 11, value: 22 }),
                |it| assert_that!(it).contains_value(TestStruct { key: 33, value: 44 }),
                |it| assert_that!(it).is_json_error(),
                |it| assert_that!(it).is_input_error("test message 2"),
                |it| assert_that!(it).contains_value(TestStruct { key: 55, value: 66 })
            ));
    }
}
467}