ndjson_stream/
lib.rs

//! `ndjson-stream` offers a variety of NDJSON-parsers which accept data in chunks and process these
//! chunks before reading further, thus enabling streaming-style use. The crate offers a low-level
//! interface in the [engine] module and higher-level interfaces for synchronous and asynchronous
//! NDJSON processing, which are available at the crate root (see for example [from_iter]). The
//! parsers accept any input which implements the [AsBytes](as_bytes::AsBytes) trait, which is
//! implemented for the most common data containers in core Rust and the standard library (e.g.
//! `Vec<u8>` or `&str`).
7//!
8//! `ndjson-stream` uses the [serde_json] crate to parse individual lines. Hence, the output type of
9//! the parser must implement [Deserialize](serde::Deserialize).
10//!
11//! # High-level example
12//!
//! As an example, we will look at the iterator interface. The most basic form can be instantiated
//! with [from_iter]. We provide an iterator over data blocks implementing
//! [AsBytes](as_bytes::AsBytes) and obtain an iterator over parsed NDJSON-records. More precisely,
//! each item is a `Result`, which contains a JSON-error if the corresponding line is not valid JSON
//! or does not match the schema of the output type.
//!
//! The example below demonstrates both the happy path and a parsing error.
20//!
21//! ```
22//! use serde::Deserialize;
23//!
24//! #[derive(Debug, Deserialize, Eq, PartialEq)]
25//! struct Person {
26//!     name: String,
27//!     age: u16
28//! }
29//!
30//! let data_blocks = vec![
31//!     "{\"name\":\"Alice\",\"age\":25}\n",
32//!     "{\"this\":\"is\",\"not\":\"valid\"}\n",
33//!     "{\"name\":\"Bob\",",
34//!     "\"age\":35}\r\n"
35//! ];
36//! let mut ndjson_iter = ndjson_stream::from_iter::<Person, _>(data_blocks);
37//!
38//! assert_eq!(ndjson_iter.next().unwrap().unwrap(), Person { name: "Alice".into(), age: 25 });
39//! assert!(ndjson_iter.next().unwrap().is_err());
40//! assert_eq!(ndjson_iter.next().unwrap().unwrap(), Person { name: "Bob".into(), age: 35 });
41//! assert!(ndjson_iter.next().is_none());
42//! ```
43//!
44//! # Configuration
45//!
46//! There are several configuration options available to control how the parser behaves in certain
47//! situations. See [NdjsonConfig](config::NdjsonConfig) for more details. To specify the config
48//! used for a parser, use the appropriate `_with_config`-suffixed function.
49//!
//! In the example below, we use [from_iter_with_config] to construct an NDJSON-iterator which
//! ignores blank lines. That is, for a line consisting only of whitespace, it produces no output
//! record at all, rather than attempting to parse the line and yielding a JSON-error.
53//!
54//! ```
55//! use ndjson_stream::config::{EmptyLineHandling, NdjsonConfig};
56//! use serde::Deserialize;
57//!
58//! #[derive(Debug, Deserialize, Eq, PartialEq)]
59//! struct Person {
60//!     name: String,
61//!     age: u16
62//! }
63//!
64//! let data_blocks = vec![
65//!     "{\"name\":\"Charlie\",\"age\":32}\n",
66//!     "   \n",
67//!     "{\"name\":\"Dolores\",\"age\":41}\n"
68//! ];
69//! let config = NdjsonConfig::default().with_empty_line_handling(EmptyLineHandling::IgnoreBlank);
70//! let mut ndjson_iter = ndjson_stream::from_iter_with_config::<Person, _>(data_blocks, config);
71//!
72//! assert_eq!(ndjson_iter.next().unwrap().unwrap(), Person { name: "Charlie".into(), age: 32 });
73//! assert_eq!(ndjson_iter.next().unwrap().unwrap(), Person { name: "Dolores".into(), age: 41 });
74//! assert!(ndjson_iter.next().is_none());
75//! ```
76//!
77//! # Fallibility
78//!
//! In addition to the ordinary interfaces, there is a fallible counterpart for each one. "Fallible"
//! in this context refers to the input data source, which in the examples above is the iterator
//! over `data_blocks`.
82//!
83//! Fallible parsers accept as input a data source which returns [Result]s with some error type and
84//! forward potential read errors to the user. See
85//! [FallibleNdjsonError](fallible::FallibleNdjsonError) for more details on how the error is
86//! communicated.
87//!
88//! In the example below, we use a fallible iterator.
89//!
90//! ```
91//! use ndjson_stream::fallible::FallibleNdjsonError;
92//! use serde::Deserialize;
93//!
94//! #[derive(Debug, Deserialize, Eq, PartialEq)]
95//! struct Person {
96//!     name: String,
97//!     age: u16
98//! }
99//!
100//! let data_blocks = vec![
101//!     Ok("{\"name\":\"Eve\",\"age\":22}\n"),
102//!     Err("error"),
103//!     Ok("{\"invalid\":json}\n")
104//! ];
105//! let mut ndjson_iter = ndjson_stream::from_fallible_iter::<Person, _>(data_blocks);
106//!
107//! assert_eq!(ndjson_iter.next().unwrap().unwrap(), Person { name: "Eve".into(), age: 22 });
108//! assert!(matches!(ndjson_iter.next(), Some(Err(FallibleNdjsonError::InputError("error")))));
109//! assert!(matches!(ndjson_iter.next(), Some(Err(FallibleNdjsonError::JsonError(_)))));
110//! assert!(ndjson_iter.next().is_none());
111//! ```
112//!
113//! # Crate features
114//!
115//! * `bytes`: Offers an implementation of [AsBytes](as_bytes::AsBytes) on [Bytes](bytes::Bytes) and
116//! [BytesMut](bytes::BytesMut) from the [bytes] crate.
117//! * `iter` (default): Enables the [Iterator]-style interface ([from_iter] family).
118//! * `stream`: Enables the [Stream](futures::Stream)-style interface from the `futures` crate
119//! ([from_stream] family).
120
121#![warn(missing_docs)]
122
123#![cfg_attr(doc_cfg, feature(doc_cfg))]
124
125pub mod as_bytes;
126pub mod config;
127pub mod driver;
128pub mod engine;
129pub mod fallible;
130
131#[cfg(feature = "iter")]
132#[cfg_attr(doc_cfg, doc(cfg(feature = "iter")))]
133pub use crate::driver::iter::from_iter;
134
135#[cfg(feature = "iter")]
136#[cfg_attr(doc_cfg, doc(cfg(feature = "iter")))]
137pub use crate::driver::iter::from_iter_with_config;
138
139#[cfg(feature = "iter")]
140#[cfg_attr(doc_cfg, doc(cfg(feature = "iter")))]
141pub use crate::driver::iter::from_fallible_iter;
142
143#[cfg(feature = "iter")]
144#[cfg_attr(doc_cfg, doc(cfg(feature = "iter")))]
145pub use crate::driver::iter::from_fallible_iter_with_config;
146
147#[cfg(feature = "stream")]
148#[cfg_attr(doc_cfg, doc(cfg(feature = "stream")))]
149pub use crate::driver::stream::from_stream;
150
151#[cfg(feature = "stream")]
152#[cfg_attr(doc_cfg, doc(cfg(feature = "stream")))]
153pub use crate::driver::stream::from_stream_with_config;
154
155#[cfg(feature = "stream")]
156#[cfg_attr(doc_cfg, doc(cfg(feature = "stream")))]
157pub use crate::driver::stream::from_fallible_stream;
158
159#[cfg(feature = "stream")]
160#[cfg_attr(doc_cfg, doc(cfg(feature = "stream")))]
161pub use crate::driver::stream::from_fallible_stream_with_config;
162
#[cfg(test)]
pub(crate) mod test_util {
    //! Shared helpers for the crate's unit tests: a sample record type, an iterator that yields a
    //! single item and then panics, and `kernal` assertions for [FallibleNdjsonResult]s.

    use std::borrow::Borrow;
    use std::fmt::Debug;

    use kernal::{AssertThat, AssertThatData, Failure};

    use serde::Deserialize;
    use crate::fallible::{FallibleNdjsonError, FallibleNdjsonResult};

    /// A small record type with two numeric fields, deserialized from NDJSON lines in tests.
    #[derive(Debug, Deserialize, Eq, PartialEq)]
    pub(crate) struct TestStruct {
        pub(crate) key: u64,
        pub(crate) value: u64
    }

    /// An iterator which yields `data` on the first call to [Iterator::next] and panics with
    /// "iterator queried twice" on any later call (or already on the first call if `data` is
    /// `None`). It never returns `None`.
    ///
    /// NOTE(review): presumably used to check that a consumer does not pull more input than it
    /// needs - confirm against the call sites.
    pub(crate) struct SingleThenPanicIter {
        /// The single item to yield; taken (left as `None`) by the first call to `next`.
        pub(crate) data: Option<String>
    }

    impl Iterator for SingleThenPanicIter {
        type Item = String;

        fn next(&mut self) -> Option<String> {
            Some(self.data.take().expect("iterator queried twice"))
        }
    }

    /// Additional `kernal` assertions for subjects that borrow as a [FallibleNdjsonResult].
    pub(crate) trait FallibleNdjsonResultAssertions<V, E> {

        /// Asserts that the result is `Err(FallibleNdjsonError::JsonError(_))`, failing the
        /// assertion otherwise. Returns the assertion for further chaining.
        fn is_json_error(self) -> Self;

        /// Asserts that the result is `Err(FallibleNdjsonError::InputError(actual))` with
        /// `actual == expected`, failing the assertion otherwise. Returns the assertion for
        /// further chaining.
        fn is_input_error(self, expected: impl Borrow<E>) -> Self;
    }

    impl<V, E, R> FallibleNdjsonResultAssertions<V, E> for AssertThat<R>
    where
        E: Debug + PartialEq,
        R: Borrow<FallibleNdjsonResult<V, E>>
    {
        fn is_json_error(self) -> Self {
            let result: &FallibleNdjsonResult<V, E> = self.data().borrow();

            // Classify first; the `Failure` is only built when the assertion actually fails.
            let mismatch = match result {
                Err(FallibleNdjsonError::JsonError(_)) => None,
                Err(FallibleNdjsonError::InputError(_)) => Some("was an input error"),
                Ok(_) => Some("was Ok")
            };

            match mismatch {
                None => self,
                Some(but_it) => Failure::new(&self)
                    .expected_it("to contain a JSON-error")
                    .but_it(but_it)
                    .fail()
            }
        }

        fn is_input_error(self, expected: impl Borrow<E>) -> Self {
            let expected: &E = expected.borrow();
            let result: &FallibleNdjsonResult<V, E> = self.data().borrow();

            // Classify first so that no message is formatted (and `Debug` is not invoked) on the
            // success path.
            let mismatch = match result {
                Err(FallibleNdjsonError::InputError(actual)) if actual == expected => None,
                Err(FallibleNdjsonError::InputError(actual)) =>
                    Some(format!("contained the input error <{:?}>", actual)),
                Err(FallibleNdjsonError::JsonError(_)) => Some("was a JSON-error".to_owned()),
                Ok(_) => Some("was Ok".to_owned())
            };

            match mismatch {
                None => self,
                Some(but_it) => Failure::new(&self)
                    .expected_it(format!("to contain the input error <{:?}>", expected))
                    .but_it(but_it)
                    .fail()
            }
        }
    }
}