use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Progress marker for resuming processing of a snapshot, persisted as JSON.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct ResumeCheckpoint {
    /// Identifier of the snapshot (e.g. `"enwiki_namespace_0"`).
    pub snapshot_id: String,
    /// Identifier of the chunk this checkpoint refers to.
    pub chunk_id: String,
    /// Line offset recorded with the checkpoint — presumably within `chunk_id`; confirm with the reader.
    pub line_offset: u64,
    /// Articles-processed counter recorded with the checkpoint.
    pub articles_processed: u64,
    /// Creation time in UTC; `ResumeCheckpoint::new` sets it to `Utc::now()`.
    pub timestamp: DateTime<Utc>,
}
impl ResumeCheckpoint {
    /// Creates a checkpoint for `snapshot_id` / `chunk_id`, stamped with the current UTC time.
    pub fn new(
        snapshot_id: impl Into<String>,
        chunk_id: impl Into<String>,
        line_offset: u64,
        articles_processed: u64,
    ) -> Self {
        Self {
            snapshot_id: snapshot_id.into(),
            chunk_id: chunk_id.into(),
            line_offset,
            articles_processed,
            timestamp: Utc::now(),
        }
    }

    /// Serializes the checkpoint as pretty-printed JSON and writes it to `path`.
    ///
    /// The JSON is first written to a sibling temporary file (`path` with `.tmp`
    /// appended), flushed to disk with `sync_all`, and then renamed over `path`.
    /// Where rename is atomic (POSIX, same filesystem), an interrupted save leaves
    /// the previous checkpoint intact instead of a truncated file that
    /// [`ResumeCheckpoint::load`] would fail to parse. If `path` is a symlink, the
    /// rename replaces the link itself rather than writing through it.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Io` if serialization fails or if writing, syncing,
    /// or renaming the file fails. On failure the temporary file is removed on a
    /// best-effort basis.
    pub async fn save(&self, path: impl Into<PathBuf>) -> Result<(), crate::StreamError> {
        let path = path.into();
        let json = serde_json::to_string_pretty(self)
            .map_err(|e| crate::StreamError::Io(e.to_string()))?;

        // Keep the temp file next to the target so the final rename never crosses filesystems.
        let mut tmp_name = path.as_os_str().to_owned();
        tmp_name.push(".tmp");
        let tmp_path = PathBuf::from(tmp_name);

        if let Err(e) = Self::write_atomically(&tmp_path, &path, json.as_bytes()).await {
            // Best effort: don't leave a stray temp file; the original error is what matters.
            let _ = tokio::fs::remove_file(&tmp_path).await;
            return Err(crate::StreamError::Io(e.to_string()));
        }
        Ok(())
    }

    /// Writes `contents` to `tmp_path`, fsyncs it, then renames it onto `path`.
    async fn write_atomically(
        tmp_path: &std::path::Path,
        path: &std::path::Path,
        contents: &[u8],
    ) -> std::io::Result<()> {
        tokio::fs::write(tmp_path, contents).await?;
        // Reopen with write access (Windows requires it to flush buffers) solely to fsync,
        // so the data is on disk before the rename makes it visible under `path`.
        tokio::fs::OpenOptions::new()
            .write(true)
            .open(tmp_path)
            .await?
            .sync_all()
            .await?;
        tokio::fs::rename(tmp_path, path).await
    }

    /// Reads and deserializes a checkpoint from `path` (JSON, as written by
    /// [`ResumeCheckpoint::save`]).
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Resume` if the file cannot be read or its contents
    /// are not a valid checkpoint JSON document.
    pub async fn load(path: impl Into<PathBuf>) -> Result<Self, crate::StreamError> {
        let path = path.into();
        let json = tokio::fs::read_to_string(&path)
            .await
            .map_err(|e| crate::StreamError::Resume(e.to_string()))?;
        let checkpoint =
            serde_json::from_str(&json).map_err(|e| crate::StreamError::Resume(e.to_string()))?;
        Ok(checkpoint)
    }

    /// Builds the checkpoint file path `<base_dir>/<snapshot_id>.checkpoint.json`.
    ///
    /// `snapshot_id` is appended verbatim with [`PathBuf::push`]. If it contains
    /// path separators, the file lands in a subdirectory. If it is an absolute
    /// path, it replaces `base_dir` entirely. Callers should pass a plain
    /// identifier.
    pub fn checkpoint_path(base_dir: impl Into<PathBuf>, snapshot_id: &str) -> PathBuf {
        let mut path = base_dir.into();
        path.push(format!("{}.checkpoint.json", snapshot_id));
        path
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Returns a temp-dir path unique to this process and test name, so concurrent
    /// test runs sharing the system temp dir cannot clobber each other's files.
    fn unique_temp_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "resume_checkpoint_{}_{}.checkpoint.json",
            std::process::id(),
            name
        ))
    }

    #[test]
    fn test_checkpoint_creation() {
        let before = Utc::now();
        let checkpoint = ResumeCheckpoint::new("enwiki_namespace_0", "chunk_0", 5000, 1000);
        let after = Utc::now();
        assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
        assert_eq!(checkpoint.chunk_id, "chunk_0");
        assert_eq!(checkpoint.line_offset, 5000);
        assert_eq!(checkpoint.articles_processed, 1000);
        // The timestamp must be taken during construction, not before or after it.
        assert!(before <= checkpoint.timestamp);
        assert!(checkpoint.timestamp <= after);
    }

    #[test]
    fn test_checkpoint_path() {
        let path = ResumeCheckpoint::checkpoint_path("/tmp", "enwiki");
        assert_eq!(path, PathBuf::from("/tmp/enwiki.checkpoint.json"));
        let path = ResumeCheckpoint::checkpoint_path(PathBuf::from("/data"), "dewiki_namespace_0");
        assert_eq!(
            path,
            PathBuf::from("/data/dewiki_namespace_0.checkpoint.json")
        );
    }

    #[tokio::test]
    async fn test_checkpoint_save_and_load() {
        let checkpoint = ResumeCheckpoint::new("test_snapshot", "chunk_0", 100, 50);
        let path = unique_temp_path("save_and_load");
        checkpoint.save(&path).await.unwrap();
        let loaded = ResumeCheckpoint::load(&path).await;
        // Clean up before asserting so a failure doesn't leave the file behind.
        let _ = tokio::fs::remove_file(&path).await;
        // Full equality also verifies that `timestamp` survives the round trip.
        assert_eq!(loaded.unwrap(), checkpoint);
    }

    #[tokio::test]
    async fn test_checkpoint_load_not_found() {
        let result = ResumeCheckpoint::load("/nonexistent/path.json").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_checkpoint_load_invalid_json() {
        let path = unique_temp_path("invalid_json");
        tokio::fs::write(&path, "not valid json").await.unwrap();
        let result = ResumeCheckpoint::load(&path).await;
        let _ = tokio::fs::remove_file(&path).await;
        assert!(result.is_err());
    }

    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 5000, 1000);
        let json = serde_json::to_string(&checkpoint).unwrap();
        assert!(json.contains("enwiki"));
        assert!(json.contains("5000"));
        assert!(json.contains("1000"));
        let deserialized: ResumeCheckpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, checkpoint);
    }

    #[test]
    fn test_checkpoint_clone() {
        let checkpoint = ResumeCheckpoint::new("test", "chunk", 100, 50);
        let cloned = checkpoint.clone();
        assert_eq!(checkpoint, cloned);
    }
}