Streaming utilities for the Wikimedia Enterprise API.
This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality, and efficient streaming parsing.
§Overview
Wikimedia Enterprise provides data in several formats:
- Snapshot API - Complete project dumps as `.tar.gz` archives containing NDJSON files
- Realtime API - Streaming updates via SSE or NDJSON
- Realtime Batch - Hourly bundles of updates as `.tar.gz` archives
This crate helps you process these data sources efficiently.
§Key Features
- NDJSON Streaming - Parse newline-delimited JSON from any source
- Deduplication - Handle duplicate articles automatically (keep latest version)
- Checkpoint/Resume - Save and resume long-running downloads
- Visitor Pattern - Process articles without full materialization
- Progress Tracking - Events and statistics for monitoring
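Underneath these features, NDJSON framing is simple: each non-empty line holds one complete JSON document, so the same splitting logic works for a file, a decompressed tarball entry, or an HTTP body. The following is a std-only sketch of that framing step, not the crate's `NdjsonStream` implementation. `RawRecord` and `ndjson_records` are hypothetical names, and JSON decoding is deliberately left out.

```rust
use std::io::{self, BufRead};

/// Hypothetical record: one non-empty NDJSON line plus its 1-based line number.
#[derive(Debug, PartialEq)]
pub struct RawRecord {
    pub line_no: u64,
    pub json: String,
}

/// Split any buffered reader into NDJSON records, skipping blank lines.
/// Decoding the JSON text is left to the caller.
pub fn ndjson_records<R: BufRead>(reader: R) -> impl Iterator<Item = io::Result<RawRecord>> {
    reader
        .lines() // strips "\n" and "\r\n"
        .enumerate()
        .filter_map(|(i, line)| match line {
            Ok(text) => {
                // Ignore surrounding whitespace and skip empty lines
                let trimmed = text.trim();
                if trimmed.is_empty() {
                    None
                } else {
                    Some(Ok(RawRecord {
                        line_no: i as u64 + 1,
                        json: trimmed.to_string(),
                    }))
                }
            }
            // Surface I/O errors instead of silently dropping them
            Err(e) => Some(Err(e)),
        })
}
```

Keeping the line number alongside each record is what makes a line-based resume offset possible.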
§Modules
- `ndjson` - NDJSON parsing from streams and files
- `dedup` - Duplicate detection and removal
- `checkpoint` - Resume checkpoint save/load
- `visitor` - Visitor trait for low-memory processing
§Example: Parse NDJSON Stream
```rust
use wme_stream::NdjsonStream;
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Open an NDJSON file (for snapshots, extract it from the .tar.gz first)
    let input_file = File::open("articles.ndjson")?;
    let reader = BufReader::new(input_file);
    // Create a stream of raw lines
    let lines = NdjsonStream::from_reader(reader);
    // Parse each line into a wme_models::Article
    let articles = NdjsonStream::parse_articles(lines);
    // The stream must be pinned before calling .next()
    let mut pinned = pin!(articles);
    while let Some(result) = pinned.next().await {
        match result {
            Ok(article) => println!("{}: {}", article.identifier, article.name),
            Err(e) => eprintln!("Error: {}", e),
        }
    }
    Ok(())
}
```
§Example: Deduplicate Stream
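Keeping "the latest version" requires remembering which identifiers have been seen and replacing an earlier copy when a newer one arrives. The strategy `dedup_stream` actually uses is not documented here, so the sketch below is only an illustration: it uses a hypothetical `MiniArticle` type and assumes that a larger `revision` number means a newer version.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified article record (not wme_models::Article).
#[derive(Debug, Clone, PartialEq)]
pub struct MiniArticle {
    pub identifier: u64,
    pub name: String,
    /// Assumption: a larger revision number means a newer version.
    pub revision: u64,
}

/// Keep only the newest revision of each article, preserving the order
/// in which identifiers were first seen.
pub fn dedup_keep_latest<I>(articles: I) -> Vec<MiniArticle>
where
    I: IntoIterator<Item = MiniArticle>,
{
    // identifier -> position of the kept copy in `out`
    let mut index: HashMap<u64, usize> = HashMap::new();
    let mut out: Vec<MiniArticle> = Vec::new();
    for article in articles {
        let id = article.identifier;
        match index.get(&id) {
            Some(&pos) => {
                // Duplicate: replace the stored copy only if this one is newer
                if article.revision > out[pos].revision {
                    out[pos] = article;
                }
            }
            None => {
                // First sighting: remember where it lives
                index.insert(id, out.len());
                out.push(article);
            }
        }
    }
    out
}
```

Memory grows with the number of distinct identifiers, and results are only final once the whole input has been read, because a newer duplicate could appear at any point.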
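```rust
use wme_stream::{dedup_stream, NdjsonStream};
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Build an article stream as in the previous example
    let lines = NdjsonStream::from_reader(BufReader::new(File::open("articles.ndjson")?));
    let stream = NdjsonStream::parse_articles(lines);
    // Wrap the stream to deduplicate it
    let deduplicated = dedup_stream(stream);
    // Pin the stream before iterating
    let mut pinned = pin!(deduplicated);
    while let Some(result) = pinned.next().await {
        // Only the latest version of each article is yielded
        let article = result?;
        println!("{}", article.name);
    }
    Ok(())
}
```
§Example: Save/Resume Checkpoint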
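A checkpoint only needs a few fields: which snapshot and chunk, how far into it, and how many articles were done. The example below loads a `.checkpoint.json` file, so the crate presumably stores JSON. To stay dependency-free, this sketch writes a plain `key=value` file instead. The `Checkpoint` type and its `snapshot_id`, `chunk_id`, and `articles_processed` field names are assumptions; only `line_offset` appears in the original example.

```rust
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for ResumeCheckpoint (field names are guesses).
#[derive(Debug, PartialEq)]
pub struct Checkpoint {
    pub snapshot_id: String,
    pub chunk_id: String,
    pub line_offset: u64,
    pub articles_processed: u64,
}

fn invalid(msg: String) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}

impl Checkpoint {
    /// Write `<dir>/<snapshot_id>.checkpoint` via a temp file plus rename,
    /// so the previous checkpoint is replaced in one step.
    pub fn save(&self, dir: &Path) -> io::Result<PathBuf> {
        fs::create_dir_all(dir)?;
        let path = dir.join(format!("{}.checkpoint", self.snapshot_id));
        let tmp = dir.join(format!("{}.checkpoint.tmp", self.snapshot_id));
        let body = format!(
            "snapshot_id={}\nchunk_id={}\nline_offset={}\narticles_processed={}\n",
            self.snapshot_id, self.chunk_id, self.line_offset, self.articles_processed
        );
        fs::write(&tmp, body)?;
        fs::rename(&tmp, &path)?;
        Ok(path)
    }

    /// Read a file written by `save` back into a Checkpoint.
    pub fn load(path: &Path) -> io::Result<Checkpoint> {
        let text = fs::read_to_string(path)?;
        // Parse simple `key=value` lines into a map
        let fields: HashMap<&str, &str> = text.lines().filter_map(|l| l.split_once('=')).collect();
        let get = |k: &str| -> io::Result<String> {
            fields.get(k).map(|v| v.to_string()).ok_or_else(|| invalid(format!("missing {k}")))
        };
        let num = |k: &str| -> io::Result<u64> {
            get(k)?.parse::<u64>().map_err(|e| invalid(format!("{k}: {e}")))
        };
        Ok(Checkpoint {
            snapshot_id: get("snapshot_id")?,
            chunk_id: get("chunk_id")?,
            line_offset: num("line_offset")?,
            articles_processed: num("articles_processed")?,
        })
    }
}
```

Writing to a temporary file and renaming it means a crash mid-write leaves the previous checkpoint intact, since `rename` replaces the target atomically on POSIX filesystems.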
```rust
use wme_stream::ResumeCheckpoint;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Save progress after processing 1000 articles
    let checkpoint = ResumeCheckpoint::new(
        "enwiki_namespace_0",
        "chunk_0",
        5000, // line offset
        1000, // articles processed
    );
    checkpoint.save("/data/checkpoints/").await?;
    // Later: resume from the saved checkpoint
    let checkpoint = ResumeCheckpoint::load(
        "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
    ).await?;
    println!("Resuming from line {}", checkpoint.line_offset);
    Ok(())
}
```
§wme-stream
§Features
- NDJSON Streaming - Parse newline-delimited JSON from snapshots and realtime feeds
- Deduplication - Handle duplicate articles (< 1% in snapshots) by keeping the latest version
- Checkpoint/Resume - Save and resume long-running downloads from any point
- Visitor Pattern - Process articles without full materialization for low memory usage
- Progress Events - Track download and processing progress
- Statistics - Collect processing metrics (articles/sec, bytes, errors)
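Metrics like articles/sec reduce to a few counters plus elapsed wall-clock time. A minimal sketch, assuming hypothetical field names (the crate's `ProcessingStats` may track different ones):

```rust
use std::time::Duration;

/// Hypothetical counters in the spirit of ProcessingStats.
#[derive(Debug, Default)]
pub struct Stats {
    pub articles: u64,
    pub bytes: u64,
    pub errors: u64,
}

impl Stats {
    /// Record one input line: its size in bytes and whether it parsed.
    pub fn record(&mut self, line_bytes: usize, parsed_ok: bool) {
        self.bytes += line_bytes as u64;
        if parsed_ok {
            self.articles += 1;
        } else {
            self.errors += 1;
        }
    }

    /// Throughput over `elapsed`; returns 0.0 if no time has passed.
    pub fn articles_per_sec(&self, elapsed: Duration) -> f64 {
        let secs = elapsed.as_secs_f64();
        if secs > 0.0 {
            self.articles as f64 / secs
        } else {
            0.0
        }
    }
}
```

In a processing loop, take `Instant::now()` once at the start and pass `start.elapsed()` when reporting.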
§Usage
§Parse NDJSON from a Snapshot
```rust
use wme_stream::NdjsonStream;
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Read from an NDJSON file (e.g. one extracted from a snapshot tarball)
    let file = File::open("articles.ndjson")?;
    let reader = BufReader::new(file);
    let lines = NdjsonStream::from_reader(reader);
    // Parse into Article structs (wme_models::Article)
    let articles = NdjsonStream::parse_articles(lines);
    // Process articles; the stream must be pinned before calling .next()
    let mut pinned = pin!(articles);
    while let Some(result) = pinned.next().await {
        match result {
            Ok(article) => println!("{}: {}", article.identifier, article.name),
            Err(e) => eprintln!("Parse error: {}", e),
        }
    }
    Ok(())
}
```
§Handle Duplicates
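```rust
use wme_stream::{dedup_stream, NdjsonStream};
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Build an article stream from an NDJSON file
    let lines = NdjsonStream::from_reader(BufReader::new(File::open("articles.ndjson")?));
    let stream = NdjsonStream::parse_articles(lines);
    // Wrap stream to deduplicate
    let deduplicated = dedup_stream(stream);
    // Pin the stream before iterating
    let mut pinned = pin!(deduplicated);
    while let Some(result) = pinned.next().await {
        // Only the latest version of each article
        let _article = result?;
    }
    Ok(())
}
```
§Checkpoint and Resume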
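Resuming from a `line_offset` usually means reopening the same input and discarding the lines already handled; a gzip stream inside a tarball cannot seek, so skipping by reading is the portable approach. A std-only sketch with a hypothetical `resume_from` helper, assuming the offset counts every line consumed, blank lines included:

```rust
use std::io::{self, BufRead};

/// Skip the first `line_offset` lines of `reader`, pass the rest to `process`,
/// and return the total number of lines consumed (the next offset to save).
pub fn resume_from<R: BufRead>(
    reader: R,
    line_offset: u64,
    mut process: impl FnMut(&str),
) -> io::Result<u64> {
    let mut offset = 0u64;
    for line in reader.lines() {
        let line = line?;
        offset += 1;
        if offset <= line_offset {
            continue; // already handled before the checkpoint was written
        }
        process(&line);
    }
    Ok(offset)
}
```

The returned value is the offset to store in the next checkpoint.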
```rust
use wme_stream::ResumeCheckpoint;

async fn run() -> Result<(), Box<dyn std::error::Error>> {
    // Save progress
    let checkpoint = ResumeCheckpoint::new(
        "enwiki_namespace_0",
        "chunk_0",
        5000, // line_offset
        1000, // articles_processed
    );
    checkpoint.save("/data/checkpoints/").await?;
    // Resume later
    let checkpoint =
        ResumeCheckpoint::load("/data/checkpoints/enwiki_namespace_0.checkpoint.json").await?;
    // Continue from checkpoint.line_offset
    println!("Resuming from line {}", checkpoint.line_offset);
    Ok(())
}
```
§Visitor Pattern (Low Memory)
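In a visitor design, the parser calls back into your type for each element as it encounters it, so an implementation that only updates counters never holds a whole article in memory. The sketch below shows that control flow with std only. `Event`, `MiniVisitor`, `drive`, and `LinkCounter` are hypothetical and much smaller than the crate's `ArticleVisitor`, which the example that follows implements.

```rust
/// Hypothetical parse events a streaming parser could emit.
pub enum Event<'a> {
    ArticleStart { id: u64, name: &'a str },
    Link { text: &'a str, url: &'a str },
    Category { name: &'a str },
    ArticleEnd,
}

/// A cut-down visitor trait; default no-op methods let implementors
/// override only the callbacks they need.
pub trait MiniVisitor {
    fn article_start(&mut self, _id: u64, _name: &str) {}
    fn link(&mut self, _text: &str, _url: &str) {}
    fn category(&mut self, _name: &str) {}
    fn article_end(&mut self) {}
}

/// Dispatch each event to the visitor as it arrives; nothing is buffered.
pub fn drive<'a, V: MiniVisitor>(events: impl IntoIterator<Item = Event<'a>>, v: &mut V) {
    for ev in events {
        match ev {
            Event::ArticleStart { id, name } => v.article_start(id, name),
            Event::Link { text, url } => v.link(text, url),
            Event::Category { name } => v.category(name),
            Event::ArticleEnd => v.article_end(),
        }
    }
}

/// Keeps only counters, so memory use stays constant regardless of article size.
#[derive(Default)]
pub struct LinkCounter {
    pub articles: u64,
    pub links: u64,
}

impl MiniVisitor for LinkCounter {
    fn article_start(&mut self, _id: u64, _name: &str) {
        self.articles += 1;
    }
    fn link(&mut self, _text: &str, _url: &str) {
        self.links += 1;
    }
}
```

Whether `ArticleVisitor` also provides default methods is not stated here; `NoOpVisitor` in the re-exports suggests a do-nothing implementation exists either way.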
```rust
use wme_stream::ArticleVisitor;
use serde_json::Value;

/// Counts articles without keeping any article data in memory.
struct MyVisitor {
    article_count: u64,
}

impl ArticleVisitor for MyVisitor {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.article_count += 1;
    }
    fn visit_category(&mut self, _name: &str, _url: &str) {
        // Process category without storing the full article
    }
    fn visit_link(&mut self, _text: &str, _url: &str) {
        // Process link
    }
    fn visit_infobox(&mut self, _name: &str, _value: &str) {
        // Process infobox field
    }
    fn visit_reference(&mut self, _id: &str, _ref_type: &str, _metadata: &Value) {
        // Process reference
    }
    fn visit_article_end(&mut self) {}
}
```
§Architecture
§Modules
- `ndjson` - NDJSON parsing utilities
- `dedup` - Duplicate detection and removal
- `checkpoint` - Resume checkpoint functionality
- `visitor` - Visitor trait for streaming processing
§Key Types
- `NdjsonStream` - Parse NDJSON streams
- `dedup_stream()` - Wrap streams to deduplicate articles
- `ResumeCheckpoint` - Save/load processing state
- `ArticleVisitor` - Trait for the visitor pattern
- `ProcessingStats` - Track processing metrics
- `SnapshotEvent` - Progress events for UI updates
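Progress events decouple the worker doing the downloading and parsing from whatever displays progress. The actual `SnapshotEvent` variants are not listed here, so the sketch below uses a hypothetical `ProgressEvent` enum and shows a common std pattern: a worker thread sends events over an `mpsc` channel and the receiving side renders them.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical progress events; the real SnapshotEvent variants may differ.
#[derive(Debug, Clone, PartialEq)]
pub enum ProgressEvent {
    Downloading { chunk: String, bytes_done: u64, bytes_total: u64 },
    ArticlesProcessed { count: u64 },
    ChunkComplete { chunk: String },
}

/// Render one event as a status line for a UI or log.
pub fn describe(ev: &ProgressEvent) -> String {
    match ev {
        ProgressEvent::Downloading { chunk, bytes_done, bytes_total } => {
            // Guard against an unknown (zero) total size
            let pct = if *bytes_total == 0 {
                0.0
            } else {
                *bytes_done as f64 * 100.0 / *bytes_total as f64
            };
            format!("{chunk}: {pct:.1}% downloaded")
        }
        ProgressEvent::ArticlesProcessed { count } => format!("{count} articles processed"),
        ProgressEvent::ChunkComplete { chunk } => format!("{chunk}: done"),
    }
}

/// A worker thread emits events over a channel; the caller renders them in order.
pub fn run_with_progress(events: Vec<ProgressEvent>) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let worker = thread::spawn(move || {
        for ev in events {
            tx.send(ev).expect("receiver dropped");
        }
        // `tx` is dropped here, which ends the `rx.iter()` loop below
    });
    let lines: Vec<String> = rx.iter().map(|ev| describe(&ev)).collect();
    worker.join().expect("worker panicked");
    lines
}
```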
§Snapshot Processing
Snapshots are delivered as .tar.gz files containing NDJSON:
- Download chunks (use parallel downloads for speed)
- Decompress tarball
- Stream NDJSON lines
- Deduplicate articles
- Process or store results
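The first step suggests parallel chunk downloads. Below is a std-only sketch using scoped threads (Rust 1.63+), where `download_all` is a hypothetical helper and `fetch` a stand-in for the real HTTP call. Each chunk downloads on its own thread, and results come back in the original chunk order so the later steps can process them sequentially.

```rust
use std::thread;

/// Download every chunk concurrently and return the bodies in chunk order.
/// `fetch` is a placeholder for a real download function.
pub fn download_all<F>(chunks: &[String], fetch: F) -> Result<Vec<Vec<u8>>, String>
where
    F: Fn(&str) -> Result<Vec<u8>, String> + Sync,
{
    let fetch = &fetch;
    thread::scope(|s| {
        // Start one worker per chunk; all run concurrently inside the scope
        let handles: Vec<_> = chunks
            .iter()
            .map(move |name| s.spawn(move || fetch(name.as_str())))
            .collect();
        // Join in spawn order so results line up with `chunks`;
        // the first failure (or panicked thread) becomes the overall error
        handles
            .into_iter()
            .map(|h| {
                h.join()
                    .map_err(|_| "download thread panicked".to_string())
                    .and_then(|r| r)
            })
            .collect::<Result<Vec<_>, String>>()
    })
}
```

One thread per chunk keeps the sketch short; a real client would likely cap the number of concurrent downloads.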
§Realtime Streaming
The Realtime API provides SSE or NDJSON streams:
- Connect to streaming endpoint
- Receive events (update, delete, visibility-change)
- Track partition/offset for resume
- Process events incrementally
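Tracking partition/offset for resume can be as simple as remembering the highest offset processed per partition. A sketch with a hypothetical `OffsetTracker`, assuming offsets increase monotonically within each partition. It also drops events that are redelivered after a reconnect:

```rust
use std::collections::BTreeMap;

/// Tracks the highest offset processed per partition.
#[derive(Debug, Default)]
pub struct OffsetTracker {
    last_seen: BTreeMap<u32, u64>,
}

impl OffsetTracker {
    /// Record an event. Returns false if it is at or behind what was already
    /// processed (e.g. a redelivery), so the caller can skip it.
    pub fn observe(&mut self, partition: u32, offset: u64) -> bool {
        let is_new = self
            .last_seen
            .get(&partition)
            .map_or(true, |&last| offset > last);
        if is_new {
            self.last_seen.insert(partition, offset);
        }
        is_new
    }

    /// Per-partition position to resume from: one past the last processed offset.
    pub fn resume_positions(&self) -> Vec<(u32, u64)> {
        self.last_seen.iter().map(|(&p, &o)| (p, o + 1)).collect()
    }
}
```

Persisting `resume_positions()` with the rest of the checkpoint data gives per-partition starting points for the next connection; how those positions are passed to the Realtime API is outside this sketch.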
§License
This project is licensed under the terms of the workspace license.
§Re-exports
- `pub use checkpoint::ResumeCheckpoint;`
- `pub use dedup::dedup_collect;`
- `pub use dedup::dedup_stream;`
- `pub use ndjson::NdjsonExt;`
- `pub use ndjson::NdjsonStream;`
- `pub use visitor::ArticleVisitor;`
- `pub use visitor::NoOpVisitor;`
- `pub use visitor::StatsVisitor;`
§Modules
- `checkpoint` - Checkpoint and resume functionality.
- `dedup` - Deduplication utilities.
- `ndjson` - NDJSON streaming utilities.
- `visitor` - Visitor trait for article processing.
§Structs
- `ProcessingStats` - Processing statistics.
§Enums
- `SnapshotEvent` - Progress event for snapshot processing.
- `StreamError` - Errors that can occur during streaming.
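The variants of `StreamError` are not listed above. For illustration only, here is a hand-written error enum of the kind commonly used in streaming code; `SketchError` and its variants are hypothetical.

```rust
use std::fmt;
use std::io;

/// Hypothetical streaming error; the crate's StreamError may be organised differently.
#[derive(Debug)]
pub enum SketchError {
    /// Failure reading the underlying source.
    Io(io::Error),
    /// A line that is not valid JSON, with its 1-based line number.
    Parse { line: u64, message: String },
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::Io(e) => write!(f, "I/O error: {e}"),
            SketchError::Parse { line, message } => {
                write!(f, "parse error on line {line}: {message}")
            }
        }
    }
}

impl std::error::Error for SketchError {
    // Expose the underlying I/O error as the cause, if any
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            SketchError::Io(e) => Some(e),
            SketchError::Parse { .. } => None,
        }
    }
}

/// Lets `?` convert I/O failures into SketchError automatically.
impl From<io::Error> for SketchError {
    fn from(e: io::Error) -> Self {
        SketchError::Io(e)
    }
}
```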