// wme_stream/lib.rs
1//! Streaming utilities for the Wikimedia Enterprise API.
2//!
3//! This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise
4//! APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality,
5//! and efficient streaming parsing.
6//!
7//! # Overview
8//!
9//! Wikimedia Enterprise provides data in several formats:
10//! - **Snapshot API** - Complete project dumps as `.tar.gz` with NDJSON files
11//! - **Realtime API** - Streaming updates via SSE or NDJSON
12//! - **Realtime Batch** - Hourly bundles of updates as `.tar.gz`
13//!
14//! This crate helps you process these data sources efficiently.
15//!
16//! # Key Features
17//!
18//! - **NDJSON Streaming** - Parse newline-delimited JSON from any source
19//! - **Deduplication** - Handle duplicate articles automatically (keep latest version)
20//! - **Checkpoint/Resume** - Save and resume long-running downloads
21//! - **Visitor Pattern** - Process articles without full materialization
22//! - **Progress Tracking** - Events and statistics for monitoring
23//!
24//! # Modules
25//!
26//! - [`ndjson`] - NDJSON parsing from streams and files
27//! - [`dedup`] - Duplicate detection and removal
28//! - [`checkpoint`] - Resume checkpoint save/load
29//! - [`visitor`] - Visitor trait for low-memory processing
30//!
31//! # Example: Parse NDJSON Stream
32//!
33//! ```rust,no_run
34//! use wme_stream::NdjsonStream;
35//! use wme_models::Article;
36//! use futures::StreamExt;
37//! use std::io::BufReader;
38//! use std::fs::File;
39//! use std::pin::pin;
40//!
41//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
42//! // Open NDJSON file
43//! let input_file = File::open("articles.ndjson")?;
44//! let reader = BufReader::new(input_file);
45//!
46//! // Create line stream
47//! let lines = NdjsonStream::from_reader(reader);
48//!
49//! // Parse into Articles
50//! let articles = NdjsonStream::parse_articles(lines);
51//!
52//! // Process each article (pin the stream first)
53//! let mut pinned = pin!(articles);
54//! while let Some(result) = pinned.next().await {
55//! match result {
56//! Ok(article) => println!("{}: {}", article.identifier, article.name),
57//! Err(e) => eprintln!("Error: {}", e),
58//! }
59//! }
60//! # Ok(())
61//! # }
62//! ```
63//!
64//! # Example: Deduplicate Stream
65//!
66//! ```rust,no_run
67//! use wme_stream::dedup_stream;
68//! use futures::StreamExt;
69//! use std::pin::pin;
70//!
71//! # async fn example<S>(stream: S)
72//! # where
73//! # S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
74//! # {
75//! // Wrap stream to deduplicate
76//! let deduplicated = dedup_stream(stream);
77//!
78//! // Pin the stream before iterating
79//! let mut pinned = pin!(deduplicated);
80//! while let Some(result) = pinned.next().await {
81//! // Only latest version of each article
82//! let _article = result.unwrap();
83//! }
84//! # }
85//! ```
86//!
87//! # Example: Save/Resume Checkpoint
88//!
89//! ```rust,no_run
90//! use wme_stream::ResumeCheckpoint;
91//!
92//! # async fn example() -> Result<(), wme_stream::StreamError> {
93//! // Save progress after processing 1000 articles
94//! let checkpoint = ResumeCheckpoint::new(
95//! "enwiki_namespace_0",
96//! "chunk_0",
97//! 5000, // line offset
98//! 1000, // articles processed
99//! );
100//! checkpoint.save("/data/checkpoints/").await?;
101//!
102//! // Later: resume from checkpoint
103//! let checkpoint = ResumeCheckpoint::load(
104//! "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
105//! ).await?;
106//! println!("Resuming from line {}", checkpoint.line_offset);
107//! # Ok(())
108//! # }
109//! ```
110
111#![doc = include_str!("../README.md")]
112
113use std::path::PathBuf;
114
115use chrono::{DateTime, Utc};
116use serde::{Deserialize, Serialize};
117
118pub mod checkpoint;
119pub mod dedup;
120pub mod ndjson;
121pub mod visitor;
122
123pub use checkpoint::ResumeCheckpoint;
124pub use dedup::{dedup_collect, dedup_stream};
125pub use ndjson::{NdjsonExt, NdjsonStream};
126pub use visitor::{ArticleVisitor, NoOpVisitor, StatsVisitor};
127
/// Progress event for snapshot processing.
///
/// These events allow you to monitor the progress of snapshot downloads
/// and processing. Use them to update progress bars or log activity.
///
/// # Example
///
/// ```rust,no_run
/// use wme_stream::SnapshotEvent;
///
/// fn handle_event(event: SnapshotEvent) {
///     match event {
///         SnapshotEvent::ChunkStarted { chunk_id, size_bytes } => {
///             println!("Starting download of {} ({} bytes)", chunk_id, size_bytes);
///         }
///         SnapshotEvent::ChunkProgress { chunk_id, bytes_downloaded, bytes_total } => {
///             let pct = (bytes_downloaded as f64 / bytes_total as f64) * 100.0;
///             println!("{}: {:.1}%", chunk_id, pct);
///         }
///         SnapshotEvent::ArticleProcessed { title, .. } => {
///             println!("Processed: {}", title);
///         }
///         _ => {}
///     }
/// }
/// ```
#[derive(Debug, Clone)]
pub enum SnapshotEvent {
    /// Manifest loaded successfully.
    ManifestLoaded {
        /// Snapshot identifier
        snapshot_id: String,
        /// Number of chunks
        chunks: usize,
    },
    /// Chunk download started.
    ChunkStarted {
        /// Chunk identifier
        chunk_id: String,
        /// Size in bytes
        size_bytes: u64,
    },
    /// Chunk download progress.
    ChunkProgress {
        /// Chunk identifier
        chunk_id: String,
        /// Bytes downloaded so far
        bytes_downloaded: u64,
        /// Total bytes
        bytes_total: u64,
    },
    /// Chunk processing completed.
    ChunkCompleted {
        /// Chunk identifier
        chunk_id: String,
        /// Number of articles processed
        articles_count: u64,
        /// Elapsed time
        elapsed: std::time::Duration,
    },
    /// Individual article processed.
    ArticleProcessed {
        /// Article ID
        article_id: u64,
        /// Article title
        title: String,
    },
    /// Checkpoint saved.
    CheckpointSaved {
        /// Path to checkpoint file
        path: PathBuf,
    },
    /// Error occurred.
    Error {
        /// Error details
        error: StreamError,
        /// Whether the error is recoverable
        recoverable: bool,
    },
    /// Processing completed.
    Completed {
        /// Total articles processed
        total_articles: u64,
        /// Total bytes processed
        total_bytes: u64,
    },
}
215
/// Errors that can occur during streaming.
///
/// These errors cover IO, parsing, decompression, network, and resume failures.
/// All errors implement `std::error::Error` and are cloneable: variants carry
/// `String` messages rather than wrapping source errors (e.g. `std::io::Error`
/// is not `Clone`), which allows this enum to derive `Clone`.
#[derive(thiserror::Error, Debug, Clone)]
pub enum StreamError {
    /// IO error (file read/write).
    #[error("IO error: {0}")]
    Io(String),
    /// JSON parse error.
    #[error("JSON parse error: {0}")]
    JsonParse(String),
    /// Decompression error (tar.gz).
    #[error("Decompression error: {0}")]
    Decompression(String),
    /// Network error.
    #[error("Network error: {0}")]
    Network(String),
    /// Checksum mismatch.
    #[error("Checksum mismatch for {file}")]
    ChecksumMismatch {
        /// File identifier
        file: String,
    },
    /// Resume error (checkpoint load/save).
    #[error("Resume error: {0}")]
    Resume(String),
}
244
/// Processing statistics.
///
/// Tracks metrics during snapshot/realtime processing including
/// articles processed, bytes transferred, errors, and duplicates.
///
/// # Example
///
/// ```rust
/// use wme_stream::ProcessingStats;
///
/// let mut stats = ProcessingStats::new();
/// stats.articles_processed = 1000;
/// stats.bytes_downloaded = 1024 * 1024 * 100; // 100 MB
///
/// let rate = stats.rate();
/// println!("Processing rate: {} articles/sec", rate);
/// ```
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessingStats {
    /// Total articles processed
    pub articles_processed: u64,
    /// Total bytes downloaded
    pub bytes_downloaded: u64,
    /// Total bytes decompressed
    pub bytes_decompressed: u64,
    /// Errors encountered
    pub errors: u64,
    /// Duplicates removed
    pub duplicates: u64,
    /// Start time (set by [`ProcessingStats::new`], `None` via `Default`)
    pub started_at: Option<DateTime<Utc>>,
    /// End time (set by [`ProcessingStats::complete`])
    pub completed_at: Option<DateTime<Utc>>,
}
280
281impl ProcessingStats {
282 /// Create new empty stats with current timestamp.
283 ///
284 /// Sets `started_at` to the current time.
285 pub fn new() -> Self {
286 Self {
287 started_at: Some(Utc::now()),
288 ..Default::default()
289 }
290 }
291
292 /// Merge another stats into this one.
293 ///
294 /// Combines counts from another stats instance. Useful when
295 /// aggregating stats from multiple chunks or workers.
296 pub fn merge(&mut self, other: &ProcessingStats) {
297 self.articles_processed += other.articles_processed;
298 self.bytes_downloaded += other.bytes_downloaded;
299 self.bytes_decompressed += other.bytes_decompressed;
300 self.errors += other.errors;
301 self.duplicates += other.duplicates;
302 }
303
304 /// Calculate processing rate (articles per second).
305 ///
306 /// Returns 0.0 if `started_at` is not set.
307 /// Uses `completed_at` if set, otherwise uses current time.
308 pub fn rate(&self) -> f64 {
309 if let Some(started) = self.started_at {
310 let duration = self
311 .completed_at
312 .unwrap_or_else(Utc::now)
313 .signed_duration_since(started);
314 let seconds = duration.num_seconds() as f64;
315 if seconds > 0.0 {
316 return self.articles_processed as f64 / seconds;
317 }
318 }
319 0.0
320 }
321
322 /// Mark processing as completed.
323 ///
324 /// Sets `completed_at` to the current time.
325 pub fn complete(&mut self) {
326 self.completed_at = Some(Utc::now());
327 }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    #[test]
    fn test_processing_stats_new() {
        let stats = ProcessingStats::new();
        assert!(stats.started_at.is_some());
        assert_eq!(stats.articles_processed, 0);
        // Freshly created stats have no elapsed time, so the rate is zero.
        assert_eq!(stats.rate(), 0.0);
    }

    #[test]
    fn test_processing_stats_merge() {
        let mut base = ProcessingStats {
            articles_processed: 100,
            bytes_downloaded: 1000,
            errors: 5,
            ..ProcessingStats::new()
        };
        let other = ProcessingStats {
            articles_processed: 50,
            bytes_downloaded: 500,
            duplicates: 10,
            ..ProcessingStats::new()
        };

        base.merge(&other);

        // Counters accumulate; fields absent from `other` stay unchanged.
        assert_eq!(base.articles_processed, 150);
        assert_eq!(base.bytes_downloaded, 1500);
        assert_eq!(base.errors, 5);
        assert_eq!(base.duplicates, 10);
    }

    #[test]
    fn test_processing_stats_rate() {
        // Backdate the start by 10 seconds so a measurable rate exists.
        let stats = ProcessingStats {
            articles_processed: 100,
            started_at: Some(Utc::now() - Duration::seconds(10)),
            ..ProcessingStats::new()
        };

        let rate = stats.rate();
        assert!(rate > 0.0);
        // Approximately 10 articles/sec.
        assert!((rate - 10.0).abs() < 1.0);
    }

    #[test]
    fn test_processing_stats_complete() {
        let mut stats = ProcessingStats::new();
        assert!(stats.completed_at.is_none());

        stats.complete();
        assert!(stats.completed_at.is_some());
    }

    #[test]
    fn test_stream_error_display() {
        let rendered = StreamError::Io("file not found".to_string()).to_string();
        assert!(rendered.contains("IO error"));
        assert!(rendered.contains("file not found"));

        let rendered = StreamError::JsonParse("invalid json".to_string()).to_string();
        assert!(rendered.contains("JSON parse error"));
    }

    #[test]
    fn test_snapshot_event_creation() {
        let event = SnapshotEvent::ManifestLoaded {
            snapshot_id: "enwiki_namespace_0".to_string(),
            chunks: 5,
        };

        if let SnapshotEvent::ManifestLoaded {
            snapshot_id,
            chunks,
        } = event
        {
            assert_eq!(snapshot_id, "enwiki_namespace_0");
            assert_eq!(chunks, 5);
        } else {
            panic!("Wrong event type");
        }
    }
}