// wme_stream — crate root (src/lib.rs)
1//! Streaming utilities for the Wikimedia Enterprise API.
2//!
3//! This crate provides utilities for processing NDJSON streams from Wikimedia Enterprise
4//! APIs (Snapshot and Realtime). It handles deduplication, checkpoint/resume functionality,
5//! and efficient streaming parsing.
6//!
7//! # Overview
8//!
9//! Wikimedia Enterprise provides data in several formats:
10//! - **Snapshot API** - Complete project dumps as `.tar.gz` with NDJSON files
11//! - **Realtime API** - Streaming updates via SSE or NDJSON
12//! - **Realtime Batch** - Hourly bundles of updates as `.tar.gz`
13//!
14//! This crate helps you process these data sources efficiently.
15//!
16//! # Key Features
17//!
18//! - **NDJSON Streaming** - Parse newline-delimited JSON from any source
19//! - **Deduplication** - Handle duplicate articles automatically (keep latest version)
20//! - **Checkpoint/Resume** - Save and resume long-running downloads
21//! - **Visitor Pattern** - Process articles without full materialization
22//! - **Progress Tracking** - Events and statistics for monitoring
23//!
24//! # Modules
25//!
26//! - [`ndjson`] - NDJSON parsing from streams and files
27//! - [`dedup`] - Duplicate detection and removal
28//! - [`checkpoint`] - Resume checkpoint save/load
29//! - [`visitor`] - Visitor trait for low-memory processing
30//!
31//! # Example: Parse NDJSON Stream
32//!
33//! ```rust,no_run
34//! use wme_stream::NdjsonStream;
35//! use wme_models::Article;
36//! use futures::StreamExt;
37//! use std::io::BufReader;
38//! use std::fs::File;
39//! use std::pin::pin;
40//!
41//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
42//! // Open NDJSON file
43//! let input_file = File::open("articles.ndjson")?;
44//! let reader = BufReader::new(input_file);
45//!
46//! // Create line stream
47//! let lines = NdjsonStream::from_reader(reader);
48//!
49//! // Parse into Articles
50//! let articles = NdjsonStream::parse_articles(lines);
51//!
52//! // Process each article (pin the stream first)
53//! let mut pinned = pin!(articles);
54//! while let Some(result) = pinned.next().await {
55//!     match result {
56//!         Ok(article) => println!("{}: {}", article.identifier, article.name),
57//!         Err(e) => eprintln!("Error: {}", e),
58//!     }
59//! }
60//! # Ok(())
61//! # }
62//! ```
63//!
64//! # Example: Deduplicate Stream
65//!
66//! ```rust,no_run
67//! use wme_stream::dedup_stream;
68//! use futures::StreamExt;
69//! use std::pin::pin;
70//!
71//! # async fn example<S>(stream: S)
72//! # where
73//! #     S: futures::Stream<Item = Result<wme_models::Article, wme_stream::StreamError>>,
74//! # {
75//! // Wrap stream to deduplicate
76//! let deduplicated = dedup_stream(stream);
77//!
78//! // Pin the stream before iterating
79//! let mut pinned = pin!(deduplicated);
80//! while let Some(result) = pinned.next().await {
81//!     // Only latest version of each article
82//!     let _article = result.unwrap();
83//! }
84//! # }
85//! ```
86//!
87//! # Example: Save/Resume Checkpoint
88//!
89//! ```rust,no_run
90//! use wme_stream::ResumeCheckpoint;
91//!
92//! # async fn example() -> Result<(), wme_stream::StreamError> {
93//! // Save progress after processing 1000 articles
94//! let checkpoint = ResumeCheckpoint::new(
95//!     "enwiki_namespace_0",
96//!     "chunk_0",
97//!     5000,  // line offset
98//!     1000,  // articles processed
99//! );
100//! checkpoint.save("/data/checkpoints/").await?;
101//!
102//! // Later: resume from checkpoint
103//! let checkpoint = ResumeCheckpoint::load(
104//!     "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
105//! ).await?;
106//! println!("Resuming from line {}", checkpoint.line_offset);
107//! # Ok(())
108//! # }
109//! ```
110
111#![doc = include_str!("../README.md")]
112
113use std::path::PathBuf;
114
115use chrono::{DateTime, Utc};
116use serde::{Deserialize, Serialize};
117
118pub mod checkpoint;
119pub mod dedup;
120pub mod ndjson;
121pub mod visitor;
122
123pub use checkpoint::ResumeCheckpoint;
124pub use dedup::{dedup_collect, dedup_stream};
125pub use ndjson::{NdjsonExt, NdjsonStream};
126pub use visitor::{ArticleVisitor, NoOpVisitor, StatsVisitor};
127
/// Progress event for snapshot processing.
///
/// These events allow you to monitor the progress of snapshot downloads
/// and processing. Use them to update progress bars or log activity.
///
/// # Example
///
/// ```rust,no_run
/// use wme_stream::SnapshotEvent;
///
/// fn handle_event(event: SnapshotEvent) {
///     match event {
///         SnapshotEvent::ChunkStarted { chunk_id, size_bytes } => {
///             println!("Starting download of {} ({} bytes)", chunk_id, size_bytes);
///         }
///         SnapshotEvent::ChunkProgress { chunk_id, bytes_downloaded, bytes_total } => {
///             let pct = (bytes_downloaded as f64 / bytes_total as f64) * 100.0;
///             println!("{}: {:.1}%", chunk_id, pct);
///         }
///         SnapshotEvent::ArticleProcessed { article_id, title } => {
///             println!("Processed: {}", title);
///         }
///         _ => {}
///     }
/// }
/// ```
#[derive(Debug, Clone)]
pub enum SnapshotEvent {
    /// Manifest loaded successfully.
    ManifestLoaded {
        /// Snapshot identifier
        snapshot_id: String,
        /// Number of chunks
        chunks: usize,
    },
    /// Chunk download started.
    ChunkStarted {
        /// Chunk identifier
        chunk_id: String,
        /// Size in bytes
        size_bytes: u64,
    },
    /// Chunk download progress.
    ChunkProgress {
        /// Chunk identifier
        chunk_id: String,
        /// Bytes downloaded so far
        bytes_downloaded: u64,
        /// Total bytes
        bytes_total: u64,
    },
    /// Chunk processing completed.
    ChunkCompleted {
        /// Chunk identifier
        chunk_id: String,
        /// Number of articles processed
        articles_count: u64,
        /// Elapsed time
        elapsed: std::time::Duration,
    },
    /// Individual article processed.
    ArticleProcessed {
        /// Article ID
        article_id: u64,
        /// Article title
        title: String,
    },
    /// Checkpoint saved.
    CheckpointSaved {
        /// Path to checkpoint file
        path: PathBuf,
    },
    /// Error occurred.
    Error {
        /// Error details
        error: StreamError,
        /// Whether the error is recoverable (processing may continue)
        recoverable: bool,
    },
    /// Processing completed.
    Completed {
        /// Total articles processed
        total_articles: u64,
        /// Total bytes processed
        total_bytes: u64,
    },
}
215
/// Errors that can occur during streaming.
///
/// These errors cover IO, parsing, decompression, network, and resume failures.
/// All errors implement `std::error::Error` and are cloneable.
///
/// Variants carry the underlying error as a `String` (rather than wrapping
/// source error types such as `std::io::Error`) so that the enum can derive
/// `Clone` — e.g. for embedding in [`SnapshotEvent::Error`].
#[derive(thiserror::Error, Debug, Clone)]
pub enum StreamError {
    /// IO error (file read/write).
    #[error("IO error: {0}")]
    Io(String),
    /// JSON parse error.
    #[error("JSON parse error: {0}")]
    JsonParse(String),
    /// Decompression error (tar.gz).
    #[error("Decompression error: {0}")]
    Decompression(String),
    /// Network error.
    #[error("Network error: {0}")]
    Network(String),
    /// Checksum mismatch.
    #[error("Checksum mismatch for {file}")]
    ChecksumMismatch {
        /// File identifier
        file: String,
    },
    /// Resume error (checkpoint load/save).
    #[error("Resume error: {0}")]
    Resume(String),
}
244
/// Processing statistics.
///
/// Tracks metrics during snapshot/realtime processing including
/// articles processed, bytes transferred, errors, and duplicates.
///
/// # Example
///
/// ```rust
/// use wme_stream::ProcessingStats;
/// use chrono::Utc;
///
/// let mut stats = ProcessingStats::new();
/// stats.articles_processed = 1000;
/// stats.bytes_downloaded = 1024 * 1024 * 100; // 100 MB
///
/// // Rate is articles per second of wall-clock time since `new()`;
/// // immediately after construction it is (close to) 0.0.
/// let rate = stats.rate();
/// println!("Processing rate: {} articles/sec", rate);
/// ```
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessingStats {
    /// Total articles processed
    pub articles_processed: u64,
    /// Total bytes downloaded
    pub bytes_downloaded: u64,
    /// Total bytes decompressed
    pub bytes_decompressed: u64,
    /// Errors encountered
    pub errors: u64,
    /// Duplicates removed
    pub duplicates: u64,
    /// Start time (set by [`ProcessingStats::new`], `None` via `Default`)
    pub started_at: Option<DateTime<Utc>>,
    /// End time (set by [`ProcessingStats::complete`])
    pub completed_at: Option<DateTime<Utc>>,
}
280
281impl ProcessingStats {
282    /// Create new empty stats with current timestamp.
283    ///
284    /// Sets `started_at` to the current time.
285    pub fn new() -> Self {
286        Self {
287            started_at: Some(Utc::now()),
288            ..Default::default()
289        }
290    }
291
292    /// Merge another stats into this one.
293    ///
294    /// Combines counts from another stats instance. Useful when
295    /// aggregating stats from multiple chunks or workers.
296    pub fn merge(&mut self, other: &ProcessingStats) {
297        self.articles_processed += other.articles_processed;
298        self.bytes_downloaded += other.bytes_downloaded;
299        self.bytes_decompressed += other.bytes_decompressed;
300        self.errors += other.errors;
301        self.duplicates += other.duplicates;
302    }
303
304    /// Calculate processing rate (articles per second).
305    ///
306    /// Returns 0.0 if `started_at` is not set.
307    /// Uses `completed_at` if set, otherwise uses current time.
308    pub fn rate(&self) -> f64 {
309        if let Some(started) = self.started_at {
310            let duration = self
311                .completed_at
312                .unwrap_or_else(Utc::now)
313                .signed_duration_since(started);
314            let seconds = duration.num_seconds() as f64;
315            if seconds > 0.0 {
316                return self.articles_processed as f64 / seconds;
317            }
318        }
319        0.0
320    }
321
322    /// Mark processing as completed.
323    ///
324    /// Sets `completed_at` to the current time.
325    pub fn complete(&mut self) {
326        self.completed_at = Some(Utc::now());
327    }
328}
329
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    #[test]
    fn test_processing_stats_new() {
        let stats = ProcessingStats::new();
        assert!(stats.started_at.is_some());
        assert_eq!(stats.articles_processed, 0);
        // Zero articles (and essentially zero elapsed time) -> zero rate.
        assert_eq!(stats.rate(), 0.0);
    }

    #[test]
    fn test_processing_stats_merge() {
        let mut base = ProcessingStats {
            articles_processed: 100,
            bytes_downloaded: 1000,
            errors: 5,
            ..ProcessingStats::new()
        };
        let incoming = ProcessingStats {
            articles_processed: 50,
            bytes_downloaded: 500,
            duplicates: 10,
            ..ProcessingStats::new()
        };

        base.merge(&incoming);

        // Counters sum; fields present in only one side carry through.
        assert_eq!(
            (
                base.articles_processed,
                base.bytes_downloaded,
                base.errors,
                base.duplicates,
            ),
            (150, 1500, 5, 10)
        );
    }

    #[test]
    fn test_processing_stats_rate() {
        let mut stats = ProcessingStats::new();
        // Pretend processing began 10 seconds ago.
        stats.started_at = Some(Utc::now() - Duration::seconds(10));
        stats.articles_processed = 100;

        let rate = stats.rate();
        assert!(rate > 0.0);
        // 100 articles over ~10s -> approximately 10 articles/sec.
        assert!((rate - 10.0).abs() < 1.0);
    }

    #[test]
    fn test_processing_stats_complete() {
        let mut stats = ProcessingStats::new();
        assert!(stats.completed_at.is_none());

        stats.complete();
        assert!(stats.completed_at.is_some());
    }

    #[test]
    fn test_stream_error_display() {
        let io_err = StreamError::Io("file not found".to_string());
        let rendered = io_err.to_string();
        assert!(rendered.contains("IO error"));
        assert!(rendered.contains("file not found"));

        let parse_err = StreamError::JsonParse("invalid json".to_string());
        assert!(parse_err.to_string().contains("JSON parse error"));
    }

    #[test]
    fn test_snapshot_event_creation() {
        let event = SnapshotEvent::ManifestLoaded {
            snapshot_id: "enwiki_namespace_0".to_string(),
            chunks: 5,
        };

        if let SnapshotEvent::ManifestLoaded { snapshot_id, chunks } = event {
            assert_eq!(snapshot_id, "enwiki_namespace_0");
            assert_eq!(chunks, 5);
        } else {
            panic!("Wrong event type");
        }
    }
}