wme-stream 0.1.1

# wme-stream

Streaming utilities for the Wikimedia Enterprise API.

This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise
APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality,
and efficient streaming parsing.

## Features

- **NDJSON Streaming** - Parse newline-delimited JSON from snapshots and realtime feeds
- **Deduplication** - Handle duplicate articles (< 1% in snapshots) by keeping the latest version
- **Checkpoint/Resume** - Save and resume long-running downloads from any point
- **Visitor Pattern** - Process articles without full materialization for low memory usage
- **Progress Events** - Track download and processing progress
- **Statistics** - Collect processing metrics (articles/sec, bytes, errors)

## Usage

### Parse NDJSON from a Snapshot

```rust,no_run
use wme_stream::NdjsonStream;
use wme_models::Article;
use futures::StreamExt;
use std::fs::File;
use std::io::BufReader;
use std::pin::pin;

# async fn example() -> Result<(), Box<dyn std::error::Error>> {
// Read NDJSON from a plain file (snapshot tarballs must be decompressed first)
let file = File::open("articles.ndjson")?;
let reader = BufReader::new(file);
let lines = NdjsonStream::from_reader(reader);

// Parse into Article structs
let articles = NdjsonStream::parse_articles(lines);

// Process articles
let mut pinned = pin!(articles);
while let Some(result) = pinned.next().await {
    match result {
        Ok(article) => println!("{}: {}", article.identifier, article.name),
        Err(e) => eprintln!("Parse error: {}", e),
    }
}
# Ok(())
# }
```
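The example above relies on `NdjsonStream` for line splitting and parsing. To show what the framing step involves, here is a minimal std-only sketch: one JSON document per line, blank lines skipped, and 1-based line numbers kept so a parse error can point at the offending line. `ndjson_lines` is a hypothetical helper, not part of `wme_stream`. JSON decoding is left out because it needs a crate such as `serde_json`.

```rust
use std::io::{self, BufRead};

/// Hypothetical helper: yields `(line_number, line)` pairs from an NDJSON
/// source. Line numbers are 1-based; blank lines are skipped. Each returned
/// line would then be handed to a JSON parser.
fn ndjson_lines<R: BufRead>(reader: R) -> impl Iterator<Item = io::Result<(u64, String)>> {
    reader
        .lines() // strips the trailing "\n" or "\r\n" from each line
        .enumerate()
        .filter_map(|(idx, line)| match line {
            // Keep non-blank lines together with their 1-based line number.
            Ok(text) if !text.trim().is_empty() => Some(Ok((idx as u64 + 1, text))),
            // Drop blank or whitespace-only lines between records.
            Ok(_) => None,
            // Surface I/O errors (including invalid UTF-8) to the caller.
            Err(e) => Some(Err(e)),
        })
}
```

Keeping the original line number (rather than a count of records) means the numbers still match the file when blank lines are present.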

### Handle Duplicates

```rust,no_run
use wme_stream::dedup_stream;
use futures::StreamExt;
use std::pin::pin;

# async fn example<S>(stream: S) -> Result<(), Box<dyn std::error::Error>>
# where
#     S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
# {
// Wrap stream to deduplicate
let deduplicated = dedup_stream(stream);

// Pin the stream before iterating
let mut pinned = pin!(deduplicated);
while let Some(result) = pinned.next().await {
    // Only latest version of each article
    let _article = result?;
}
# Ok(())
# }
```
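Keeping the *latest* copy of a duplicated article means a record cannot be finalized until every later copy has been seen, unless the input order guarantees otherwise. The std-only sketch below makes that trade-off explicit by buffering one record per id. `Record`, its `version` field, and `keep_latest` are hypothetical; how `dedup_stream` actually decides which copy is newest is not stated here.

```rust
use std::collections::HashMap;

/// Hypothetical minimal record; the real `wme_models::Article` has many more fields.
#[derive(Debug, Clone, PartialEq)]
struct Record {
    id: u64,
    version: u64, // assumption: a higher number means a newer revision
    name: String,
}

/// Keeps only the highest-`version` record for each `id`.
/// Output order follows the first time each id was seen; on a tie the
/// earlier copy is kept.
fn keep_latest<I: IntoIterator<Item = Record>>(records: I) -> Vec<Record> {
    let mut index_of: HashMap<u64, usize> = HashMap::new(); // id -> position in `out`
    let mut out: Vec<Record> = Vec::new();
    for rec in records {
        match index_of.get(&rec.id).copied() {
            // Duplicate id: replace the stored copy only if this one is newer.
            Some(i) => {
                if rec.version > out[i].version {
                    out[i] = rec;
                }
            }
            // First time this id is seen: remember where it is stored.
            None => {
                index_of.insert(rec.id, out.len());
                out.push(rec);
            }
        }
    }
    out
}
```

Emitting each id on first sight with a seen-set would use less memory but would keep the first copy, not the latest.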

### Checkpoint and Resume

```rust,no_run
use wme_stream::ResumeCheckpoint;

# async fn example() -> Result<(), wme_stream::StreamError> {
// Save progress
let checkpoint = ResumeCheckpoint::new(
    "enwiki_namespace_0",
    "chunk_0",
    5000,  // line_offset
    1000,  // articles_processed
);
checkpoint.save("/data/checkpoints/").await?;

// Resume later
let checkpoint = ResumeCheckpoint::load("/data/checkpoints/enwiki_namespace_0.checkpoint.json").await?;
// Continue from checkpoint.line_offset
# Ok(())
# }
```
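Continuing from `checkpoint.line_offset` amounts to skipping the lines that were already processed. A minimal std-only sketch, assuming `line_offset` is the number of lines already consumed (that is, the 0-based index of the next line to read); `resume_from` is a hypothetical helper, not part of `wme_stream`.

```rust
use std::io::{self, BufRead};

/// Hypothetical helper: returns the remaining lines of an NDJSON source
/// after skipping the first `line_offset` lines.
fn resume_from<R: BufRead>(
    reader: R,
    line_offset: usize,
) -> impl Iterator<Item = io::Result<String>> {
    // `skip` still reads the skipped lines; it only avoids yielding them.
    reader.lines().skip(line_offset)
}
```

Because the skipped lines are still read, resuming inside a compressed chunk re-reads that chunk from its start; the checkpoint saves processing work, not I/O.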

### Visitor Pattern (Low Memory)

```rust
use wme_stream::ArticleVisitor;
use serde_json::Value;

struct MyVisitor {
    article_count: u64,
}

impl ArticleVisitor for MyVisitor {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.article_count += 1;
    }

    fn visit_category(&mut self, _name: &str, _url: &str) {
        // Process category without storing full article
    }

    fn visit_link(&mut self, _text: &str, _url: &str) {
        // Process link
    }

    fn visit_infobox(&mut self, _name: &str, _value: &str) {
        // Process infobox field
    }

    fn visit_reference(&mut self, _id: &str, _ref_type: &str, _metadata: &Value) {
        // Process reference
    }

    fn visit_article_end(&mut self) {}
}
```
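To show how a visitor keeps only running totals, here is a self-contained sketch. The `Visitor` trait is a local stand-in mirroring three of the method signatures shown above (so it compiles without the crate), `CategoryCounter` is a hypothetical visitor, and `walk` is a hypothetical driver that plays the role the crate's parser presumably plays, calling the methods in order for one article.

```rust
use std::collections::HashMap;

/// Local stand-in for a subset of `wme_stream::ArticleVisitor`.
trait Visitor {
    fn visit_article_start(&mut self, id: u64, name: &str);
    fn visit_category(&mut self, name: &str, url: &str);
    fn visit_article_end(&mut self);
}

/// Hypothetical visitor: counts articles and category occurrences.
#[derive(Default)]
struct CategoryCounter {
    articles: u64,
    per_category: HashMap<String, u64>,
    current_categories: u64, // categories seen in the article being visited
    max_categories: u64,     // largest category count seen in any one article
}

impl Visitor for CategoryCounter {
    fn visit_article_start(&mut self, _id: u64, _name: &str) {
        self.articles += 1;
        self.current_categories = 0; // reset per-article state
    }

    fn visit_category(&mut self, name: &str, _url: &str) {
        *self.per_category.entry(name.to_string()).or_insert(0) += 1;
        self.current_categories += 1;
    }

    fn visit_article_end(&mut self) {
        // Fold the per-article count into the running maximum.
        self.max_categories = self.max_categories.max(self.current_categories);
    }
}

/// Hypothetical driver: feeds one article's pieces to a visitor in order.
fn walk<V: Visitor>(v: &mut V, id: u64, name: &str, categories: &[(&str, &str)]) {
    v.visit_article_start(id, name);
    for &(cat_name, cat_url) in categories {
        v.visit_category(cat_name, cat_url);
    }
    v.visit_article_end();
}
```

Per-article state is reset in `visit_article_start` and folded into totals in `visit_article_end`, so memory use depends on the number of distinct categories, not on article size.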

## Architecture

### Modules

- **`ndjson`** - NDJSON parsing utilities
- **`dedup`** - Duplicate detection and removal
- **`checkpoint`** - Resume checkpoint functionality
- **`visitor`** - Visitor trait for streaming processing

### Key Types

- `NdjsonStream` - Parse NDJSON streams
- `dedup_stream()` - Wrap streams to deduplicate articles
- `ResumeCheckpoint` - Save/load processing state
- `ArticleVisitor` - Trait for visitor pattern
- `ProcessingStats` - Track processing metrics
- `SnapshotEvent` - Progress events for UI updates
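The features list mentions articles/sec, bytes, and errors. A minimal sketch of such counters follows, with hypothetical names (the actual fields of `ProcessingStats` are not documented here). Elapsed time is passed in, for example from `std::time::Instant::elapsed`, which keeps the rate calculation deterministic and easy to test.

```rust
use std::time::Duration;

/// Hypothetical counters in the spirit of `ProcessingStats`.
#[derive(Debug, Default)]
struct Stats {
    articles: u64,
    bytes: u64, // every line read, parsed or not
    errors: u64,
}

impl Stats {
    /// Records one successfully parsed line of `len` bytes.
    fn record_ok(&mut self, len: usize) {
        self.articles += 1;
        self.bytes += len as u64;
    }

    /// Records one line of `len` bytes that failed to parse.
    fn record_error(&mut self, len: usize) {
        self.errors += 1;
        self.bytes += len as u64;
    }

    /// Throughput over `elapsed`; returns 0.0 for a zero duration.
    fn articles_per_sec(&self, elapsed: Duration) -> f64 {
        let secs = elapsed.as_secs_f64();
        if secs == 0.0 {
            0.0
        } else {
            self.articles as f64 / secs
        }
    }
}
```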

## Snapshot Processing

Snapshots are delivered as `.tar.gz` files containing NDJSON:

1. Download chunks (use parallel downloads for speed)
2. Decompress tarball
3. Stream NDJSON lines
4. Deduplicate articles
5. Process or store results
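Step 1 recommends parallel downloads; the same idea applies to processing, since each chunk is an independent NDJSON stream. The sketch below assumes the chunks are already downloaded and decompressed into memory (download and `.tar.gz` extraction need extra crates and are omitted) and uses `std::thread::scope` so worker threads can borrow the chunk data. `count_lines_parallel` is hypothetical and stands in for real per-chunk work.

```rust
use std::thread;

/// Hypothetical helper: counts non-blank NDJSON lines in each chunk,
/// one scoped thread per chunk.
fn count_lines_parallel(chunks: &[String]) -> Vec<usize> {
    thread::scope(|s| {
        // Scoped threads may borrow `chunks` because they are joined before
        // `scope` returns.
        let handles: Vec<_> = chunks
            .iter()
            .map(|chunk| s.spawn(move || chunk.lines().filter(|l| !l.trim().is_empty()).count()))
            .collect();
        // Join in spawn order so results line up with `chunks`.
        handles
            .into_iter()
            .map(|h| h.join().expect("worker panicked"))
            .collect()
    })
}
```

One thread per chunk is fine for a handful of chunks; for many, a bounded worker pool keeps thread count and memory in check. If duplicates can span chunks, step 4 has to run after the per-chunk results are merged.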

## Realtime Streaming

The Realtime API provides SSE or NDJSON streams:

1. Connect to streaming endpoint
2. Receive events (update, delete, visibility-change)
3. Track partition/offset for resume
4. Process events incrementally
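Step 3 can be as simple as remembering the highest offset processed in each partition. A minimal sketch, assuming partitions are small integers and offsets increase within a partition; `OffsetTracker` is hypothetical, and how the saved offsets are passed back to the Realtime API on reconnect is not covered here.

```rust
use std::collections::HashMap;

/// Hypothetical tracker: last processed offset per partition.
#[derive(Debug, Default)]
struct OffsetTracker {
    last: HashMap<u32, u64>, // partition -> highest offset processed
}

impl OffsetTracker {
    /// Call after an event has been fully processed, not when it arrives.
    fn mark_processed(&mut self, partition: u32, offset: u64) {
        let entry = self.last.entry(partition).or_insert(offset);
        // Keep the maximum so a replayed (older) event cannot move it backwards.
        if offset > *entry {
            *entry = offset;
        }
    }

    /// Last processed offset for `partition`, or `None` if nothing was seen.
    fn last_offset(&self, partition: u32) -> Option<u64> {
        self.last.get(&partition).copied()
    }
}
```

Marking an event only after processing gives at-least-once behavior: after a crash, the event that was in flight may be delivered again, so handlers should tolerate applying the same update twice.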

## License

This project is licensed under the terms of the workspace license.