use std::collections::HashMap;
use std::pin::Pin;
use futures::Stream;
use wme_models::Article;
/// Wraps `stream` so that each article identifier is yielded at most once.
///
/// The first `Ok` article observed for a given `identifier` is passed through
/// unchanged. Any later `Ok` article carrying an identifier that has already
/// been yielded is dropped, even when its version identifier is higher: an
/// item that was already handed to the consumer cannot be retracted. Use
/// [`dedup_collect`] when the highest version per identifier is required.
///
/// `Err` items, end-of-stream and `Poll::Pending` from the inner stream are
/// forwarded as-is; errors do not touch the dedup state.
///
/// Only the identifier and the highest version identifier seen so far are
/// retained per article, so no article is cloned or kept alive by the adapter.
pub fn dedup_stream<S>(stream: S) -> impl Stream<Item = Result<Article, crate::StreamError>>
where
    S: Stream<Item = Result<Article, crate::StreamError>>,
{
    use std::collections::hash_map::Entry;
    use std::task::{Context, Poll};

    struct DedupStream<S> {
        /// Inner stream, boxed so it can be polled without requiring `S: Unpin`.
        inner: Pin<Box<S>>,
        /// Highest version identifier observed so far, keyed by article identifier.
        seen: HashMap<u64, u64>,
    }

    impl<S: Stream<Item = Result<Article, crate::StreamError>>> Stream for DedupStream<S> {
        type Item = Result<Article, crate::StreamError>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            // Both fields are `Unpin`, so a plain mutable reference is available
            // and lets the two fields be borrowed independently below.
            let this = &mut *self;
            loop {
                match this.inner.as_mut().poll_next(cx) {
                    Poll::Ready(Some(Ok(article))) => {
                        let version = article.version.identifier;
                        match this.seen.entry(article.identifier) {
                            Entry::Occupied(mut slot) => {
                                // Duplicate: remember the newest version, emit
                                // nothing, and poll the inner stream again.
                                if version > *slot.get() {
                                    slot.insert(version);
                                }
                            }
                            Entry::Vacant(slot) => {
                                slot.insert(version);
                                return Poll::Ready(Some(Ok(article)));
                            }
                        }
                    }
                    Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                    Poll::Ready(None) => return Poll::Ready(None),
                    Poll::Pending => return Poll::Pending,
                }
            }
        }
    }

    DedupStream {
        inner: Box::pin(stream),
        seen: HashMap::new(),
    }
}
pub async fn dedup_collect(articles: Vec<Article>) -> Vec<Article> {
let mut map: HashMap<u64, Article> = HashMap::new();
for article in articles {
let id = article.identifier;
match map.get(&id) {
Some(existing) => {
if article.version.identifier > existing.version.identifier {
map.insert(id, article);
}
}
None => {
map.insert(id, article);
}
}
}
map.into_values().collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use futures::StreamExt;
    use wme_models::version::{Editor, Version};

    /// Builds a test article with the given article identifier and version
    /// identifier; every other field is fixed filler data.
    fn create_article(id: u64, version_id: u64) -> Article {
        let date_modified = Utc::now();

        let in_language = wme_models::Language {
            identifier: Some("en".to_string()),
            name: Some("English".to_string()),
            alternate_name: None,
            direction: Some("ltr".to_string()),
        };

        let is_part_of = wme_models::ProjectRef {
            identifier: "enwiki".to_string(),
            url: Some("https://en.wikipedia.org".to_string()),
        };

        let namespace = wme_models::Namespace {
            identifier: 0,
            name: Some("".to_string()),
            description: Some("Main".to_string()),
        };

        let editor = Editor {
            identifier: Some(1),
            name: Some("Test".to_string()),
            is_bot: Some(false),
            is_anonymous: Some(false),
            date_started: Some(Utc::now()),
            edit_count: Some(1),
            groups: Some(vec!["user".to_string()]),
            is_admin: Some(false),
            is_patroller: Some(false),
            has_advanced_rights: Some(false),
        };

        let version = Version {
            identifier: version_id,
            editor: Some(editor),
            comment: None,
            tags: None,
            has_tag_needs_citation: None,
            is_minor_edit: None,
            is_flagged_stable: None,
            is_breaking_news: None,
            noindex: None,
            number_of_characters: None,
            size: None,
            maintenance_tags: None,
            scores: None,
        };

        Article {
            identifier: id,
            name: format!("Article {}", id),
            url: format!("https://en.wikipedia.org/wiki/{}", id),
            abstract_text: None,
            description: None,
            date_modified,
            date_previously_modified: None,
            in_language,
            is_part_of,
            namespace: Some(namespace),
            main_entity: None,
            additional_entities: None,
            categories: None,
            templates: None,
            redirects: None,
            version,
            previous_version: None,
            watchers_count: None,
            protection: None,
            visibility: None,
            image: None,
            license: vec![],
            article_body: None,
            event: None,
            has_parts: None,
        }
    }

    /// Turns `(identifier, version)` pairs into articles, preserving order.
    fn articles_from(pairs: &[(u64, u64)]) -> Vec<Article> {
        pairs
            .iter()
            .map(|&(id, version_id)| create_article(id, version_id))
            .collect()
    }

    /// Sends the given `(identifier, version)` pairs through `dedup_stream`
    /// as `Ok` items and collects everything the adapter yields.
    async fn run_dedup_stream(pairs: &[(u64, u64)]) -> Vec<Result<Article, crate::StreamError>> {
        let input: Vec<Result<Article, crate::StreamError>> =
            articles_from(pairs).into_iter().map(Ok).collect();
        dedup_stream(futures::stream::iter(input)).collect().await
    }

    #[tokio::test]
    async fn test_dedup_stream_no_duplicates() {
        // All identifiers are distinct, so nothing is dropped.
        let results = run_dedup_stream(&[(1, 100), (2, 200), (3, 300)]).await;
        assert_eq!(results.len(), 3);
    }

    #[tokio::test]
    async fn test_dedup_stream_with_duplicates() {
        // Identifiers 1 and 2 each appear twice; the repeats must be dropped.
        let results =
            run_dedup_stream(&[(1, 100), (2, 200), (1, 150), (3, 300), (2, 250)]).await;
        assert_eq!(results.len(), 3);

        let ids: Vec<u64> = results
            .iter()
            .map(|r| r.as_ref().unwrap().identifier)
            .collect();
        for expected in [1u64, 2, 3] {
            assert!(ids.contains(&expected));
        }
    }

    #[tokio::test]
    async fn test_dedup_stream_preserves_latest_version() {
        // Same identifier three times. The stream yields the first occurrence
        // (version 100); the later 200 and 50 are dropped.
        let results = run_dedup_stream(&[(1, 100), (1, 200), (1, 50)]).await;
        assert_eq!(results.len(), 1);

        let article = results[0].as_ref().unwrap();
        assert_eq!(article.identifier, 1);
        assert_eq!(article.version.identifier, 100);
    }

    #[tokio::test]
    async fn test_dedup_collect() {
        // Identifier 1 appears with versions 100 and 150; the higher one wins.
        let input = articles_from(&[(1, 100), (2, 200), (1, 150), (3, 300)]);
        let unique = dedup_collect(input).await;
        assert_eq!(unique.len(), 3);

        let article_1 = unique.iter().find(|a| a.identifier == 1).unwrap();
        assert_eq!(article_1.version.identifier, 150);
    }

    #[tokio::test]
    async fn test_dedup_collect_prefers_higher_version() {
        // The highest version (200) is kept even though a lower one follows it.
        let input = articles_from(&[(1, 100), (1, 200), (1, 150)]);
        let unique = dedup_collect(input).await;
        assert_eq!(unique.len(), 1);
        assert_eq!(unique[0].version.identifier, 200);
    }
}