1use std::convert::Infallible;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::{ready, Stream};
6use pin_project_lite::pin_project;
7use serde::Deserialize;
8use serde_json::error::Result as JsonResult;
9
10use crate::as_bytes::AsBytes;
11use crate::config::NdjsonConfig;
12use crate::engine::NdjsonEngine;
13use crate::fallible::{FallibleNdjsonError, FallibleNdjsonResult};
14
15pin_project! {
16 struct MapResultInfallible<S> {
17 #[pin]
18 inner: S
19 }
20}
21
22impl<S> MapResultInfallible<S> {
23 fn new(inner: S) -> MapResultInfallible<S> {
24 MapResultInfallible {
25 inner
26 }
27 }
28}
29
30impl<S> Stream for MapResultInfallible<S>
31where
32 S: Stream
33{
34 type Item = Result<S::Item, Infallible>;
35
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 let mut this = self.project();
38 let res = ready!(this.inner.as_mut().poll_next(cx));
39 Poll::Ready(res.map(Ok))
40 }
41
42 fn size_hint(&self) -> (usize, Option<usize>) {
43 self.inner.size_hint()
44 }
45}
46
47pin_project! {
48 pub struct NdjsonStream<T, S> {
52 #[pin]
53 inner: FallibleNdjsonStream<T, MapResultInfallible<S>>
54 }
55}
56
57impl<T, S> NdjsonStream<T, S> {
58
59 pub fn new(bytes_stream: S) -> NdjsonStream<T, S> {
61 let inner_bytes_stream = MapResultInfallible::new(bytes_stream);
62
63 NdjsonStream {
64 inner: FallibleNdjsonStream::new(inner_bytes_stream)
65 }
66 }
67
68 pub fn with_config(bytes_stream: S, config: NdjsonConfig) -> NdjsonStream<T, S> {
71 let inner_bytes_stream = MapResultInfallible::new(bytes_stream);
72
73 NdjsonStream {
74 inner: FallibleNdjsonStream::with_config(inner_bytes_stream, config)
75 }
76 }
77}
78
79impl<T, S> Stream for NdjsonStream<T, S>
80where
81 for<'deserialize> T: Deserialize<'deserialize>,
82 S: Stream,
83 S::Item: AsBytes
84{
85 type Item = JsonResult<T>;
86
87 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<JsonResult<T>>> {
88 let mut this = self.project();
89 let inner_next = ready!(this.inner.as_mut().poll_next(cx));
90 let next = inner_next
91 .map(|fallible_res| fallible_res.map_err(FallibleNdjsonError::unwrap_json_error));
92
93 Poll::Ready(next)
94 }
95}
96
97pub fn from_stream<T, S>(bytes_stream: S) -> NdjsonStream<T, S> {
121 NdjsonStream::new(bytes_stream)
122}
123
124pub fn from_stream_with_config<T, S>(bytes_stream: S, config: NdjsonConfig) -> NdjsonStream<T, S> {
151 NdjsonStream::with_config(bytes_stream, config)
152}
153
154pin_project! {
155 pub struct FallibleNdjsonStream<T, S> {
160 engine: NdjsonEngine<T>,
161 #[pin]
162 bytes_stream: S
163 }
164}
165
166impl<T, S> FallibleNdjsonStream<T, S> {
167
168 pub fn new(bytes_stream: S) -> FallibleNdjsonStream<T, S> {
171 FallibleNdjsonStream {
172 engine: NdjsonEngine::new(),
173 bytes_stream
174 }
175 }
176
177 pub fn with_config(bytes_stream: S, config: NdjsonConfig) -> FallibleNdjsonStream<T, S> {
180 FallibleNdjsonStream {
181 engine: NdjsonEngine::with_config(config),
182 bytes_stream
183 }
184 }
185}
186
187impl<T, S, B, E> Stream for FallibleNdjsonStream<T, S>
188where
189 for<'deserialize> T: Deserialize<'deserialize>,
190 S: Stream<Item = Result<B, E>>,
191 B: AsBytes
192{
193 type Item = FallibleNdjsonResult<T, E>;
194
195 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196 let mut this = self.project();
197
198 loop {
199 if let Some(result) = this.engine.pop() {
200 return match result {
201 Ok(value) => Poll::Ready(Some(Ok(value))),
202 Err(error) => Poll::Ready(Some(Err(FallibleNdjsonError::JsonError(error))))
203 }
204 }
205
206 let bytes = ready!(this.bytes_stream.as_mut().poll_next(cx));
207
208 match bytes {
209 Some(Ok(bytes)) => this.engine.input(bytes),
210 Some(Err(error)) =>
211 return Poll::Ready(Some(Err(FallibleNdjsonError::InputError(error)))),
212 None => {
213 this.engine.finalize();
214 return Poll::Ready(this.engine.pop()
215 .map(|res| res.map_err(FallibleNdjsonError::JsonError)));
216 }
217 }
218 }
219 }
220}
221
222pub fn from_fallible_stream<T, S>(bytes_stream: S) -> FallibleNdjsonStream<T, S> {
253 FallibleNdjsonStream::new(bytes_stream)
254}
255
256pub fn from_fallible_stream_with_config<T, S>(bytes_stream: S, config: NdjsonConfig)
290 -> FallibleNdjsonStream<T, S> {
291 FallibleNdjsonStream::with_config(bytes_stream, config)
292}
293
294#[cfg(test)]
295mod tests {
296 use std::pin::pin;
297
298 use futures::{Stream, StreamExt};
299 use futures::stream;
300 use kernal::prelude::*;
301 use tokio_test::assert_pending;
302 use tokio_test::task;
303
304 use crate::as_bytes::AsBytes;
305 use crate::config::EmptyLineHandling;
306 use crate::test_util::{FallibleNdjsonResultAssertions, SingleThenPanicIter, TestStruct};
307
308 use super::*;
309
310 async fn collect<S>(bytes_stream: S) -> Vec<JsonResult<TestStruct>>
311 where
312 S: Stream,
313 S::Item: AsBytes
314 {
315 from_stream(bytes_stream).collect().await
316 }
317
318 trait NextBlocking : Stream {
319 fn next_blocking(&mut self) -> Option<Self::Item>;
320 }
321
322 impl<S: Stream + Unpin> NextBlocking for S {
323 fn next_blocking(&mut self) -> Option<Self::Item> {
324 tokio_test::block_on(self.next())
325 }
326 }
327
328 #[test]
329 fn pending_stream_results_in_pending_item() {
330 let mut ndjson_stream = from_stream::<TestStruct, _>(stream::pending::<&str>());
331
332 let mut next = task::spawn(ndjson_stream.next());
333
334 assert_pending!(next.poll());
335 }
336
337 #[test]
338 fn empty_stream_results_in_empty_results() {
339 let collected = tokio_test::block_on(collect::<_>(stream::empty::<&[u8]>()));
340
341 assert_that!(collected).is_empty();
342 }
343
344 #[test]
345 fn singleton_iter_with_single_json_line() {
346 let stream = stream::once(async { "{\"key\":1,\"value\":2}\n" });
347 let collected = tokio_test::block_on(collect(stream));
348
349 assert_that!(collected).satisfies_exactly_in_given_order(dyn_assertions!(
350 |it| assert_that!(it).contains_value(TestStruct { key: 1, value: 2 })
351 ));
352 }
353
354 #[test]
355 fn multiple_iter_items_compose_single_json_line() {
356 let stream = stream::iter(vec!["{\"key\"", ":12,", "\"value\"", ":34}\n"]);
357 let collected = tokio_test::block_on(collect(stream));
358
359 assert_that!(collected).satisfies_exactly_in_given_order(dyn_assertions!(
360 |it| assert_that!(it).contains_value(TestStruct { key: 12, value: 34 })
361 ));
362 }
363
364 #[test]
365 fn wrapped_stream_not_queried_while_sufficient_data_remains() {
366 let iter = SingleThenPanicIter {
367 data: Some("{\"key\":0,\"value\":0}\n{\"key\":0,\"value\":0}\n".to_owned())
368 };
369 let mut ndjson_stream = from_stream::<TestStruct, _>(stream::iter(iter));
370
371 assert_that!(ndjson_stream.next_blocking()).is_some();
372 assert_that!(ndjson_stream.next_blocking()).is_some();
373 }
374
375 #[test]
376 fn stream_with_parse_always_config_respects_config() {
377 let stream = stream::once(async { "{\"key\":1,\"value\":2}\n\n" });
378 let config = NdjsonConfig::default()
379 .with_empty_line_handling(EmptyLineHandling::ParseAlways);
380 let mut ndjson_stream = pin!(from_stream_with_config::<TestStruct, _>(stream, config));
381
382 assert_that!(ndjson_stream.next_blocking()).to_value().is_ok();
383 assert_that!(ndjson_stream.next_blocking()).to_value().is_err();
384 }
385
386 #[test]
387 fn stream_with_ignore_empty_config_respects_config() {
388 let stream = stream::once(async { "{\"key\":1,\"value\":2}\n\n" });
389 let config = NdjsonConfig::default()
390 .with_empty_line_handling(EmptyLineHandling::IgnoreEmpty);
391 let mut ndjson_stream = pin!(from_stream_with_config::<TestStruct, _>(stream, config));
392
393 assert_that!(ndjson_stream.next_blocking()).to_value().is_ok();
394 assert_that!(ndjson_stream.next_blocking()).is_none();
395 }
396
397 #[test]
398 fn stream_with_parse_rest_handles_valid_finalization() {
399 let stream = stream::once(async { "{\"key\":1,\"value\":2}" });
400 let config = NdjsonConfig::default().with_parse_rest(true);
401 let mut ndjson_stream = pin!(from_stream_with_config::<TestStruct, _>(stream, config));
402
403 assert_that!(ndjson_stream.next_blocking()).to_value().contains_value(TestStruct { key: 1, value: 2 });
404 assert_that!(ndjson_stream.next_blocking()).is_none();
405 }
406
407 #[test]
408 fn stream_with_parse_rest_handles_invalid_finalization() {
409 let stream = stream::once(async { "{\"key\":1," });
410 let config = NdjsonConfig::default().with_parse_rest(true);
411 let mut ndjson_stream = pin!(from_stream_with_config::<TestStruct, _>(stream, config));
412
413 assert_that!(ndjson_stream.next_blocking()).to_value().is_err();
414 assert_that!(ndjson_stream.next_blocking()).is_none();
415 }
416
417 #[test]
418 fn stream_without_parse_rest_does_not_handle_finalization() {
419 let stream = stream::once(async { "some text" });
420 let config = NdjsonConfig::default().with_parse_rest(false);
421 let mut ndjson_stream = pin!(from_stream_with_config::<TestStruct, _>(stream, config));
422
423 assert_that!(ndjson_stream.next_blocking()).is_none();
424 }
425
426 #[test]
427 fn fallible_stream_correctly_forwards_json_error() {
428 let stream = stream::once(async { Ok::<&str, &str>("\n") });
429 let mut fallible_ndjson_stream = pin!(from_fallible_stream::<TestStruct, _>(stream));
430
431 assert_that!(fallible_ndjson_stream.next_blocking()).to_value().is_json_error();
432 }
433
434 #[test]
435 fn fallible_stream_correctly_forwards_input_error() {
436 let stream = stream::once(async { Err::<&str, &str>("test message") });
437 let mut fallible_ndjson_stream = pin!(from_fallible_stream::<TestStruct, _>(stream));
438
439 assert_that!(fallible_ndjson_stream.next_blocking())
440 .to_value()
441 .is_input_error("test message");
442 }
443
444 #[test]
445 fn fallible_stream_operates_correctly_with_interspersed_errors() {
446 let data_vec = vec![
447 Err("test message 1"),
448 Ok("invalid json\n{\"key\":11,\"val"),
449 Ok("ue\":22}\n{\"key\":33,\"value\":44}\ninvalid json\n"),
450 Err("test message 2"),
451 Ok("{\"key\":55,\"value\":66}\n")
452 ];
453 let data_stream = stream::iter(data_vec);
454 let fallible_ndjson_stream = from_fallible_stream::<TestStruct, _>(data_stream);
455
456 assert_that!(tokio_test::block_on(fallible_ndjson_stream.collect::<Vec<_>>()))
457 .satisfies_exactly_in_given_order(dyn_assertions!(
458 |it| assert_that!(it).is_input_error("test message 1"),
459 |it| assert_that!(it).is_json_error(),
460 |it| assert_that!(it).contains_value(TestStruct { key: 11, value: 22 }),
461 |it| assert_that!(it).contains_value(TestStruct { key: 33, value: 44 }),
462 |it| assert_that!(it).is_json_error(),
463 |it| assert_that!(it).is_input_error("test message 2"),
464 |it| assert_that!(it).contains_value(TestStruct { key: 55, value: 66 })
465 ));
466 }
467}