pub fn from_fallible_stream_with_config<T, S>(
bytes_stream: S,
config: NdjsonConfig,
) -> FallibleNdjsonStream<T, S>
Available on crate feature
stream
only.Expand description
Wraps a Stream of Results of data blocks, i.e. types implementing AsBytes, and offers a Stream implementation over parsed NDJSON-records according to Deserialize. Errors in the wrapped iterator are forwarded via FallibleNdjsonError::InputError, while parsing errors are indicated via FallibleNdjsonError::JsonError. The parser is configured with the given NdjsonConfig.
ยงExample
use futures::stream::{self, StreamExt};
use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
use ndjson_stream::fallible::FallibleNdjsonError;
let data_block_results = vec![
Ok("123\n"),
Err("some error"),
Ok("456\n \n789\n")
];
let data_stream = stream::iter(data_block_results);
let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
let mut ndjson_stream =
ndjson_stream::from_fallible_stream_with_config::<u32, _>(data_stream, config);
tokio_test::block_on(async {
assert!(matches!(ndjson_stream.next().await, Some(Ok(123))));
assert!(matches!(ndjson_stream.next().await,
Some(Err(FallibleNdjsonError::InputError("some error")))));
assert!(matches!(ndjson_stream.next().await, Some(Ok(456))));
assert!(matches!(ndjson_stream.next().await, Some(Ok(789))));
assert!(ndjson_stream.next().await.is_none());
});