
Crate wme_stream


Streaming utilities for the Wikimedia Enterprise API.

This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality, and efficient streaming parsing.

§Overview

Wikimedia Enterprise provides data in several formats:

  • Snapshot API - Complete project dumps as .tar.gz with NDJSON files
  • Realtime API - Streaming updates via SSE or NDJSON
  • Realtime Batch - Hourly bundles of updates as .tar.gz

This crate helps you process these data sources efficiently.

§Key Features

  • NDJSON Streaming - Parse newline-delimited JSON from any source
  • Deduplication - Handle duplicate articles automatically (fewer than 1% of snapshot records), keeping the latest version
  • Checkpoint/Resume - Save and resume long-running downloads from any point
  • Visitor Pattern - Process articles without full materialization, for low memory usage
  • Progress Tracking - Events and statistics for monitoring (articles/sec, bytes, errors)

§Modules

  • ndjson - NDJSON parsing from streams and files
  • dedup - Duplicate detection and removal
  • checkpoint - Resume checkpoint save/load
  • visitor - Visitor trait for low-memory processing

§Example: Parse NDJSON Stream

use wme_stream::NdjsonStream;
use wme_models::Article;
use futures::StreamExt;
use std::io::BufReader;
use std::fs::File;
use std::pin::pin;

// Open NDJSON file
let input_file = File::open("articles.ndjson")?;
let reader = BufReader::new(input_file);

// Create line stream
let lines = NdjsonStream::from_reader(reader);

// Parse into Articles
let articles = NdjsonStream::parse_articles(lines);

// Process each article (pin the stream first)
let mut pinned = pin!(articles);
while let Some(result) = pinned.next().await {
    match result {
        Ok(article) => println!("{}: {}", article.identifier, article.name),
        Err(e) => eprintln!("Error: {}", e),
    }
}
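
NDJSON framing itself is simple: one JSON document per line, with blank lines ignored. The line-splitting layer the example above relies on can be sketched with std alone (no async, no serde; `ndjson_lines` is a stand-in for illustration, not this crate's API):

```rust
use std::io::{BufRead, Cursor};

// Yield the non-empty lines of any BufRead source; in NDJSON, each such
// line is exactly one JSON document. I/O errors pass through untouched.
fn ndjson_lines<R: BufRead>(reader: R) -> impl Iterator<Item = std::io::Result<String>> {
    reader
        .lines()
        .filter(|line| line.as_ref().map(|l| !l.trim().is_empty()).unwrap_or(true))
}

fn main() -> std::io::Result<()> {
    let data = "{\"identifier\":1}\n\n{\"identifier\":2}\n";
    let docs: Vec<String> = ndjson_lines(Cursor::new(data)).collect::<Result<_, _>>()?;
    assert_eq!(docs.len(), 2);
    assert_eq!(docs[0], "{\"identifier\":1}");
    println!("{} documents", docs.len());
    Ok(())
}
```

The crate's `NdjsonStream` adds async streaming and serde parsing on top of this framing.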

§Example: Deduplicate Stream

use wme_stream::dedup_stream;
use futures::StreamExt;
use std::pin::pin;

// `stream` is any stream of parsed articles, e.g. the output of
// NdjsonStream::parse_articles. Wrap it to deduplicate:
let deduplicated = dedup_stream(stream);

// Pin the stream before iterating
let mut pinned = pin!(deduplicated);
while let Some(result) = pinned.next().await {
    // Only the latest version of each article survives
    let _article = result?;
}
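
Underneath `dedup_stream`, the rule is "latest revision wins per identifier". A std-only sketch of that rule on plain values (`Rec` is a stand-in for `Article`; the real wrapper applies the same logic incrementally as the stream is polled):

```rust
use std::collections::HashMap;

// Stand-in for an Article: an identifier plus a monotonically increasing revision.
#[derive(Debug, Clone)]
struct Rec {
    identifier: u64,
    revision: u64,
}

// Keep only the highest revision seen for each identifier.
fn dedup_latest(records: Vec<Rec>) -> Vec<Rec> {
    let mut latest: HashMap<u64, Rec> = HashMap::new();
    for rec in records {
        match latest.get(&rec.identifier) {
            Some(existing) if existing.revision >= rec.revision => {} // keep existing
            _ => {
                latest.insert(rec.identifier, rec);
            }
        }
    }
    latest.into_values().collect()
}

fn main() {
    let input = vec![
        Rec { identifier: 1, revision: 10 },
        Rec { identifier: 2, revision: 5 },
        Rec { identifier: 1, revision: 12 }, // newer revision of article 1 wins
    ];
    let mut out = dedup_latest(input);
    out.sort_by_key(|r| r.identifier);
    assert_eq!(out.len(), 2);
    assert_eq!(out[0].revision, 12);
    println!("{:?}", out);
}
```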

§Example: Save/Resume Checkpoint

use wme_stream::ResumeCheckpoint;

// Save progress after processing 1000 articles
let checkpoint = ResumeCheckpoint::new(
    "enwiki_namespace_0",
    "chunk_0",
    5000,  // line offset
    1000,  // articles processed
);
checkpoint.save("/data/checkpoints/").await?;

// Later: resume from checkpoint
let checkpoint = ResumeCheckpoint::load(
    "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
).await?;
println!("Resuming from line {}", checkpoint.line_offset);
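
A crash halfway through a save must not corrupt an existing checkpoint, or resume becomes impossible. Whether or not `ResumeCheckpoint::save` does this internally, the standard defense is write-temp-then-rename; a std-only sketch (the file name and JSON body here are illustrative):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

// Write checkpoint contents atomically: write to a temp file, then rename over
// the target. On POSIX, rename within one filesystem is atomic, so a reader
// sees either the old checkpoint or the new one, never a partial write.
fn save_atomic(dir: &Path, name: &str, contents: &str) -> std::io::Result<()> {
    let tmp = dir.join(format!("{name}.tmp"));
    let dst = dir.join(name);
    let mut f = fs::File::create(&tmp)?;
    f.write_all(contents.as_bytes())?;
    f.sync_all()?; // flush to disk before the rename makes it visible
    fs::rename(&tmp, &dst)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    save_atomic(&dir, "enwiki_namespace_0.checkpoint.json", "{\"line_offset\":5000}")?;
    let read = fs::read_to_string(dir.join("enwiki_namespace_0.checkpoint.json"))?;
    assert_eq!(read, "{\"line_offset\":5000}");
    println!("checkpoint saved and read back");
    Ok(())
}
```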


§Example: Visitor Pattern (Low Memory)

use wme_stream::ArticleVisitor;
use serde_json::Value;

// Collect aggregate statistics without ever materializing a full Article.
struct MyVisitor {
    article_count: u64,
}

impl ArticleVisitor for MyVisitor {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.article_count += 1;
    }

    fn visit_category(&mut self, _name: &str, _url: &str) {
        // Process a category without storing the full article
    }

    fn visit_link(&mut self, _text: &str, _url: &str) {
        // Process a link
    }

    fn visit_infobox(&mut self, _name: &str, _value: &str) {
        // Process an infobox field
    }

    fn visit_reference(&mut self, _id: &str, _ref_type: &str, _metadata: &Value) {
        // Process a reference
    }

    fn visit_article_end(&mut self) {}
}

§Architecture


§Key Types

  • NdjsonStream - Parse NDJSON streams
  • dedup_stream() - Wrap streams to deduplicate articles
  • ResumeCheckpoint - Save/load processing state
  • ArticleVisitor - Trait for visitor pattern
  • ProcessingStats - Track processing metrics
  • SnapshotEvent - Progress events for UI updates

§Snapshot Processing

Snapshots are delivered as .tar.gz files containing NDJSON:

  1. Download chunks (use parallel downloads for speed)
  2. Decompress tarball
  3. Stream NDJSON lines
  4. Deduplicate articles
  5. Process or store results
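
Steps 3-5 can be sketched on in-memory data with std alone. The `identifier_of` helper here fakes JSON parsing with a string search purely for the sketch; real code would deserialize each line (e.g. with serde_json) into an `Article`:

```rust
use std::collections::HashMap;

// Naive key extraction, for this sketch only: grab the digits after "identifier":.
fn identifier_of(line: &str) -> Option<u64> {
    let idx = line.find("\"identifier\":")?;
    let rest = &line[idx + "\"identifier\":".len()..];
    let digits: String = rest.chars().take_while(|c| c.is_ascii_digit()).collect();
    digits.parse().ok()
}

fn main() {
    // Steps 3-5 on in-memory data: stream lines, dedup by identifier
    // (a later line overwrites an earlier one, matching "keep latest version"),
    // then process the survivors.
    let ndjson = "{\"identifier\":1,\"name\":\"A\"}\n\
                  {\"identifier\":2,\"name\":\"B\"}\n\
                  {\"identifier\":1,\"name\":\"A v2\"}\n";

    let mut latest: HashMap<u64, String> = HashMap::new();
    for line in ndjson.lines().filter(|l| !l.trim().is_empty()) {
        if let Some(id) = identifier_of(line) {
            latest.insert(id, line.to_string());
        }
    }

    assert_eq!(latest.len(), 2);
    assert!(latest[&1].contains("A v2"));
    println!("processed {} unique articles", latest.len());
}
```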

§Realtime Streaming

Realtime API provides SSE or NDJSON streams:

  1. Connect to streaming endpoint
  2. Receive events (update, delete, visibility-change)
  3. Track partition/offset for resume
  4. Process events incrementally
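
Step 3, tracking partition/offset for resume, amounts to remembering the highest offset seen per partition. A minimal std sketch (the `Event` shape here is hypothetical, not this crate's event type):

```rust
use std::collections::HashMap;

// Hypothetical realtime event: the partition it arrived on and its offset.
struct Event {
    partition: u32,
    offset: u64,
}

// Record the highest offset seen per partition. After a disconnect,
// reconnect each partition from its stored offset + 1.
fn track(offsets: &mut HashMap<u32, u64>, event: &Event) {
    let entry = offsets.entry(event.partition).or_insert(0);
    if event.offset > *entry {
        *entry = event.offset;
    }
}

fn main() {
    let mut offsets = HashMap::new();
    let events = [
        Event { partition: 0, offset: 41 },
        Event { partition: 0, offset: 42 },
        Event { partition: 1, offset: 7 },
    ];
    for e in &events {
        track(&mut offsets, e);
    }
    assert_eq!(offsets[&0], 42);
    assert_eq!(offsets[&1], 7);
    println!("resume offsets: {:?}", offsets);
}
```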

§License

This project is licensed under the terms of the workspace license.

Re-exports§

pub use checkpoint::ResumeCheckpoint;
pub use dedup::dedup_collect;
pub use dedup::dedup_stream;
pub use ndjson::NdjsonExt;
pub use ndjson::NdjsonStream;
pub use visitor::ArticleVisitor;
pub use visitor::NoOpVisitor;
pub use visitor::StatsVisitor;

Modules§

checkpoint
Checkpoint and resume functionality.
dedup
Deduplication utilities.
ndjson
NDJSON streaming utilities.
visitor
Visitor trait for article processing.

Structs§

ProcessingStats
Processing statistics.

Enums§

SnapshotEvent
Progress event for snapshot processing.
StreamError
Errors that can occur during streaming.