wme-stream 0.1.3


wme-stream

Streaming utilities for the Wikimedia Enterprise API.

This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality, and efficient streaming parsing.

Features

  • NDJSON Streaming - Parse newline-delimited JSON from snapshots and realtime feeds
  • Deduplication - Handle duplicate articles (< 1% in snapshots) by keeping the latest version
  • Checkpoint/Resume - Save progress during long-running downloads and resume from the last saved position
  • Visitor Pattern - Process articles without full materialization for low memory usage
  • Progress Events - Track download and processing progress
  • Statistics - Collect processing metrics (articles/sec, bytes, errors)

Usage

Parse NDJSON from a Snapshot

use wme_stream::NdjsonStream;
use wme_models::Article;
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;

async fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Read from a tarball or file
    let file = File::open("articles.ndjson")?;
    let reader = BufReader::new(file);
    let lines = NdjsonStream::from_reader(reader);

    // Parse into Article structs
    let articles = NdjsonStream::parse_articles(lines);

    // Pin the stream, then process each article as it is parsed
    let mut pinned = pin!(articles);
    while let Some(result) = pinned.next().await {
        match result {
            Ok(article) => println!("{}: {}", article.identifier, article.name),
            Err(e) => eprintln!("Parse error: {}", e),
        }
    }
    Ok(())
}

Handle Duplicates

use wme_stream::dedup_stream;
use futures::StreamExt;
use std::pin::pin;

async fn example<S>(stream: S) -> Result<(), Box<dyn std::error::Error>>
where
    S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
{
    // Wrap stream to deduplicate
    let deduplicated = dedup_stream(stream);

    // Pin the stream before iterating
    let mut pinned = pin!(deduplicated);
    while let Some(result) = pinned.next().await {
        // Only the latest version of each article is yielded
        let _article = result?;
    }
    Ok(())
}
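
The crate's own deduplication strategy is not shown in this README. As a rough illustration of what "keeping the latest version" can mean, here is a minimal standard-library sketch. The `Record` struct, its `version` field, and the `keep_latest` function are hypothetical names invented for this example, and it assumes a larger version number means a newer revision.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an article; only the fields this sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    identifier: u64,
    version: u64, // assumption: a larger value means a newer revision
    name: String,
}

/// Keep only the newest record for each identifier.
fn keep_latest(records: impl IntoIterator<Item = Record>) -> Vec<Record> {
    let mut latest: HashMap<u64, Record> = HashMap::new();
    for rec in records {
        // Replace the stored record only when this one is strictly newer.
        let is_newer = latest
            .get(&rec.identifier)
            .map_or(true, |existing| rec.version > existing.version);
        if is_newer {
            latest.insert(rec.identifier, rec);
        }
    }
    // Sort by identifier so the output order is deterministic.
    let mut out: Vec<Record> = latest.into_values().collect();
    out.sort_by_key(|r| r.identifier);
    out
}
```

This version buffers one record per identifier until the input ends, so memory grows with the number of distinct articles. A streaming implementation may make a different trade-off.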

Checkpoint and Resume

use wme_stream::ResumeCheckpoint;

async fn example() -> Result<(), wme_stream::StreamError> {
    // Save progress
    let checkpoint = ResumeCheckpoint::new(
        "enwiki_namespace_0",
        "chunk_0",
        5000,  // line_offset
        1000,  // articles_processed
    );
    checkpoint.save("/data/checkpoints/").await?;

    // Resume later
    let checkpoint = ResumeCheckpoint::load("/data/checkpoints/enwiki_namespace_0.checkpoint.json").await?;
    // Continue from checkpoint.line_offset
    Ok(())
}
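
"Continue from checkpoint.line_offset" could look roughly like the sketch below, which uses only the standard library. It assumes `line_offset` counts the lines already processed. `resume_lines` is a hypothetical helper written for this example, not part of the crate.

```rust
use std::io::{self, BufRead};

/// Return the lines that follow the first `line_offset` lines of `reader`.
/// Assumption: `line_offset` is the number of lines already processed.
fn resume_lines<R: BufRead>(
    reader: R,
    line_offset: usize,
) -> impl Iterator<Item = io::Result<String>> {
    // The skipped lines are still read from the underlying reader;
    // they are simply not handed to the caller.
    reader.lines().skip(line_offset)
}
```

Skipping this way still reads (and, for compressed input, decompresses) everything before the offset. It avoids re-processing articles, but not re-reading bytes.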

Visitor Pattern (Low Memory)

use wme_stream::ArticleVisitor;
use wme_models::Article;
use serde_json::Value;

struct MyVisitor {
    article_count: u64,
}

impl ArticleVisitor for MyVisitor {
    fn visit_article_start(&mut self, id: u64, name: &str) {
        self.article_count += 1;
    }
    
    fn visit_category(&mut self, name: &str, url: &str) {
        // Process category without storing full article
    }
    
    fn visit_link(&mut self, text: &str, url: &str) {
        // Process link
    }
    
    fn visit_infobox(&mut self, name: &str, value: &str) {
        // Process infobox field
    }
    
    fn visit_reference(&mut self, id: &str, ref_type: &str, metadata: &Value) {
        // Process reference
    }
    
    fn visit_article_end(&mut self) {}
}
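
How a visitor gets driven is not shown above. The following self-contained sketch illustrates the general idea with a simplified, hypothetical trait (`LinkVisitor`) and a hypothetical driver (`walk_article`). Neither is the crate's API. The point is that the visitor accumulates counters while the article data itself is never stored.

```rust
/// Simplified, hypothetical visitor trait (not the crate's `ArticleVisitor`).
trait LinkVisitor {
    fn visit_article_start(&mut self, id: u64, name: &str);
    fn visit_link(&mut self, text: &str, url: &str);
    fn visit_article_end(&mut self);
}

/// Counts articles and links without retaining any article data.
#[derive(Default)]
struct LinkCounter {
    articles: u64,
    links: u64,
}

impl LinkVisitor for LinkCounter {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.articles += 1;
    }

    fn visit_link(&mut self, _text: &str, _url: &str) {
        self.links += 1;
    }

    fn visit_article_end(&mut self) {}
}

/// Hypothetical driver: report one article and its links to the visitor.
fn walk_article<V: LinkVisitor>(
    visitor: &mut V,
    id: u64,
    name: &str,
    links: &[(&str, &str)],
) {
    visitor.visit_article_start(id, name);
    for (text, url) in links {
        visitor.visit_link(text, url);
    }
    visitor.visit_article_end();
}
```

After walking any number of articles, `LinkCounter` holds two integers, regardless of how large the individual articles were.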

Architecture

Modules

  • ndjson - NDJSON parsing utilities
  • dedup - Duplicate detection and removal
  • checkpoint - Resume checkpoint functionality
  • visitor - Visitor trait for streaming processing

Key Types and Functions

  • NdjsonStream - Parse NDJSON streams
  • dedup_stream() - Wrap streams to deduplicate articles
  • ResumeCheckpoint - Save/load processing state
  • ArticleVisitor - Trait for visitor pattern
  • ProcessingStats - Track processing metrics
  • SnapshotEvent - Progress events for UI updates

Snapshot Processing

Snapshots are delivered as .tar.gz files containing NDJSON. A typical processing pipeline:

  1. Download chunks (use parallel downloads for speed)
  2. Decompress tarball
  3. Stream NDJSON lines
  4. Deduplicate articles
  5. Process or store results
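
Steps 3 and 5 above can be sketched with the standard library alone. Downloading, decompression, JSON parsing, and deduplication are left out. `LineStats` and `for_each_line` are hypothetical names for this example, and the counters are only loosely modeled on the metrics listed under Features.

```rust
use std::io::{self, BufRead};

/// Hypothetical counters for a single pass over an NDJSON source.
#[derive(Debug, Default, PartialEq)]
struct LineStats {
    lines: u64, // non-blank lines handed to the callback
    bytes: u64, // bytes in those lines, excluding line terminators
    blank: u64, // blank lines that were skipped
}

/// Feed every non-blank line to `handle` and return the counters.
fn for_each_line<R, F>(reader: R, mut handle: F) -> io::Result<LineStats>
where
    R: BufRead,
    F: FnMut(&str),
{
    let mut stats = LineStats::default();
    for line in reader.lines() {
        let line = line?; // propagate I/O errors to the caller
        if line.trim().is_empty() {
            stats.blank += 1;
            continue;
        }
        stats.lines += 1;
        stats.bytes += line.len() as u64;
        handle(&line);
    }
    Ok(stats)
}
```

Because only one line is held in memory at a time, the same loop works for a small test file and for a multi-gigabyte snapshot chunk.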

Realtime Streaming

The Realtime API provides SSE or NDJSON streams. A typical consumer:

  1. Connect to streaming endpoint
  2. Receive events (update, delete, visibility-change)
  3. Track partition/offset for resume
  4. Process events incrementally
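
Step 3 (tracking partition/offset for resume) might be handled along these lines. This is a standard-library sketch that assumes offsets increase within each partition. `OffsetTracker` and its methods are hypothetical and do not come from the crate.

```rust
use std::collections::HashMap;

/// Hypothetical tracker for the highest offset seen in each partition.
#[derive(Debug, Default)]
struct OffsetTracker {
    offsets: HashMap<u32, u64>,
}

impl OffsetTracker {
    /// Record an event. Returns `false` when the offset is not newer than
    /// the last one seen for that partition (for example, a replayed event).
    fn observe(&mut self, partition: u32, offset: u64) -> bool {
        let last = self.offsets.get(&partition).copied();
        if matches!(last, Some(seen) if offset <= seen) {
            return false;
        }
        self.offsets.insert(partition, offset);
        true
    }

    /// Last offset seen for `partition`, if any; persist this to resume later.
    fn last_offset(&self, partition: u32) -> Option<u64> {
        self.offsets.get(&partition).copied()
    }
}
```

Persisting the map after each processed batch gives a resume point per partition, and the `false` return from `observe` lets a consumer skip events it has already handled after a reconnect.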

License

This project is licensed under the terms of the workspace license.