#![doc = include_str!("../README.md")]
use std::path::PathBuf;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
pub mod checkpoint;
pub mod dedup;
pub mod ndjson;
pub mod visitor;
pub use checkpoint::ResumeCheckpoint;
pub use dedup::{dedup_collect, dedup_stream};
pub use ndjson::{NdjsonExt, NdjsonStream};
pub use visitor::{ArticleVisitor, NoOpVisitor, StatsVisitor};
/// Events describing the progress of snapshot processing.
///
/// Each variant carries the data listed in its fields. Where and when each
/// event is emitted is not visible in this file; the per-variant notes below
/// are inferred from the variant and field names and should be confirmed
/// against the emitting code.
#[derive(Debug, Clone)]
pub enum SnapshotEvent {
/// A snapshot manifest was loaded.
ManifestLoaded {
/// Identifier of the snapshot.
snapshot_id: String,
/// Number of chunks (presumably those listed in the manifest).
chunks: usize,
},
/// Work on a chunk has started.
ChunkStarted {
/// Identifier of the chunk.
chunk_id: String,
/// Size of the chunk in bytes.
size_bytes: u64,
},
/// Download progress for a chunk.
ChunkProgress {
/// Identifier of the chunk.
chunk_id: String,
/// Bytes downloaded so far.
bytes_downloaded: u64,
/// Total number of bytes expected for the chunk.
bytes_total: u64,
},
/// A chunk finished.
ChunkCompleted {
/// Identifier of the chunk.
chunk_id: String,
/// Number of articles reported for the chunk.
articles_count: u64,
/// Elapsed time reported for the chunk (presumably wall-clock time
/// spent on it; confirm at the emit site).
elapsed: std::time::Duration,
},
/// A single article was processed.
ArticleProcessed {
/// Numeric identifier of the article.
article_id: u64,
/// Title of the article.
title: String,
},
/// A checkpoint was saved.
CheckpointSaved {
/// Path associated with the saved checkpoint.
path: PathBuf,
},
/// An error was reported.
Error {
/// The error that was reported.
error: StreamError,
/// Whether the error is flagged as recoverable. How consumers are
/// expected to react to either value is not defined in this file.
recoverable: bool,
},
/// Processing completed.
Completed {
/// Total number of articles.
total_articles: u64,
/// Total number of bytes.
total_bytes: u64,
},
}
/// Errors reported by this crate's streaming code.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. Payloads are plain `String`s rather than wrapped source errors
/// (presumably so that the enum can derive `Clone`, as it does here).
#[derive(thiserror::Error, Debug, Clone)]
pub enum StreamError {
/// An I/O failure; the payload is the message shown after `IO error: `.
#[error("IO error: {0}")]
Io(String),
/// A JSON parsing failure; the payload is the message shown after
/// `JSON parse error: `.
#[error("JSON parse error: {0}")]
JsonParse(String),
/// A decompression failure; the payload is the message shown after
/// `Decompression error: `.
#[error("Decompression error: {0}")]
Decompression(String),
/// A network failure; the payload is the message shown after
/// `Network error: `.
#[error("Network error: {0}")]
Network(String),
/// A checksum did not match for `file`.
#[error("Checksum mismatch for {file}")]
ChecksumMismatch {
/// The file named in the error message.
file: String,
},
/// A resume-related failure; the payload is the message shown after
/// `Resume error: `.
#[error("Resume error: {0}")]
Resume(String),
}
/// Counters and timestamps describing a processing run.
///
/// The derived `Default` yields all counters at zero and both timestamps as
/// `None`; `ProcessingStats::new` additionally sets `started_at`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessingStats {
/// Number of articles processed; the numerator used by `rate`.
pub articles_processed: u64,
/// Number of bytes downloaded.
pub bytes_downloaded: u64,
/// Number of bytes after decompression.
pub bytes_decompressed: u64,
/// Number of errors counted.
pub errors: u64,
/// Number of duplicates counted.
pub duplicates: u64,
/// UTC start time; set by `new`, `None` when built via `Default`.
/// Not modified by `merge`.
pub started_at: Option<DateTime<Utc>>,
/// UTC completion time; set by `complete`. Not modified by `merge`.
pub completed_at: Option<DateTime<Utc>>,
}
impl ProcessingStats {
    /// Creates statistics with every counter at zero and `started_at` set to
    /// the current UTC time.
    ///
    /// This differs from `ProcessingStats::default()`, which leaves
    /// `started_at` as `None`.
    pub fn new() -> Self {
        Self {
            started_at: Some(Utc::now()),
            ..Default::default()
        }
    }

    /// Adds the counters of `other` into `self`.
    ///
    /// Only the numeric counters are combined; `started_at` and
    /// `completed_at` of `self` are left untouched.
    pub fn merge(&mut self, other: &ProcessingStats) {
        self.articles_processed += other.articles_processed;
        self.bytes_downloaded += other.bytes_downloaded;
        self.bytes_decompressed += other.bytes_decompressed;
        self.errors += other.errors;
        self.duplicates += other.duplicates;
    }

    /// Returns the average number of articles processed per second.
    ///
    /// The elapsed time runs from `started_at` to `completed_at`, or to the
    /// current time when the run has not been completed yet. Fractional
    /// seconds are taken into account, so runs shorter than one second still
    /// yield a meaningful rate.
    ///
    /// Returns `0.0` when `started_at` is unset or when the elapsed time is
    /// zero or negative.
    pub fn rate(&self) -> f64 {
        let started = match self.started_at {
            Some(started) => started,
            None => return 0.0,
        };
        let finished = self.completed_at.unwrap_or_else(Utc::now);
        // `to_std` fails for negative spans (end before start); treat those
        // like a zero-length run instead of producing a negative rate.
        let seconds = finished
            .signed_duration_since(started)
            .to_std()
            .map_or(0.0, |elapsed| elapsed.as_secs_f64());
        if seconds > 0.0 {
            self.articles_processed as f64 / seconds
        } else {
            0.0
        }
    }

    /// Marks the run as finished by setting `completed_at` to the current
    /// UTC time. Calling it again overwrites the previous value.
    pub fn complete(&mut self) {
        self.completed_at = Some(Utc::now());
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// A freshly created tracker has a start time, no articles and a zero rate.
    #[test]
    fn test_processing_stats_new() {
        let fresh = ProcessingStats::new();
        assert!(fresh.started_at.is_some());
        assert_eq!(fresh.articles_processed, 0);
        // No articles have been processed, so the rate is zero regardless of
        // how much time has elapsed.
        assert_eq!(fresh.rate(), 0.0);
    }

    /// Merging sums the counters of both sides.
    #[test]
    fn test_processing_stats_merge() {
        let mut target = ProcessingStats {
            articles_processed: 100,
            bytes_downloaded: 1000,
            errors: 5,
            ..ProcessingStats::new()
        };
        let addition = ProcessingStats {
            articles_processed: 50,
            bytes_downloaded: 500,
            duplicates: 10,
            ..ProcessingStats::new()
        };

        target.merge(&addition);

        assert_eq!(target.articles_processed, 150);
        assert_eq!(target.bytes_downloaded, 1500);
        assert_eq!(target.errors, 5);
        assert_eq!(target.duplicates, 10);
    }

    /// 100 articles over roughly ten seconds is about ten articles per second.
    #[test]
    fn test_processing_stats_rate() {
        let ten_seconds_ago = Utc::now() - Duration::seconds(10);
        let running = ProcessingStats {
            started_at: Some(ten_seconds_ago),
            articles_processed: 100,
            ..ProcessingStats::new()
        };

        let observed = running.rate();

        assert!(observed > 0.0);
        assert!((observed - 10.0).abs() < 1.0);
    }

    /// `complete` fills in the completion timestamp.
    #[test]
    fn test_processing_stats_complete() {
        let mut run = ProcessingStats::new();
        assert!(run.completed_at.is_none());

        run.complete();

        assert!(run.completed_at.is_some());
    }

    /// The `Display` output contains both the variant prefix and the payload.
    #[test]
    fn test_stream_error_display() {
        let io_message = StreamError::Io("file not found".to_string()).to_string();
        assert!(io_message.contains("IO error"));
        assert!(io_message.contains("file not found"));

        let json_message = StreamError::JsonParse("invalid json".to_string()).to_string();
        assert!(json_message.contains("JSON parse error"));
    }

    /// A constructed event can be destructured back into the same field values.
    #[test]
    fn test_snapshot_event_creation() {
        let event = SnapshotEvent::ManifestLoaded {
            snapshot_id: "enwiki_namespace_0".to_string(),
            chunks: 5,
        };

        if let SnapshotEvent::ManifestLoaded {
            snapshot_id,
            chunks,
        } = event
        {
            assert_eq!(snapshot_id, "enwiki_namespace_0");
            assert_eq!(chunks, 5);
        } else {
            panic!("Wrong event type");
        }
    }
}