wme_stream/ndjson.rs
//! NDJSON streaming utilities.
//!
//! This module provides utilities for parsing Newline-Delimited JSON (NDJSON)
//! from streams. Wikimedia Enterprise APIs return NDJSON in:
//! - Snapshot tar.gz files
//! - Realtime streaming endpoints
//! - Realtime Batch tar.gz files
//!
//! # NDJSON Format
//!
//! Each line is a separate JSON object:
//! ```ndjson
//! {"name":"Article 1","identifier":1}
//! {"name":"Article 2","identifier":2}
//! ```
//!
//! # Example: Parse from File
//!
//! ```rust,no_run
//! use wme_stream::NdjsonStream;
//! use wme_models::Article;
//! use futures::StreamExt;
//! use std::io::BufReader;
//! use std::fs::File;
//! use std::pin::pin;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let input_file = File::open("articles.ndjson")?;
//! let reader = BufReader::new(input_file);
//! let lines = NdjsonStream::from_reader(reader);
//!
//! let articles = NdjsonStream::parse_articles(lines);
//! let mut pinned = pin!(articles);
//! while let Some(article) = pinned.next().await {
//!     println!("{:?}", article);
//! }
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Parse Generic Type
//!
//! ```rust,no_run
//! use wme_stream::NdjsonStream;
//! use futures::StreamExt;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct MyData { id: u64 }
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Assume `lines` is a `Stream<Item = Result<String, std::io::Error>>`, e.g.
//! // let lines = NdjsonStream::from_reader(reader);
//! // `parse` returns a stream of `Result<MyData, StreamError>`, not a single value:
//! // let data = NdjsonStream::parse::<MyData>(lines);
//! # Ok(())
//! # }
//! ```
58
59use std::io::BufRead;
60
61use futures::{Stream, StreamExt};
62use serde::de::DeserializeOwned;
63
64use wme_models::Article;
65
/// NDJSON streaming utilities.
///
/// A zero-sized namespace type: it holds no state and only groups the
/// associated functions that turn line streams into typed values.
/// Parsing works with any type implementing `DeserializeOwned`.
///
/// # Type Safety
///
/// Parsing is type-safe: invalid JSON or a line that does not match the
/// target type is reported as `StreamError::JsonParse` instead of panicking.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonStream;
76
77impl NdjsonStream {
78 /// Parse stream of lines into typed structs.
79 ///
80 /// Each line should contain a valid JSON object matching type `T`.
81 /// Invalid lines result in `StreamError::JsonParse`.
82 ///
83 /// # Type Parameters
84 ///
85 /// * `T` - Target type, must implement `DeserializeOwned`
86 ///
87 /// # Example
88 ///
89 /// ```rust,no_run
90 /// use wme_stream::NdjsonStream;
91 /// use wme_models::Article;
92 /// use futures::StreamExt;
93 /// use std::pin::pin;
94 ///
95 /// # async fn example<S>(lines: S)
96 /// # where
97 /// # S: futures::Stream<Item = Result<String, std::io::Error>>,
98 /// # {
99 /// let articles = NdjsonStream::parse::<Article>(lines);
100 /// let mut pinned = pin!(articles);
101 /// while let Some(result) = pinned.next().await {
102 /// match result {
103 /// Ok(article) => println!("{}", article.name),
104 /// Err(e) => eprintln!("Parse error: {}", e),
105 /// }
106 /// }
107 /// # }
108 /// ```
109 pub fn parse<T>(
110 lines: impl Stream<Item = Result<String, std::io::Error>>,
111 ) -> impl Stream<Item = Result<T, crate::StreamError>>
112 where
113 T: DeserializeOwned,
114 {
115 lines.map(|line| {
116 let line = line.map_err(|e| crate::StreamError::Io(e.to_string()))?;
117 serde_json::from_str::<T>(&line)
118 .map_err(|e| crate::StreamError::JsonParse(e.to_string()))
119 })
120 }
121
122 /// Parse stream specifically for Articles.
123 ///
124 /// Convenience method equivalent to `parse::<Article>(lines)`.
125 ///
126 /// # Example
127 ///
128 /// ```rust,no_run
129 /// use wme_stream::NdjsonStream;
130 /// use futures::StreamExt;
131 /// use std::pin::pin;
132 ///
133 /// # async fn example<S>(lines: S)
134 /// # where
135 /// # S: futures::Stream<Item = Result<String, std::io::Error>>,
136 /// # {
137 /// let articles = NdjsonStream::parse_articles(lines);
138 /// let mut pinned = pin!(articles);
139 /// while let Some(result) = pinned.next().await {
140 /// if let Ok(article) = result {
141 /// println!("{}: {}", article.identifier, article.name);
142 /// }
143 /// }
144 /// # }
145 /// ```
146 pub fn parse_articles(
147 lines: impl Stream<Item = Result<String, std::io::Error>>,
148 ) -> impl Stream<Item = Result<Article, crate::StreamError>> {
149 Self::parse(lines)
150 }
151
152 /// Read lines from a buffered reader.
153 ///
154 /// Creates an async stream from any `BufRead` implementor.
155 /// Useful for reading NDJSON from files or network streams.
156 ///
157 /// # Performance
158 ///
159 /// The reader is read synchronously but yields to the async runtime
160 /// between lines. For large files, this provides good throughput
161 /// without blocking the runtime.
162 ///
163 /// # Example
164 ///
165 /// ```rust,no_run
166 /// use wme_stream::NdjsonStream;
167 /// use std::io::BufReader;
168 /// use std::fs::File;
169 ///
170 /// let file = File::open("data.ndjson").unwrap();
171 /// let reader = BufReader::new(file);
172 /// let lines = NdjsonStream::from_reader(reader);
173 /// ```
174 pub fn from_reader<R: BufRead + Send + 'static>(
175 reader: R,
176 ) -> impl Stream<Item = Result<String, std::io::Error>> {
177 async_stream::try_stream! {
178 for line in reader.lines() {
179 yield line?;
180 }
181 }
182 }
183}
184
185/// Extension trait for NDJSON parsing on streams.
186///
187/// Adds `parse_ndjson()` method to any stream of strings,
188/// allowing fluent parsing without explicit `NdjsonStream` usage.
189///
190/// # Example
191///
192/// ```rust,no_run
193/// use wme_stream::NdjsonExt;
194/// use futures::StreamExt;
195/// use std::pin::pin;
196///
197/// # async fn example<S>(lines: S)
198/// # where
199/// # S: futures::Stream<Item = Result<String, std::io::Error>>,
200/// # {
201/// use wme_models::Article;
202/// let articles = lines.parse_ndjson::<Article>();
203/// let mut pinned = pin!(articles);
204/// while let Some(result) = pinned.next().await {
205/// // Process article
206/// }
207/// # }
208/// ```
209pub trait NdjsonExt {
210 /// Parse lines as NDJSON into typed structs.
211 ///
212 /// See [`NdjsonStream::parse`] for details.
213 fn parse_ndjson<T: DeserializeOwned>(self)
214 -> impl Stream<Item = Result<T, crate::StreamError>>;
215}
216
217impl<S> NdjsonExt for S
218where
219 S: Stream<Item = Result<String, std::io::Error>>,
220{
221 fn parse_ndjson<T: DeserializeOwned>(
222 self,
223 ) -> impl Stream<Item = Result<T, crate::StreamError>> {
224 NdjsonStream::parse(self)
225 }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use serde::Deserialize;
    use std::io::{Cursor, Error};

    /// Minimal record used as the deserialization target in these tests.
    #[derive(Deserialize, Debug, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    /// Builds the expected value for a record.
    fn data(id: u64, name: &str) -> TestData {
        TestData {
            id,
            name: name.to_string(),
        }
    }

    #[tokio::test]
    async fn test_parse_valid_lines() {
        let lines = stream::iter(vec![
            Ok::<_, Error>(r#"{"id":1,"name":"Alice"}"#.to_string()),
            Ok::<_, Error>(r#"{"id":2,"name":"Bob"}"#.to_string()),
        ]);

        let results: Vec<TestData> = NdjsonStream::parse::<TestData>(lines)
            .map(|r| r.unwrap())
            .collect()
            .await;

        assert_eq!(results, vec![data(1, "Alice"), data(2, "Bob")]);
    }

    #[tokio::test]
    async fn test_parse_invalid_json() {
        let lines = stream::iter(vec![
            Ok::<_, Error>(r#"{"id":1,"name":"Alice"}"#.to_string()),
            Ok::<_, Error>("not valid json".to_string()),
            Ok::<_, Error>(r#"{"id":2,"name":"Bob"}"#.to_string()),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        // One output per input: the bad line is reported and parsing continues.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&data(1, "Alice")));
        assert!(matches!(
            &results[1],
            Err(crate::StreamError::JsonParse(_))
        ));
        assert_eq!(results[2].as_ref().ok(), Some(&data(2, "Bob")));
    }

    #[tokio::test]
    async fn test_parse_io_error() {
        let lines = stream::iter(vec![
            Ok::<_, Error>(r#"{"id":1,"name":"Alice"}"#.to_string()),
            Err::<String, _>(Error::other("read failed")),
            Ok::<_, Error>(r#"{"id":2,"name":"Bob"}"#.to_string()),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        // The I/O failure is mapped to `StreamError::Io` with its message kept.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&data(1, "Alice")));
        assert!(matches!(
            &results[1],
            Err(crate::StreamError::Io(msg)) if msg.contains("read failed")
        ));
        assert_eq!(results[2].as_ref().ok(), Some(&data(2, "Bob")));
    }

    #[tokio::test]
    async fn test_ndjson_ext() {
        // Drive the stream so the extension method is exercised, not just compiled.
        let lines = stream::iter(vec![Ok::<_, Error>(
            r#"{"id":7,"name":"Carol"}"#.to_string(),
        )]);

        let parsed: Vec<_> = lines.parse_ndjson::<TestData>().collect().await;

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].as_ref().ok(), Some(&data(7, "Carol")));
    }

    #[tokio::test]
    async fn test_from_reader_yields_each_line() {
        // Covers `\n`, `\r\n`, and a final line without a terminator.
        let reader = Cursor::new("first\nsecond\r\nthird");

        let lines: Vec<String> = NdjsonStream::from_reader(reader)
            .map(|line| line.unwrap())
            .collect()
            .await;

        assert_eq!(lines, ["first", "second", "third"]);
    }

    #[tokio::test]
    async fn test_from_reader_stops_after_first_error() {
        // The second line is not valid UTF-8, so reading it fails.
        let reader = Cursor::new(b"ok\n\xff\xfe\nafter\n".to_vec());

        let results: Vec<_> = NdjsonStream::from_reader(reader).collect().await;

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].as_ref().ok().map(String::as_str), Some("ok"));
        assert!(results[1].is_err());
    }

    #[tokio::test]
    async fn test_from_reader_into_parse() {
        let reader = Cursor::new("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n");

        let results: Vec<TestData> =
            NdjsonStream::parse::<TestData>(NdjsonStream::from_reader(reader))
                .map(|r| r.unwrap())
                .collect()
                .await;

        assert_eq!(results, vec![data(1, "Alice"), data(2, "Bob")]);
    }
}