Skip to main content

wme_stream/
ndjson.rs

1//! NDJSON streaming utilities.
2//!
3//! This module provides utilities for parsing Newline-Delimited JSON (NDJSON)
4//! from streams. Wikimedia Enterprise APIs return NDJSON in:
5//! - Snapshot tar.gz files
6//! - Realtime streaming endpoints
7//! - Realtime Batch tar.gz files
8//!
9//! # NDJSON Format
10//!
11//! Each line is a separate JSON object:
12//! ```ndjson
13//! {"name":"Article 1","identifier":1}
14//! {"name":"Article 2","identifier":2}
15//! ```
16//!
17//! # Example: Parse from File
18//!
19//! ```rust,no_run
20//! use wme_stream::NdjsonStream;
21//! use wme_models::Article;
22//! use futures::StreamExt;
23//! use std::io::BufReader;
24//! use std::fs::File;
25//! use std::pin::pin;
26//!
27//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
28//! let input_file = File::open("articles.ndjson")?;
29//! let reader = BufReader::new(input_file);
30//! let lines = NdjsonStream::from_reader(reader);
31//!
32//! let articles = NdjsonStream::parse_articles(lines);
33//! let mut pinned = pin!(articles);
34//! while let Some(article) = pinned.next().await {
35//!     println!("{:?}", article);
36//! }
37//! # Ok(())
38//! # }
39//! ```
40//!
41//! # Example: Parse Generic Type
42//!
43//! ```rust,no_run
44//! use wme_stream::NdjsonStream;
45//! use futures::StreamExt;
46//! use serde::Deserialize;
47//!
48//! #[derive(Deserialize)]
49//! struct MyData { id: u64 }
50//!
51//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Assume `lines` is a `Stream<Item = Result<String, std::io::Error>>`, e.g.:
//! // let lines = NdjsonStream::from_reader(reader);
//! // `parse` returns a stream of `Result<MyData, StreamError>`, not a single value:
//! // let data = NdjsonStream::parse::<MyData>(lines);
55//! # Ok(())
56//! # }
57//! ```
58
59use std::io::BufRead;
60
61use futures::{Stream, StreamExt};
62use serde::de::DeserializeOwned;
63
64use wme_models::Article;
65
/// NDJSON streaming utilities.
///
/// A stateless unit type that namespaces the NDJSON helpers in this module;
/// all functionality is exposed as associated functions such as
/// [`NdjsonStream::parse`] and [`NdjsonStream::from_reader`].
/// Parsing works with any type implementing `DeserializeOwned`.
///
/// # Type Safety
///
/// Parsing is type-safe - invalid JSON or incompatible types result in
/// `StreamError::JsonParse` rather than panics.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonStream;
76
77impl NdjsonStream {
78    /// Parse stream of lines into typed structs.
79    ///
80    /// Each line should contain a valid JSON object matching type `T`.
81    /// Invalid lines result in `StreamError::JsonParse`.
82    ///
83    /// # Type Parameters
84    ///
85    /// * `T` - Target type, must implement `DeserializeOwned`
86    ///
87    /// # Example
88    ///
89    /// ```rust,no_run
90    /// use wme_stream::NdjsonStream;
91    /// use wme_models::Article;
92    /// use futures::StreamExt;
93    /// use std::pin::pin;
94    ///
95    /// # async fn example<S>(lines: S)
96    /// # where
97    /// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
98    /// # {
99    /// let articles = NdjsonStream::parse::<Article>(lines);
100    /// let mut pinned = pin!(articles);
101    /// while let Some(result) = pinned.next().await {
102    ///     match result {
103    ///         Ok(article) => println!("{}", article.name),
104    ///         Err(e) => eprintln!("Parse error: {}", e),
105    ///     }
106    /// }
107    /// # }
108    /// ```
109    pub fn parse<T>(
110        lines: impl Stream<Item = Result<String, std::io::Error>>,
111    ) -> impl Stream<Item = Result<T, crate::StreamError>>
112    where
113        T: DeserializeOwned,
114    {
115        lines.map(|line| {
116            let line = line.map_err(|e| crate::StreamError::Io(e.to_string()))?;
117            serde_json::from_str::<T>(&line)
118                .map_err(|e| crate::StreamError::JsonParse(e.to_string()))
119        })
120    }
121
122    /// Parse stream specifically for Articles.
123    ///
124    /// Convenience method equivalent to `parse::<Article>(lines)`.
125    ///
126    /// # Example
127    ///
128    /// ```rust,no_run
129    /// use wme_stream::NdjsonStream;
130    /// use futures::StreamExt;
131    /// use std::pin::pin;
132    ///
133    /// # async fn example<S>(lines: S)
134    /// # where
135    /// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
136    /// # {
137    /// let articles = NdjsonStream::parse_articles(lines);
138    /// let mut pinned = pin!(articles);
139    /// while let Some(result) = pinned.next().await {
140    ///     if let Ok(article) = result {
141    ///         println!("{}: {}", article.identifier, article.name);
142    ///     }
143    /// }
144    /// # }
145    /// ```
146    pub fn parse_articles(
147        lines: impl Stream<Item = Result<String, std::io::Error>>,
148    ) -> impl Stream<Item = Result<Article, crate::StreamError>> {
149        Self::parse(lines)
150    }
151
152    /// Read lines from a buffered reader.
153    ///
154    /// Creates an async stream from any `BufRead` implementor.
155    /// Useful for reading NDJSON from files or network streams.
156    ///
157    /// # Performance
158    ///
159    /// The reader is read synchronously but yields to the async runtime
160    /// between lines. For large files, this provides good throughput
161    /// without blocking the runtime.
162    ///
163    /// # Example
164    ///
165    /// ```rust,no_run
166    /// use wme_stream::NdjsonStream;
167    /// use std::io::BufReader;
168    /// use std::fs::File;
169    ///
170    /// let file = File::open("data.ndjson").unwrap();
171    /// let reader = BufReader::new(file);
172    /// let lines = NdjsonStream::from_reader(reader);
173    /// ```
174    pub fn from_reader<R: BufRead + Send + 'static>(
175        reader: R,
176    ) -> impl Stream<Item = Result<String, std::io::Error>> {
177        async_stream::try_stream! {
178            for line in reader.lines() {
179                yield line?;
180            }
181        }
182    }
183}
184
185/// Extension trait for NDJSON parsing on streams.
186///
187/// Adds `parse_ndjson()` method to any stream of strings,
188/// allowing fluent parsing without explicit `NdjsonStream` usage.
189///
190/// # Example
191///
192/// ```rust,no_run
193/// use wme_stream::NdjsonExt;
194/// use futures::StreamExt;
195/// use std::pin::pin;
196///
197/// # async fn example<S>(lines: S)
198/// # where
199/// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
200/// # {
201/// use wme_models::Article;
202/// let articles = lines.parse_ndjson::<Article>();
203/// let mut pinned = pin!(articles);
204/// while let Some(result) = pinned.next().await {
205///     // Process article
206/// }
207/// # }
208/// ```
209pub trait NdjsonExt {
210    /// Parse lines as NDJSON into typed structs.
211    ///
212    /// See [`NdjsonStream::parse`] for details.
213    fn parse_ndjson<T: DeserializeOwned>(self)
214        -> impl Stream<Item = Result<T, crate::StreamError>>;
215}
216
217impl<S> NdjsonExt for S
218where
219    S: Stream<Item = Result<String, std::io::Error>>,
220{
221    fn parse_ndjson<T: DeserializeOwned>(
222        self,
223    ) -> impl Stream<Item = Result<T, crate::StreamError>> {
224        NdjsonStream::parse(self)
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use serde::Deserialize;
    use std::io::{Cursor, Error};

    /// Minimal record type used as the deserialization target in these tests.
    #[derive(Deserialize, Debug, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    /// Wraps a JSON snippet as a successfully read line.
    fn ok_line(json: &str) -> Result<String, Error> {
        Ok(json.to_string())
    }

    /// Builds the `TestData` value a line is expected to deserialize into.
    fn record(id: u64, name: &str) -> TestData {
        TestData {
            id,
            name: name.to_string(),
        }
    }

    #[tokio::test]
    async fn test_parse_valid_lines() {
        let lines = stream::iter(vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ]);

        let results: Vec<TestData> = NdjsonStream::parse(lines)
            .map(|item| item.expect("both lines are valid JSON"))
            .collect()
            .await;

        assert_eq!(results, vec![record(1, "Alice"), record(2, "Bob")]);
    }

    #[tokio::test]
    async fn test_parse_invalid_json() {
        let lines = stream::iter(vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            ok_line("not valid json"),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        // A bad line is reported in place and does not end the stream.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&record(1, "Alice")));
        assert!(
            matches!(&results[1], Err(crate::StreamError::JsonParse(_))),
            "expected JsonParse error, got {:?}",
            results[1]
        );
        assert_eq!(results[2].as_ref().ok(), Some(&record(2, "Bob")));
    }

    #[tokio::test]
    async fn test_parse_io_error() {
        let lines = stream::iter(vec![
            ok_line(r#"{"id":1,"name":"Alice"}"#),
            Err::<String, _>(Error::other("read failed")),
            ok_line(r#"{"id":2,"name":"Bob"}"#),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        // An upstream read failure maps to `Io`, not `JsonParse`, and later
        // lines are still parsed.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&record(1, "Alice")));
        assert!(
            matches!(&results[1], Err(crate::StreamError::Io(_))),
            "expected Io error, got {:?}",
            results[1]
        );
        assert_eq!(results[2].as_ref().ok(), Some(&record(2, "Bob")));
    }

    #[tokio::test]
    async fn test_ndjson_ext() {
        // Drive the stream so the extension method is exercised end to end,
        // not merely type-checked.
        let lines = stream::iter(vec![ok_line(r#"{"id":7,"name":"Carol"}"#)]);

        let parsed: Vec<_> = lines.parse_ndjson::<TestData>().collect().await;

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].as_ref().ok(), Some(&record(7, "Carol")));
    }

    #[tokio::test]
    async fn test_from_reader_yields_lines() {
        let input = Cursor::new("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n");

        let lines: Vec<String> = NdjsonStream::from_reader(input)
            .map(|line| line.expect("in-memory UTF-8 input reads cleanly"))
            .collect()
            .await;

        assert_eq!(
            lines,
            vec![r#"{"id":1,"name":"Alice"}"#, r#"{"id":2,"name":"Bob"}"#]
        );
    }

    #[tokio::test]
    async fn test_from_reader_stops_after_error() {
        // The second line is not valid UTF-8, so reading it fails.
        let input = Cursor::new(vec![b'a', b'\n', 0xff, b'\n', b'b', b'\n']);

        let items: Vec<Result<String, Error>> =
            NdjsonStream::from_reader(input).collect().await;

        // The error is the last item; the trailing "b" line is never read.
        assert_eq!(items.len(), 2);
        assert_eq!(items[0].as_ref().ok().map(String::as_str), Some("a"));
        assert!(items[1].is_err());
    }
}