wme-stream 0.1.1

Streaming utilities for the Wikimedia Enterprise API
Documentation
//! Deduplication utilities.
//!
//! Snapshots and Realtime Batch files may contain duplicate articles (< 1%).
//! This module provides utilities to deduplicate streams, keeping only the
//! latest version of each article based on `version.identifier`.
//!
//! # Deduplication Strategy
//!
//! When duplicates are encountered, the article with the highest
//! `version.identifier` is kept. This is the correct approach because:
//! - `version.identifier` is monotonically increasing
//! - Higher values indicate more recent revisions
//! - Per the API documentation: "When encountering more than one instance
//!   of an article, use the one with the highest version.identifier number"
//!
//! # Streaming vs Collecting
//!
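//! Two approaches are provided, and they differ in which duplicate survives:
//! - `dedup_stream()` - Streaming deduplication. Yields the first occurrence
//!   of each article as soon as it arrives; later duplicates are dropped, even
//!   when they carry a higher `version.identifier`.
//! - `dedup_collect()` - Collect all then deduplicate. Returns, for each
//!   article ID, the article with the highest `version.identifier`.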
//!
//! # Example: Streaming Deduplication
//!
//! ```rust,no_run
//! use wme_stream::dedup_stream;
//! use futures::StreamExt;
//!
//! # async fn example<S>(stream: S)
//! # where
//! #     S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
//! # {
//! // Wrap any stream of Articles
//! let mut deduplicated = dedup_stream(stream);
//!
//! while let Some(result) = deduplicated.next().await {
//!     match result {
//!         Ok(article) => println!("Latest: {}", article.name),
//!         Err(e) => eprintln!("Error: {}", e),
//!     }
//! }
//! # }
//! ```
//!
//! # Example: Collect and Deduplicate
//!
//! ```rust,no_run
//! use wme_stream::dedup_collect;
//!
//! # async fn example(articles: Vec<wme_models::Article>) {
//! // Deduplicate a collected Vec
//! let unique: Vec<wme_models::Article> = dedup_collect(articles).await;
//! println!("Unique articles: {}", unique.len());
//! # }
//! ```

use std::collections::HashMap;
use std::pin::Pin;

use futures::Stream;

use wme_models::Article;

/// Deduplicate a stream of articles by article identifier.
///
/// The first article seen for each `identifier` is yielded immediately; every
/// later article with the same `identifier` is dropped. Errors from the inner
/// stream are passed through unchanged.
///
/// Because items are emitted as they arrive, a duplicate that shows up later
/// with a higher `version.identifier` can not replace the article that was
/// already yielded. If you need the highest version of every article, collect
/// the stream and use `dedup_collect()` instead.
///
/// # Memory Usage
///
/// Only bookkeeping is retained, never the articles themselves: one
/// `HashMap` entry per unique article ID, holding
/// - 8 bytes for the key (article `identifier`, `u64`)
/// - 8 bytes for the value (highest `version.identifier` seen, `u64`)
///
/// The map grows for as long as the returned stream is alive.
///
/// # Example
///
/// ```rust,no_run
/// use wme_stream::{dedup_stream, StreamError};
/// use wme_models::Article;
/// use futures::{Stream, StreamExt};
///
/// # async fn example<S>(stream: S)
/// # where
/// #     S: Stream<Item = Result<Article, StreamError>>,
/// # {
/// let mut deduplicated = dedup_stream(stream);
///
/// while let Some(result) = deduplicated.next().await {
///     if let Ok(article) = result {
///         println!("{} (version {})", article.name, article.version.identifier);
///     }
/// }
/// # }
/// ```
///
/// # How It Works
///
/// 1. Poll the inner stream for the next item
/// 2. Errors, end-of-stream and `Pending` are forwarded as-is
/// 3. For an article whose ID is new: record its version and yield it
/// 4. For an article whose ID was seen before: raise the recorded version if
///    this one is higher, drop the article, and poll again
pub fn dedup_stream<S>(stream: S) -> impl Stream<Item = Result<Article, crate::StreamError>>
where
    S: Stream<Item = Result<Article, crate::StreamError>>,
{
    use std::collections::hash_map::Entry;
    use std::task::{Context, Poll};

    struct DedupStream<S> {
        /// Boxed so the wrapper is `Unpin` regardless of `S`.
        inner: Pin<Box<S>>,
        /// Highest `version.identifier` observed so far, keyed by article ID.
        seen: HashMap<u64, u64>,
    }

    impl<S: Stream<Item = Result<Article, crate::StreamError>>> Stream for DedupStream<S> {
        type Item = Result<Article, crate::StreamError>;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Both fields are `Unpin` (the inner stream lives behind a `Box`),
            // so a plain mutable reference to the wrapper is available.
            let this = self.get_mut();

            loop {
                let article = match this.inner.as_mut().poll_next(cx) {
                    Poll::Ready(Some(Ok(article))) => article,
                    // Err, end-of-stream and Pending are forwarded untouched.
                    other => return other,
                };

                let version = article.version.identifier;
                match this.seen.entry(article.identifier) {
                    Entry::Occupied(mut recorded) => {
                        // Duplicate: remember the newest version, but drop the
                        // article because this ID has already been yielded.
                        if version > *recorded.get() {
                            recorded.insert(version);
                        }
                    }
                    Entry::Vacant(slot) => {
                        // First occurrence: record it and hand the article on
                        // without cloning it.
                        slot.insert(version);
                        return Poll::Ready(Some(Ok(article)));
                    }
                }
            }
        }
    }

    DedupStream {
        inner: Box::pin(stream),
        seen: HashMap::new(),
    }
}

/// Deduplicate an already-collected list of articles.
///
/// For every article `identifier` the article with the highest
/// `version.identifier` is returned. When two duplicates carry the same
/// version, the one that appeared first in the input is kept.
///
/// The output is ordered by the position at which each article ID first
/// appeared in `articles`, so the result is deterministic for a given input.
///
/// The function is `async` for API compatibility only; it never awaits.
///
/// # When to Use
///
/// - When you already have a `Vec<Article>`
/// - When memory is not a concern
/// - When you need all unique articles at once
///
/// # Example
///
/// ```rust,no_run
/// use wme_stream::dedup_collect;
///
/// # async fn example(articles: Vec<wme_models::Article>) {
/// let unique = dedup_collect(articles).await;
/// println!("{} unique articles", unique.len());
/// # }
/// ```
///
/// # Memory Usage
///
/// Peak memory is approximately:
/// - The input Vec's allocation (released once the loop finishes)
/// - Plus the output Vec
/// - Plus an index map with one `(u64, usize)` entry per unique article
///
/// For large datasets, use `dedup_stream()` instead.
pub async fn dedup_collect(articles: Vec<Article>) -> Vec<Article> {
    use std::collections::hash_map::Entry;

    // `unique` holds the winners in first-seen order; `position` maps an
    // article ID to the index of its current winner inside `unique`.
    let mut unique: Vec<Article> = Vec::with_capacity(articles.len());
    let mut position: HashMap<u64, usize> = HashMap::with_capacity(articles.len());

    for article in articles {
        match position.entry(article.identifier) {
            Entry::Occupied(found) => {
                // Indices are only ever recorded right before the matching
                // push below, so they always point at a live element.
                let kept = &mut unique[*found.get()];
                if article.version.identifier > kept.version.identifier {
                    *kept = article;
                }
            }
            Entry::Vacant(slot) => {
                slot.insert(unique.len());
                unique.push(article);
            }
        }
    }

    unique
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use futures::StreamExt;
    use wme_models::version::{Editor, Version};

    /// Builds a minimal article whose only test-relevant fields are the
    /// article `identifier` and the `version.identifier`.
    fn create_article(id: u64, version_id: u64) -> Article {
        Article {
            identifier: id,
            name: format!("Article {}", id),
            url: format!("https://en.wikipedia.org/wiki/{}", id),
            abstract_text: None,
            description: None,
            date_modified: Utc::now(),
            date_previously_modified: None,
            in_language: wme_models::Language {
                identifier: Some("en".to_string()),
                name: Some("English".to_string()),
                alternate_name: None,
                direction: Some("ltr".to_string()),
            },
            is_part_of: wme_models::ProjectRef {
                identifier: "enwiki".to_string(),
                url: Some("https://en.wikipedia.org".to_string()),
            },
            namespace: Some(wme_models::Namespace {
                identifier: 0,
                name: Some("".to_string()),
                description: Some("Main".to_string()),
            }),
            main_entity: None,
            additional_entities: None,
            categories: None,
            templates: None,
            redirects: None,
            version: Version {
                identifier: version_id,
                editor: Some(Editor {
                    identifier: Some(1),
                    name: Some("Test".to_string()),
                    is_bot: Some(false),
                    is_anonymous: Some(false),
                    date_started: Some(Utc::now()),
                    edit_count: Some(1),
                    groups: Some(vec!["user".to_string()]),
                    is_admin: Some(false),
                    is_patroller: Some(false),
                    has_advanced_rights: Some(false),
                }),
                comment: None,
                tags: None,
                has_tag_needs_citation: None,
                is_minor_edit: None,
                is_flagged_stable: None,
                is_breaking_news: None,
                noindex: None,
                number_of_characters: None,
                size: None,
                maintenance_tags: None,
                scores: None,
            },
            previous_version: None,
            watchers_count: None,
            protection: None,
            visibility: None,
            image: None,
            license: vec![],
            article_body: None,
            event: None,
            has_parts: None,
        }
    }

    /// Reduces stream output to `(article id, version id)` pairs in yield
    /// order. Panics if any item is an `Err`.
    fn id_version_pairs(results: &[Result<Article, crate::StreamError>]) -> Vec<(u64, u64)> {
        results
            .iter()
            .map(|r| {
                let article = r.as_ref().unwrap();
                (article.identifier, article.version.identifier)
            })
            .collect()
    }

    #[tokio::test]
    async fn test_dedup_stream_empty() {
        use futures::stream;

        let stream = stream::iter(Vec::<Result<Article, crate::StreamError>>::new());

        let results: Vec<_> = dedup_stream(stream).collect().await;
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_dedup_stream_no_duplicates() {
        use futures::stream;

        let articles = vec![
            Ok(create_article(1, 100)),
            Ok(create_article(2, 200)),
            Ok(create_article(3, 300)),
        ];
        let stream = stream::iter(articles);

        let results: Vec<_> = dedup_stream(stream).collect().await;
        assert_eq!(results.len(), 3);
    }

    #[tokio::test]
    async fn test_dedup_stream_with_duplicates() {
        use futures::stream;

        let articles = vec![
            Ok(create_article(1, 100)),
            Ok(create_article(2, 200)),
            Ok(create_article(1, 150)), // Duplicate with higher version
            Ok(create_article(3, 300)),
            Ok(create_article(2, 250)), // Duplicate with higher version
        ];
        let stream = stream::iter(articles);

        let results: Vec<_> = dedup_stream(stream).collect().await;

        // Exactly the first occurrence of each article, in arrival order;
        // the later higher-version duplicates are dropped, not re-yielded.
        assert_eq!(
            id_version_pairs(&results),
            vec![(1, 100), (2, 200), (3, 300)]
        );
    }

    #[tokio::test]
    async fn test_dedup_stream_preserves_latest_version() {
        use futures::stream;

        // First occurrence is yielded, but later duplicates with higher
        // version are tracked (though not re-yielded)
        let articles = vec![
            Ok(create_article(1, 100)),
            Ok(create_article(1, 200)), // Higher version
            Ok(create_article(1, 50)),  // Lower version (should be ignored)
        ];
        let stream = stream::iter(articles);

        let results: Vec<_> = dedup_stream(stream).collect().await;

        // Only one unique article, and it is the first occurrence (version 100)
        assert_eq!(id_version_pairs(&results), vec![(1, 100)]);
    }

    #[tokio::test]
    async fn test_dedup_collect_empty() {
        let unique = dedup_collect(Vec::new()).await;
        assert!(unique.is_empty());
    }

    #[tokio::test]
    async fn test_dedup_collect() {
        let articles = vec![
            create_article(1, 100),
            create_article(2, 200),
            create_article(1, 150), // Duplicate with higher version
            create_article(3, 300),
        ];

        let unique = dedup_collect(articles).await;
        assert_eq!(unique.len(), 3);

        // Article 1 should have version 150 (highest)
        let article_1 = unique.iter().find(|a| a.identifier == 1).unwrap();
        assert_eq!(article_1.version.identifier, 150);
    }

    #[tokio::test]
    async fn test_dedup_collect_prefers_higher_version() {
        let articles = vec![
            create_article(1, 100),
            create_article(1, 200), // Higher version
            create_article(1, 150), // Middle version
        ];

        let unique = dedup_collect(articles).await;
        assert_eq!(unique.len(), 1);
        assert_eq!(unique[0].version.identifier, 200); // Highest version kept
    }
}