wme-stream 0.1.1

Streaming utilities for the Wikimedia Enterprise API
Documentation
//! Checkpoint and resume functionality.
//!
//! This module provides save/load functionality for processing checkpoints,
//! allowing long-running downloads to be resumed after interruptions.
//!
//! # Use Cases
//!
//! - **Network interruptions** - Resume downloads without starting over
//! - **Scheduled maintenance** - Pause and resume processing
//! - **Crash recovery** - Continue from last known position
//! - **Incremental processing** - Process new data since last checkpoint
//!
//! # Checkpoint Data
//!
//! Checkpoints store:
//! - Snapshot identifier
//! - Chunk identifier (for parallel downloads)
//! - Line offset within chunk
//! - Articles processed count
//! - Timestamp
//!
//! # File Format
//!
//! Checkpoints are saved as JSON files with extension `.checkpoint.json`:
//! ```json
//! {
//!   "snapshot_id": "enwiki_namespace_0",
//!   "chunk_id": "chunk_0",
//!   "line_offset": 5000,
//!   "articles_processed": 1000,
//!   "timestamp": "2024-01-15T10:30:00Z"
//! }
//! ```
//!
//! # Example: Save and Resume
//!
//! ```rust,no_run
//! use wme_stream::ResumeCheckpoint;
//!
//! # async fn example() -> Result<(), wme_stream::StreamError> {
//! // After processing 1000 articles
//! let checkpoint = ResumeCheckpoint::new(
//!     "enwiki_namespace_0",
//!     "chunk_0",
//!     5000,  // Last line processed
//!     1000,  // Articles processed
//! );
//!
//! // Save to disk
//! checkpoint.save("/data/checkpoints/").await?;
//!
//! // Later: load and resume
//! let checkpoint = ResumeCheckpoint::load(
//!     "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
//! ).await?;
//!
//! println!("Resuming from line {}", checkpoint.line_offset);
//! # Ok(())
//! # }
//! ```

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;

/// Resume checkpoint for crash recovery.
///
/// Records how far processing of a snapshot download has progressed, so that
/// processing can be resumed after an interruption. The type is serialized
/// with serde using the field names below as JSON keys (no renames), which is
/// the on-disk format read and written by [`ResumeCheckpoint::load`] and
/// [`ResumeCheckpoint::save`].
///
/// # Example
///
/// ```rust
/// use wme_stream::ResumeCheckpoint;
///
/// let checkpoint = ResumeCheckpoint::new(
///     "enwiki_namespace_0",
///     "chunk_0",
///     5000,
///     1000,
/// );
///
/// assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
/// assert_eq!(checkpoint.line_offset, 5000);
/// ```
// Every field type is `Eq`, so derive it alongside `PartialEq` (lets callers
// use checkpoints in contexts that require total equality).
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct ResumeCheckpoint {
    /// Snapshot identifier (e.g., "enwiki_namespace_0")
    pub snapshot_id: String,
    /// Chunk identifier (e.g., "chunk_0")
    pub chunk_id: String,
    /// Line offset within the chunk (NDJSON line number).
    ///
    /// Stored exactly as supplied by the caller; the crate's examples pass the
    /// last line processed.
    pub line_offset: u64,
    /// Number of articles processed so far
    pub articles_processed: u64,
    /// When the checkpoint was created (`ResumeCheckpoint::new` uses `Utc::now()`)
    pub timestamp: DateTime<Utc>,
}

impl ResumeCheckpoint {
    /// Create a new checkpoint with current timestamp.
    ///
    /// # Arguments
    ///
    /// * `snapshot_id` - The snapshot being processed
    /// * `chunk_id` - The current chunk
    /// * `line_offset` - Current line position in NDJSON file
    /// * `articles_processed` - Total articles processed
    ///
    /// # Example
    ///
    /// ```rust
    /// use wme_stream::ResumeCheckpoint;
    ///
    /// let checkpoint = ResumeCheckpoint::new(
    ///     "enwiki_namespace_0",
    ///     "chunk_0",
    ///     5000,
    ///     1000,
    /// );
    /// ```
    pub fn new(
        snapshot_id: impl Into<String>,
        chunk_id: impl Into<String>,
        line_offset: u64,
        articles_processed: u64,
    ) -> Self {
        Self {
            snapshot_id: snapshot_id.into(),
            chunk_id: chunk_id.into(),
            line_offset,
            articles_processed,
            timestamp: Utc::now(),
        }
    }

    /// Save checkpoint to disk as JSON.
    ///
    /// The file is named `{snapshot_id}.checkpoint.json` and stored
    /// in the specified directory.
    ///
    /// # Arguments
    ///
    /// * `path` - Directory to save checkpoint (or full file path)
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Io` if file write fails.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use wme_stream::ResumeCheckpoint;
    ///
    /// # async fn example() -> Result<(), wme_stream::StreamError> {
    /// let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 100, 50);
    /// checkpoint.save("/data/checkpoints/").await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn save(&self, path: impl Into<PathBuf>) -> Result<(), crate::StreamError> {
        let path = path.into();
        let json = serde_json::to_string_pretty(self)
            .map_err(|e| crate::StreamError::Io(e.to_string()))?;
        tokio::fs::write(&path, json)
            .await
            .map_err(|e| crate::StreamError::Io(e.to_string()))?;
        Ok(())
    }

    /// Load checkpoint from disk.
    ///
    /// # Arguments
    ///
    /// * `path` - Path to checkpoint JSON file
    ///
    /// # Errors
    ///
    /// Returns `StreamError::Resume` if:
    /// - File not found
    /// - Invalid JSON
    /// - Missing fields
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use wme_stream::ResumeCheckpoint;
    ///
    /// # async fn example() -> Result<(), wme_stream::StreamError> {
    /// let checkpoint = ResumeCheckpoint::load(
    ///     "/data/checkpoints/enwiki.checkpoint.json"
    /// ).await?;
    ///
    /// println!("Resuming from line {}", checkpoint.line_offset);
    /// # Ok(())
    /// # }
    /// ```
    pub async fn load(path: impl Into<PathBuf>) -> Result<Self, crate::StreamError> {
        let path = path.into();
        let json = tokio::fs::read_to_string(&path)
            .await
            .map_err(|e| crate::StreamError::Resume(e.to_string()))?;
        let checkpoint =
            serde_json::from_str(&json).map_err(|e| crate::StreamError::Resume(e.to_string()))?;
        Ok(checkpoint)
    }

    /// Get checkpoint file path for a snapshot.
    ///
    /// Generates a standard checkpoint filename in the specified directory.
    ///
    /// # Arguments
    ///
    /// * `base_dir` - Directory for checkpoint files
    /// * `snapshot_id` - Snapshot identifier
    ///
    /// # Returns
    ///
    /// Path like `{base_dir}/{snapshot_id}.checkpoint.json`
    ///
    /// # Example
    ///
    /// ```rust
    /// use wme_stream::ResumeCheckpoint;
    /// use std::path::PathBuf;
    ///
    /// let path = ResumeCheckpoint::checkpoint_path("/data/checkpoints", "enwiki");
    /// assert_eq!(path, PathBuf::from("/data/checkpoints/enwiki.checkpoint.json"));
    /// ```
    pub fn checkpoint_path(base_dir: impl Into<PathBuf>, snapshot_id: &str) -> PathBuf {
        let mut path = base_dir.into();
        path.push(format!("{}.checkpoint.json", snapshot_id));
        path
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Temporary file path that is removed on drop, even if the test panics.
    ///
    /// The process id is part of the name so concurrent test runs sharing the
    /// system temp directory do not clobber each other's files.
    struct TempFile(PathBuf);

    impl TempFile {
        fn new(name: &str) -> Self {
            Self(std::env::temp_dir().join(format!("wme_stream_{}_{name}", std::process::id())))
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best effort: the file may legitimately not exist.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn test_checkpoint_creation() {
        let checkpoint = ResumeCheckpoint::new("enwiki_namespace_0", "chunk_0", 5000, 1000);

        assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
        assert_eq!(checkpoint.chunk_id, "chunk_0");
        assert_eq!(checkpoint.line_offset, 5000);
        assert_eq!(checkpoint.articles_processed, 1000);
        assert!(checkpoint.timestamp <= Utc::now());
    }

    #[test]
    fn test_checkpoint_path() {
        let path = ResumeCheckpoint::checkpoint_path("/tmp", "enwiki");
        assert_eq!(path, PathBuf::from("/tmp/enwiki.checkpoint.json"));

        let path = ResumeCheckpoint::checkpoint_path(PathBuf::from("/data"), "dewiki_namespace_0");
        assert_eq!(
            path,
            PathBuf::from("/data/dewiki_namespace_0.checkpoint.json")
        );
    }

    #[tokio::test]
    async fn test_checkpoint_save_and_load() {
        let file = TempFile::new("save_load.checkpoint.json");
        let checkpoint = ResumeCheckpoint::new("test_snapshot", "chunk_0", 100, 50);

        checkpoint.save(&file.0).await.unwrap();
        let loaded = ResumeCheckpoint::load(&file.0).await.unwrap();

        // Every field, including the timestamp, must survive the JSON round trip.
        assert_eq!(loaded, checkpoint);
    }

    #[tokio::test]
    async fn test_checkpoint_load_not_found() {
        let result = ResumeCheckpoint::load("/nonexistent/path.json").await;
        assert!(matches!(result, Err(crate::StreamError::Resume(_))));
    }

    #[tokio::test]
    async fn test_checkpoint_load_invalid_json() {
        let file = TempFile::new("invalid.checkpoint.json");
        tokio::fs::write(&file.0, "not valid json").await.unwrap();

        let result = ResumeCheckpoint::load(&file.0).await;
        assert!(matches!(result, Err(crate::StreamError::Resume(_))));
    }

    #[test]
    fn test_checkpoint_serialization() {
        let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 5000, 1000);

        let json = serde_json::to_string(&checkpoint).unwrap();
        // Pin the documented on-disk key names, not just the values.
        assert!(json.contains("\"snapshot_id\":\"enwiki\""));
        assert!(json.contains("\"line_offset\":5000"));
        assert!(json.contains("\"articles_processed\":1000"));

        let deserialized: ResumeCheckpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized, checkpoint);
    }

    #[test]
    fn test_checkpoint_clone() {
        let checkpoint = ResumeCheckpoint::new("test", "chunk", 100, 50);
        let cloned = checkpoint.clone();
        assert_eq!(checkpoint, cloned);
    }
}