// wme-stream 0.1.1
//
// Streaming utilities for the Wikimedia Enterprise API
// Documentation
//! NDJSON streaming utilities.
//!
//! This module provides utilities for parsing Newline-Delimited JSON (NDJSON)
//! from streams. Wikimedia Enterprise APIs return NDJSON in:
//! - Snapshot tar.gz files
//! - Realtime streaming endpoints
//! - Realtime Batch tar.gz files
//!
//! # NDJSON Format
//!
//! Each line is a separate JSON object:
//! ```ndjson
//! {"name":"Article 1","identifier":1}
//! {"name":"Article 2","identifier":2}
//! ```
//!
//! # Example: Parse from File
//!
//! ```rust,no_run
//! use wme_stream::NdjsonStream;
//! use wme_models::Article;
//! use futures::StreamExt;
//! use std::io::BufReader;
//! use std::fs::File;
//! use std::pin::pin;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let input_file = File::open("articles.ndjson")?;
//! let reader = BufReader::new(input_file);
//! let lines = NdjsonStream::from_reader(reader);
//!
//! let articles = NdjsonStream::parse_articles(lines);
//! let mut pinned = pin!(articles);
//! while let Some(article) = pinned.next().await {
//!     println!("{:?}", article);
//! }
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Parse Generic Type
//!
//! ```rust,no_run
//! use wme_stream::NdjsonStream;
//! use futures::StreamExt;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize)]
//! struct MyData { id: u64 }
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
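//! // Assume `lines` is a `Stream<Item = Result<String, std::io::Error>>`, e.g.
//! // let lines = NdjsonStream::from_reader(reader);
//! // `parse` returns a stream of `Result<MyData, StreamError>`, not a single value:
//! // let data = NdjsonStream::parse::<MyData>(lines);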
//! # Ok(())
//! # }
//! ```

use std::io::BufRead;

use futures::{Stream, StreamExt};
use serde::de::DeserializeOwned;

use wme_models::Article;

/// Namespace for NDJSON parsing helpers.
///
/// A zero-sized marker type: all functionality is exposed as associated
/// functions that turn line streams into streams of typed values. Any target
/// type implementing `DeserializeOwned` is supported.
///
/// # Type Safety
///
/// Malformed JSON, or JSON that does not fit the requested type, is reported
/// as `StreamError::JsonParse`; parsing does not panic on bad input.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonStream;

impl NdjsonStream {
    /// Deserialize each line of a stream into a value of type `T`.
    ///
    /// Every input item produces exactly one output item, in the same order.
    /// Lines are handed to `serde_json` unmodified, so blank lines are not
    /// skipped.
    ///
    /// # Type Parameters
    ///
    /// * `T` - Target type, must implement `DeserializeOwned`
    ///
    /// # Errors
    ///
    /// Errors are reported per item and do not end the stream; later lines
    /// are still processed.
    ///
    /// * `StreamError::Io` - the input stream yielded an `Err`; carries the
    ///   I/O error's message.
    /// * `StreamError::JsonParse` - the line could not be deserialized into
    ///   `T`; carries the `serde_json` error's message.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use wme_stream::NdjsonStream;
    /// use wme_models::Article;
    /// use futures::StreamExt;
    /// use std::pin::pin;
    ///
    /// # async fn example<S>(lines: S)
    /// # where
    /// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
    /// # {
    /// let articles = NdjsonStream::parse::<Article>(lines);
    /// let mut pinned = pin!(articles);
    /// while let Some(result) = pinned.next().await {
    ///     match result {
    ///         Ok(article) => println!("{}", article.name),
    ///         Err(e) => eprintln!("Parse error: {}", e),
    ///     }
    /// }
    /// # }
    /// ```
    pub fn parse<T>(
        lines: impl Stream<Item = Result<String, std::io::Error>>,
    ) -> impl Stream<Item = Result<T, crate::StreamError>>
    where
        T: DeserializeOwned,
    {
        lines.map(|read| match read {
            Ok(text) => serde_json::from_str::<T>(&text)
                .map_err(|err| crate::StreamError::JsonParse(err.to_string())),
            // Upstream read failures are surfaced as their message text.
            Err(err) => Err(crate::StreamError::Io(err.to_string())),
        })
    }

    /// Deserialize each line of a stream into an [`Article`].
    ///
    /// Shorthand for `parse::<Article>(lines)`; the per-line error behaviour
    /// is the same.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use wme_stream::NdjsonStream;
    /// use futures::StreamExt;
    /// use std::pin::pin;
    ///
    /// # async fn example<S>(lines: S)
    /// # where
    /// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
    /// # {
    /// let articles = NdjsonStream::parse_articles(lines);
    /// let mut pinned = pin!(articles);
    /// while let Some(result) = pinned.next().await {
    ///     if let Ok(article) = result {
    ///         println!("{}: {}", article.identifier, article.name);
    ///     }
    /// }
    /// # }
    /// ```
    pub fn parse_articles(
        lines: impl Stream<Item = Result<String, std::io::Error>>,
    ) -> impl Stream<Item = Result<Article, crate::StreamError>> {
        Self::parse::<Article>(lines)
    }

    /// Turn a buffered reader into a stream of its lines.
    ///
    /// Lines come from [`BufRead::lines`], so each item has its trailing
    /// `\n` (or `\r\n`) removed. The result can be fed straight into
    /// [`NdjsonStream::parse`].
    ///
    /// # Errors
    ///
    /// A read failure is yielded as an `Err` item. It is the last item: the
    /// stream ends after the first error instead of retrying the reader.
    ///
    /// # Blocking
    ///
    /// The reader is synchronous. Each poll of the returned stream performs
    /// a blocking read on the polling thread; nothing in this function moves
    /// the I/O off the async executor. For slow sources, drive the stream
    /// from a context where blocking is acceptable (for example a dedicated
    /// thread or the runtime's blocking pool).
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use wme_stream::NdjsonStream;
    /// use std::io::BufReader;
    /// use std::fs::File;
    ///
    /// let file = File::open("data.ndjson").unwrap();
    /// let reader = BufReader::new(file);
    /// let lines = NdjsonStream::from_reader(reader);
    /// ```
    pub fn from_reader<R: BufRead + Send + 'static>(
        reader: R,
    ) -> impl Stream<Item = Result<String, std::io::Error>> {
        async_stream::try_stream! {
            for next_line in reader.lines() {
                // `?` inside `try_stream!` yields the error and then ends the stream.
                yield next_line?;
            }
        }
    }
}

/// Fluent NDJSON parsing for line streams.
///
/// Implemented for every stream whose items are
/// `Result<String, std::io::Error>`, so such a stream can call
/// `parse_ndjson()` directly instead of passing itself to
/// [`NdjsonStream::parse`].
///
/// # Example
///
/// ```rust,no_run
/// use wme_stream::NdjsonExt;
/// use futures::StreamExt;
/// use std::pin::pin;
///
/// # async fn example<S>(lines: S)
/// # where
/// #     S: futures::Stream<Item = Result<String, std::io::Error>>,
/// # {
/// use wme_models::Article;
/// let articles = lines.parse_ndjson::<Article>();
/// let mut pinned = pin!(articles);
/// while let Some(result) = pinned.next().await {
///     // Process article
/// }
/// # }
/// ```
pub trait NdjsonExt {
    /// Consume the stream and deserialize each line into a `T`.
    ///
    /// See [`NdjsonStream::parse`] for how individual lines are handled.
    fn parse_ndjson<T>(self) -> impl Stream<Item = Result<T, crate::StreamError>>
    where
        T: DeserializeOwned;
}

/// Blanket implementation: any stream of `Result<String, std::io::Error>`
/// gains `parse_ndjson`.
impl<S> NdjsonExt for S
where
    S: Stream<Item = Result<String, std::io::Error>>,
{
    /// Forwards to [`NdjsonStream::parse`] with the same target type.
    fn parse_ndjson<T>(self) -> impl Stream<Item = Result<T, crate::StreamError>>
    where
        T: DeserializeOwned,
    {
        NdjsonStream::parse::<T>(self)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream;
    use serde::Deserialize;
    use std::io::{Cursor, Error};

    /// Minimal record type used as the deserialization target in these tests.
    #[derive(Deserialize, Debug, PartialEq)]
    struct TestData {
        id: u64,
        name: String,
    }

    /// Builds the value a line produced by `json_line(id, name)` should parse to.
    fn record(id: u64, name: &str) -> TestData {
        TestData {
            id,
            name: name.to_string(),
        }
    }

    /// Builds a successful input item holding the JSON encoding of a record.
    fn json_line(id: u64, name: &str) -> Result<String, Error> {
        Ok(format!(r#"{{"id":{id},"name":"{name}"}}"#))
    }

    #[tokio::test]
    async fn test_parse_valid_lines() {
        let lines = stream::iter(vec![json_line(1, "Alice"), json_line(2, "Bob")]);

        let results: Vec<TestData> = NdjsonStream::parse(lines)
            .map(|r| r.expect("well-formed line should parse"))
            .collect()
            .await;

        assert_eq!(results, vec![record(1, "Alice"), record(2, "Bob")]);
    }

    #[tokio::test]
    async fn test_parse_invalid_json() {
        let lines = stream::iter(vec![
            json_line(1, "Alice"),
            Ok("not valid json".to_string()),
            json_line(2, "Bob"),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        // One output per input; the bad line must not end the stream.
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&record(1, "Alice")));
        assert!(
            matches!(&results[1], Err(crate::StreamError::JsonParse(..))),
            "expected StreamError::JsonParse, got {:?}",
            results[1]
        );
        assert_eq!(results[2].as_ref().ok(), Some(&record(2, "Bob")));
    }

    #[tokio::test]
    async fn test_parse_io_error() {
        let lines = stream::iter(vec![
            json_line(1, "Alice"),
            Err(Error::other("read failed")),
            json_line(2, "Bob"),
        ]);

        let results: Vec<_> = NdjsonStream::parse::<TestData>(lines).collect().await;

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().ok(), Some(&record(1, "Alice")));
        // The I/O failure must be reported as `Io` (not `JsonParse`) and keep its message.
        match &results[1] {
            Err(crate::StreamError::Io(msg)) => assert!(msg.contains("read failed")),
            other => panic!("expected StreamError::Io, got {other:?}"),
        }
        assert_eq!(results[2].as_ref().ok(), Some(&record(2, "Bob")));
    }

    #[tokio::test]
    async fn test_ndjson_ext() {
        // Drive the stream so the extension method is exercised, not just type-checked.
        let lines = stream::iter(vec![json_line(7, "Carol")]);

        let parsed: Vec<_> = lines.parse_ndjson::<TestData>().collect().await;

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].as_ref().ok(), Some(&record(7, "Carol")));
    }

    #[tokio::test]
    async fn test_from_reader_yields_lines() {
        // Mixed line endings and no trailing newline on the last line.
        let reader = Cursor::new("first\nsecond\r\nthird");

        let lines: Vec<String> = NdjsonStream::from_reader(reader)
            .map(|line| line.expect("in-memory read should not fail"))
            .collect()
            .await;

        assert_eq!(lines, ["first", "second", "third"]);
    }

    #[tokio::test]
    async fn test_from_reader_then_parse() {
        let reader = Cursor::new("{\"id\":1,\"name\":\"Alice\"}\n{\"id\":2,\"name\":\"Bob\"}\n");

        let results: Vec<TestData> = NdjsonStream::parse(NdjsonStream::from_reader(reader))
            .map(|r| r.expect("well-formed line should parse"))
            .collect()
            .await;

        assert_eq!(results, vec![record(1, "Alice"), record(2, "Bob")]);
    }
}