Skip to main content

wme_stream/
checkpoint.rs

1//! Checkpoint and resume functionality.
2//!
3//! This module provides save/load functionality for processing checkpoints,
4//! allowing long-running downloads to be resumed after interruptions.
5//!
6//! # Use Cases
7//!
8//! - **Network interruptions** - Resume downloads without starting over
9//! - **Scheduled maintenance** - Pause and resume processing
10//! - **Crash recovery** - Continue from last known position
11//! - **Incremental processing** - Process new data since last checkpoint
12//!
13//! # Checkpoint Data
14//!
15//! Checkpoints store:
16//! - Snapshot identifier
17//! - Chunk identifier (for parallel downloads)
18//! - Line offset within chunk
19//! - Articles processed count
20//! - Timestamp
21//!
22//! # File Format
23//!
24//! Checkpoints are saved as JSON files with extension `.checkpoint.json`:
25//! ```json
26//! {
27//!   "snapshot_id": "enwiki_namespace_0",
28//!   "chunk_id": "chunk_0",
29//!   "line_offset": 5000,
30//!   "articles_processed": 1000,
31//!   "timestamp": "2024-01-15T10:30:00Z"
32//! }
33//! ```
34//!
35//! # Example: Save and Resume
36//!
37//! ```rust,no_run
38//! use wme_stream::ResumeCheckpoint;
39//!
40//! # async fn example() -> Result<(), wme_stream::StreamError> {
41//! // After processing 1000 articles
42//! let checkpoint = ResumeCheckpoint::new(
43//!     "enwiki_namespace_0",
44//!     "chunk_0",
45//!     5000,  // Last line processed
46//!     1000,  // Articles processed
47//! );
48//!
49//! // Save to disk
50//! checkpoint.save("/data/checkpoints/").await?;
51//!
52//! // Later: load and resume
53//! let checkpoint = ResumeCheckpoint::load(
54//!     "/data/checkpoints/enwiki_namespace_0.checkpoint.json"
55//! ).await?;
56//!
57//! println!("Resuming from line {}", checkpoint.line_offset);
58//! # Ok(())
59//! # }
60//! ```
61
62use chrono::{DateTime, Utc};
63use serde::{Deserialize, Serialize};
64use std::path::PathBuf;
65
66/// Resume checkpoint for crash recovery.
67///
68/// Stores the current position in a snapshot download, allowing
69/// processing to be resumed after interruptions.
70///
71/// # Example
72///
73/// ```rust
74/// use wme_stream::ResumeCheckpoint;
75///
76/// let checkpoint = ResumeCheckpoint::new(
77///     "enwiki_namespace_0",
78///     "chunk_0",
79///     5000,
80///     1000,
81/// );
82///
83/// assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
84/// assert_eq!(checkpoint.line_offset, 5000);
85/// ```
86#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
87pub struct ResumeCheckpoint {
88    /// Snapshot identifier (e.g., "enwiki_namespace_0")
89    pub snapshot_id: String,
90    /// Chunk identifier (e.g., "chunk_0")
91    pub chunk_id: String,
92    /// Line offset within chunk (NDJSON line number)
93    pub line_offset: u64,
94    /// Number of articles processed so far
95    pub articles_processed: u64,
96    /// Checkpoint timestamp
97    pub timestamp: DateTime<Utc>,
98}
99
100impl ResumeCheckpoint {
101    /// Create a new checkpoint with current timestamp.
102    ///
103    /// # Arguments
104    ///
105    /// * `snapshot_id` - The snapshot being processed
106    /// * `chunk_id` - The current chunk
107    /// * `line_offset` - Current line position in NDJSON file
108    /// * `articles_processed` - Total articles processed
109    ///
110    /// # Example
111    ///
112    /// ```rust
113    /// use wme_stream::ResumeCheckpoint;
114    ///
115    /// let checkpoint = ResumeCheckpoint::new(
116    ///     "enwiki_namespace_0",
117    ///     "chunk_0",
118    ///     5000,
119    ///     1000,
120    /// );
121    /// ```
122    pub fn new(
123        snapshot_id: impl Into<String>,
124        chunk_id: impl Into<String>,
125        line_offset: u64,
126        articles_processed: u64,
127    ) -> Self {
128        Self {
129            snapshot_id: snapshot_id.into(),
130            chunk_id: chunk_id.into(),
131            line_offset,
132            articles_processed,
133            timestamp: Utc::now(),
134        }
135    }
136
137    /// Save checkpoint to disk as JSON.
138    ///
139    /// The file is named `{snapshot_id}.checkpoint.json` and stored
140    /// in the specified directory.
141    ///
142    /// # Arguments
143    ///
144    /// * `path` - Directory to save checkpoint (or full file path)
145    ///
146    /// # Errors
147    ///
148    /// Returns `StreamError::Io` if file write fails.
149    ///
150    /// # Example
151    ///
152    /// ```rust,no_run
153    /// use wme_stream::ResumeCheckpoint;
154    ///
155    /// # async fn example() -> Result<(), wme_stream::StreamError> {
156    /// let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 100, 50);
157    /// checkpoint.save("/data/checkpoints/").await?;
158    /// # Ok(())
159    /// # }
160    /// ```
161    pub async fn save(&self, path: impl Into<PathBuf>) -> Result<(), crate::StreamError> {
162        let path = path.into();
163        let json = serde_json::to_string_pretty(self)
164            .map_err(|e| crate::StreamError::Io(e.to_string()))?;
165        tokio::fs::write(&path, json)
166            .await
167            .map_err(|e| crate::StreamError::Io(e.to_string()))?;
168        Ok(())
169    }
170
171    /// Load checkpoint from disk.
172    ///
173    /// # Arguments
174    ///
175    /// * `path` - Path to checkpoint JSON file
176    ///
177    /// # Errors
178    ///
179    /// Returns `StreamError::Resume` if:
180    /// - File not found
181    /// - Invalid JSON
182    /// - Missing fields
183    ///
184    /// # Example
185    ///
186    /// ```rust,no_run
187    /// use wme_stream::ResumeCheckpoint;
188    ///
189    /// # async fn example() -> Result<(), wme_stream::StreamError> {
190    /// let checkpoint = ResumeCheckpoint::load(
191    ///     "/data/checkpoints/enwiki.checkpoint.json"
192    /// ).await?;
193    ///
194    /// println!("Resuming from line {}", checkpoint.line_offset);
195    /// # Ok(())
196    /// # }
197    /// ```
198    pub async fn load(path: impl Into<PathBuf>) -> Result<Self, crate::StreamError> {
199        let path = path.into();
200        let json = tokio::fs::read_to_string(&path)
201            .await
202            .map_err(|e| crate::StreamError::Resume(e.to_string()))?;
203        let checkpoint =
204            serde_json::from_str(&json).map_err(|e| crate::StreamError::Resume(e.to_string()))?;
205        Ok(checkpoint)
206    }
207
208    /// Get checkpoint file path for a snapshot.
209    ///
210    /// Generates a standard checkpoint filename in the specified directory.
211    ///
212    /// # Arguments
213    ///
214    /// * `base_dir` - Directory for checkpoint files
215    /// * `snapshot_id` - Snapshot identifier
216    ///
217    /// # Returns
218    ///
219    /// Path like `{base_dir}/{snapshot_id}.checkpoint.json`
220    ///
221    /// # Example
222    ///
223    /// ```rust
224    /// use wme_stream::ResumeCheckpoint;
225    /// use std::path::PathBuf;
226    ///
227    /// let path = ResumeCheckpoint::checkpoint_path("/data/checkpoints", "enwiki");
228    /// assert_eq!(path, PathBuf::from("/data/checkpoints/enwiki.checkpoint.json"));
229    /// ```
230    pub fn checkpoint_path(base_dir: impl Into<PathBuf>, snapshot_id: &str) -> PathBuf {
231        let mut path = base_dir.into();
232        path.push(format!("{}.checkpoint.json", snapshot_id));
233        path
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// `new` stores its arguments and stamps a time that is not in the future.
    #[test]
    fn test_checkpoint_creation() {
        let ResumeCheckpoint {
            snapshot_id,
            chunk_id,
            line_offset,
            articles_processed,
            timestamp,
        } = ResumeCheckpoint::new("enwiki_namespace_0", "chunk_0", 5000, 1000);

        assert_eq!(snapshot_id, "enwiki_namespace_0");
        assert_eq!(chunk_id, "chunk_0");
        assert_eq!(line_offset, 5000);
        assert_eq!(articles_processed, 1000);
        assert!(timestamp <= Utc::now());
    }

    /// `checkpoint_path` accepts both string and `PathBuf` base directories.
    #[test]
    fn test_checkpoint_path() {
        let from_str = ResumeCheckpoint::checkpoint_path("/tmp", "enwiki");
        let expected_from_str = PathBuf::from("/tmp/enwiki.checkpoint.json");
        assert_eq!(from_str, expected_from_str);

        let from_path_buf =
            ResumeCheckpoint::checkpoint_path(PathBuf::from("/data"), "dewiki_namespace_0");
        let expected_from_path_buf = PathBuf::from("/data/dewiki_namespace_0.checkpoint.json");
        assert_eq!(from_path_buf, expected_from_path_buf);
    }

    /// A checkpoint written to a file path can be read back with the same data.
    #[tokio::test]
    async fn test_checkpoint_save_and_load() {
        let file = std::env::temp_dir().join("test.checkpoint.json");
        let original = ResumeCheckpoint::new("test_snapshot", "chunk_0", 100, 50);

        // Round-trip through the file system.
        original.save(&file).await.unwrap();
        let restored = ResumeCheckpoint::load(&file).await.unwrap();

        assert_eq!(
            (
                &restored.snapshot_id,
                &restored.chunk_id,
                restored.line_offset,
                restored.articles_processed,
            ),
            (
                &original.snapshot_id,
                &original.chunk_id,
                original.line_offset,
                original.articles_processed,
            ),
        );

        // Remove the temporary file.
        tokio::fs::remove_file(&file).await.unwrap();
    }

    /// Loading from a path that does not exist reports an error.
    #[tokio::test]
    async fn test_checkpoint_load_not_found() {
        let outcome = ResumeCheckpoint::load("/nonexistent/path.json").await;
        assert!(outcome.is_err());
    }

    /// Loading a file whose contents are not JSON reports an error.
    #[tokio::test]
    async fn test_checkpoint_load_invalid_json() {
        let file = std::env::temp_dir().join("invalid.checkpoint.json");

        // Seed the file with something that cannot be parsed.
        tokio::fs::write(&file, "not valid json").await.unwrap();

        let outcome = ResumeCheckpoint::load(&file).await;
        assert!(outcome.is_err());

        // Remove the temporary file.
        tokio::fs::remove_file(&file).await.unwrap();
    }

    /// The JSON form contains the key values and parses back to the same data.
    #[test]
    fn test_checkpoint_serialization() {
        let original = ResumeCheckpoint::new("enwiki", "chunk_0", 5000, 1000);

        let encoded = serde_json::to_string(&original).unwrap();
        for fragment in &["enwiki", "5000", "1000"] {
            assert!(encoded.contains(*fragment));
        }

        let decoded: ResumeCheckpoint = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.snapshot_id, original.snapshot_id);
        assert_eq!(decoded.line_offset, original.line_offset);
    }

    /// A clone compares equal to the value it was cloned from.
    #[test]
    fn test_checkpoint_clone() {
        let original = ResumeCheckpoint::new("test", "chunk", 100, 50);
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }
}