# wme-stream
Streaming utilities for the Wikimedia Enterprise API.
This crate processes NDJSON streams from the Wikimedia Enterprise Snapshot and Realtime
APIs. It handles deduplication, checkpoint/resume, and memory-efficient streaming parsing.
## Features
- **NDJSON Streaming** - Parse newline-delimited JSON from snapshots and realtime feeds
- **Deduplication** - Handle duplicate articles (< 1% in snapshots) by keeping the latest version
- **Checkpoint/Resume** - Save and resume long-running downloads from any point
- **Visitor Pattern** - Process articles without full materialization for low memory usage
- **Progress Events** - Track download and processing progress
- **Statistics** - Collect processing metrics (articles/sec, bytes, errors)
## Usage
### Parse NDJSON from a Snapshot
```rust,no_run
use wme_stream::NdjsonStream;
use wme_models::Article;
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
// Read from a tarball or file
let file = File::open("articles.ndjson")?;
let reader = BufReader::new(file);
let lines = NdjsonStream::from_reader(reader);
// Parse into Article structs
let articles = NdjsonStream::parse_articles(lines);
// Process articles
let mut pinned = pin!(articles);
while let Some(result) = pinned.next().await {
match result {
Ok(article) => println!("{}: {}", article.identifier, article.name),
Err(e) => eprintln!("Parse error: {}", e),
}
}
# Ok(())
# }
```
### Handle Duplicates
```rust,no_run
use wme_stream::dedup_stream;
use futures::StreamExt;
use std::pin::pin;
# async fn example<S>(stream: S) -> Result<(), Box<dyn std::error::Error>>
# where
# S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
# {
// Wrap stream to deduplicate
let deduplicated = dedup_stream(stream);
// Pin the stream before iterating
let mut pinned = pin!(deduplicated);
while let Some(result) = pinned.next().await {
// Only latest version of each article
let _article = result?;
}
# Ok(())
# }
```
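The latest-wins policy behind deduplication can be illustrated with a plain `HashMap` keyed by article identifier. This is a simplified, synchronous sketch of the idea only, not the crate's implementation (`dedup_stream` operates on async streams, and the `Article`/`version` fields here are stand-ins):

```rust
use std::collections::HashMap;

// Simplified stand-in for wme_models::Article (illustration only).
#[derive(Debug, Clone)]
struct Article {
    identifier: u64,
    version: u64,
    name: String,
}

/// Keep only the newest version of each article, keyed by identifier.
fn dedup_latest(articles: Vec<Article>) -> Vec<Article> {
    let mut latest: HashMap<u64, Article> = HashMap::new();
    for article in articles {
        // Replace an existing entry only if this copy is strictly newer.
        let keep = latest
            .get(&article.identifier)
            .map_or(true, |existing| article.version > existing.version);
        if keep {
            latest.insert(article.identifier, article);
        }
    }
    latest.into_values().collect()
}

fn main() {
    let input = vec![
        Article { identifier: 1, version: 2, name: "Rust".into() },
        Article { identifier: 1, version: 5, name: "Rust (updated)".into() },
        Article { identifier: 2, version: 1, name: "NDJSON".into() },
    ];
    let deduped = dedup_latest(input);
    assert_eq!(deduped.len(), 2);
}
```

Because duplicates are rare (under 1% in snapshots), the map stays small relative to the stream when keyed by identifier alone.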
### Checkpoint and Resume
```rust,no_run
use wme_stream::ResumeCheckpoint;
# async fn example() -> Result<(), wme_stream::StreamError> {
// Save progress
let checkpoint = ResumeCheckpoint::new(
"enwiki_namespace_0",
"chunk_0",
5000, // line_offset
1000, // articles_processed
);
checkpoint.save("/data/checkpoints/").await?;
// Resume later
let checkpoint = ResumeCheckpoint::load("/data/checkpoints/enwiki_namespace_0.checkpoint.json").await?;
// Continue from checkpoint.line_offset
# Ok(())
# }
```
### Visitor Pattern (Low Memory)
```rust
use wme_stream::ArticleVisitor;
use serde_json::Value;
struct MyVisitor {
    article_count: u64,
}
impl ArticleVisitor for MyVisitor {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.article_count += 1;
    }
    fn visit_category(&mut self, _name: &str, _url: &str) {
        // Process category without storing the full article
    }
    fn visit_link(&mut self, _text: &str, _url: &str) {
        // Process link
    }
    fn visit_infobox(&mut self, _name: &str, _value: &str) {
        // Process infobox field
    }
    fn visit_reference(&mut self, _id: &str, _ref_type: &str, _metadata: &Value) {
        // Process reference
    }
    fn visit_article_end(&mut self) {}
}
```
## Architecture
### Modules
- **`ndjson`** - NDJSON parsing utilities
- **`dedup`** - Duplicate detection and removal
- **`checkpoint`** - Resume checkpoint functionality
- **`visitor`** - Visitor trait for streaming processing
### Key Types
- `NdjsonStream` - Parse NDJSON streams
- `dedup_stream()` - Wrap streams to deduplicate articles
- `ResumeCheckpoint` - Save/load processing state
- `ArticleVisitor` - Trait for visitor pattern
- `ProcessingStats` - Track processing metrics
- `SnapshotEvent` - Progress events for UI updates
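The metrics listed above (articles/sec, bytes, errors) can be accumulated with a small counter struct. The sketch below is an assumption about the kind of data `ProcessingStats` collects; the field and method names are illustrative, not the crate's actual API:

```rust
use std::time::Instant;

/// Illustrative metrics accumulator (not the crate's actual ProcessingStats).
struct Stats {
    started: Instant,
    articles: u64,
    bytes: u64,
    errors: u64,
}

impl Stats {
    fn new() -> Self {
        Stats { started: Instant::now(), articles: 0, bytes: 0, errors: 0 }
    }

    /// Record one NDJSON line: count its bytes, then tally success or failure.
    fn record(&mut self, line: &str, ok: bool) {
        self.bytes += line.len() as u64;
        if ok { self.articles += 1 } else { self.errors += 1 }
    }

    /// Articles processed per second since the start.
    fn articles_per_sec(&self) -> f64 {
        self.articles as f64 / self.started.elapsed().as_secs_f64().max(1e-9)
    }
}

fn main() {
    let mut stats = Stats::new();
    stats.record(r#"{"identifier":1}"#, true);
    stats.record("not json", false);
    println!("articles={} bytes={} errors={}", stats.articles, stats.bytes, stats.errors);
}
```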
## Snapshot Processing
Snapshots are delivered as `.tar.gz` files containing NDJSON:
1. Download chunks (use parallel downloads for speed)
2. Decompress tarball
3. Stream NDJSON lines
4. Deduplicate articles
5. Process or store results
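Steps 3–5 can be sketched with plain std I/O. Decompression (step 2, typically via crates like `flate2` and `tar`) is omitted, and the resume-offset and identifier-extraction logic here is an illustrative assumption, not the crate's implementation (a real parser would use `serde_json` rather than string splitting):

```rust
use std::collections::HashMap;
use std::io::BufRead;

/// Stream NDJSON lines, skip lines before a checkpoint offset, and keep
/// the last line seen per article identifier (latest-wins dedup).
fn process_snapshot<R: BufRead>(reader: R, resume_offset: usize) -> HashMap<String, String> {
    let mut latest: HashMap<String, String> = HashMap::new();
    for (line_no, line) in reader.lines().enumerate() {
        let line = match line { Ok(l) => l, Err(_) => continue };
        if line_no < resume_offset {
            continue; // already processed before the checkpoint
        }
        // Crude identifier extraction for brevity: grabs the first quoted
        // value, so it only works on toy input shaped like {"id":"..."}.
        if let Some(id) = line.split('"').nth(3) {
            latest.insert(id.to_string(), line);
        }
    }
    latest
}

fn main() {
    let data = "{\"id\":\"1\",\"v\":\"a\"}\n{\"id\":\"2\",\"v\":\"b\"}\n{\"id\":\"1\",\"v\":\"c\"}\n";
    // &[u8] implements BufRead, so a byte slice stands in for a file here.
    let latest = process_snapshot(data.as_bytes(), 0);
    assert_eq!(latest.len(), 2);
}
```

Because lines are consumed one at a time, memory stays bounded by the dedup map rather than the snapshot size.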
## Realtime Streaming
The Realtime API provides SSE or NDJSON streams:
1. Connect to streaming endpoint
2. Receive events (update, delete, visibility-change)
3. Track partition/offset for resume
4. Process events incrementally
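The partition/offset bookkeeping in step 3 can be sketched as follows. The event shape and field names are assumptions for illustration; real events also carry article payloads:

```rust
use std::collections::HashMap;

/// Illustrative realtime event (fields are assumed, not the crate's types).
struct Event {
    partition: u32,
    offset: u64,
    kind: &'static str, // "update" | "delete" | "visibility-change"
}

/// Track the highest offset seen per partition so a consumer can resume.
#[derive(Default)]
struct OffsetTracker {
    offsets: HashMap<u32, u64>,
}

impl OffsetTracker {
    /// Record an event's position after it has been processed.
    fn commit(&mut self, event: &Event) {
        let entry = self.offsets.entry(event.partition).or_insert(0);
        *entry = (*entry).max(event.offset);
    }

    /// Offset to resume from on reconnect (next unread position).
    fn resume_from(&self, partition: u32) -> u64 {
        self.offsets.get(&partition).map(|o| o + 1).unwrap_or(0)
    }
}

fn main() {
    let mut tracker = OffsetTracker::default();
    for e in [
        Event { partition: 0, offset: 10, kind: "update" },
        Event { partition: 0, offset: 11, kind: "delete" },
        Event { partition: 1, offset: 3, kind: "update" },
    ] {
        println!("processing {} at {}:{}", e.kind, e.partition, e.offset);
        tracker.commit(&e); // commit only after processing succeeds
    }
    assert_eq!(tracker.resume_from(0), 12);
}
```

Committing after processing gives at-least-once delivery on reconnect; combined with latest-wins dedup, replayed events are harmless.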
## License
This project is licensed under the terms of the workspace license.