wme_stream/checkpoint.rs
//! Checkpoint and resume functionality.
//!
//! This module provides save/load functionality for processing checkpoints,
//! allowing long-running downloads to be resumed after interruptions.
//!
//! # Use Cases
//!
//! - **Network interruptions** - Resume downloads without starting over
//! - **Scheduled maintenance** - Pause and resume processing
//! - **Crash recovery** - Continue from the last known position
//! - **Incremental processing** - Process new data since the last checkpoint
//!
//! # Checkpoint Data
//!
//! Checkpoints store:
//! - Snapshot identifier
//! - Chunk identifier (for parallel downloads)
//! - Line offset within the chunk
//! - Count of articles processed
//! - Timestamp (UTC)
//!
//! # File Format
//!
//! Checkpoints are saved as pretty-printed JSON. The conventional file name is
//! `{snapshot_id}.checkpoint.json` (see [`ResumeCheckpoint::checkpoint_path`]):
//! ```json
//! {
//!   "snapshot_id": "enwiki_namespace_0",
//!   "chunk_id": "chunk_0",
//!   "line_offset": 5000,
//!   "articles_processed": 1000,
//!   "timestamp": "2024-01-15T10:30:00Z"
//! }
//! ```
//!
//! # Example: Save and Resume
//!
//! ```rust,no_run
//! use wme_stream::ResumeCheckpoint;
//!
//! # async fn example() -> Result<(), wme_stream::StreamError> {
//! // After processing 1000 articles
//! let checkpoint = ResumeCheckpoint::new(
//!     "enwiki_namespace_0",
//!     "chunk_0",
//!     5000, // Last line processed
//!     1000, // Articles processed
//! );
//!
//! // Save to disk under the conventional file name
//! let path = ResumeCheckpoint::checkpoint_path("/data/checkpoints", &checkpoint.snapshot_id);
//! checkpoint.save(&path).await?;
//!
//! // Later: load and resume
//! let checkpoint = ResumeCheckpoint::load(&path).await?;
//!
//! println!("Resuming from line {}", checkpoint.line_offset);
//! # Ok(())
//! # }
//! ```
61
62use chrono::{DateTime, Utc};
63use serde::{Deserialize, Serialize};
64use std::path::PathBuf;
65
66/// Resume checkpoint for crash recovery.
67///
68/// Stores the current position in a snapshot download, allowing
69/// processing to be resumed after interruptions.
70///
71/// # Example
72///
73/// ```rust
74/// use wme_stream::ResumeCheckpoint;
75///
76/// let checkpoint = ResumeCheckpoint::new(
77/// "enwiki_namespace_0",
78/// "chunk_0",
79/// 5000,
80/// 1000,
81/// );
82///
83/// assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
84/// assert_eq!(checkpoint.line_offset, 5000);
85/// ```
86#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
87pub struct ResumeCheckpoint {
88 /// Snapshot identifier (e.g., "enwiki_namespace_0")
89 pub snapshot_id: String,
90 /// Chunk identifier (e.g., "chunk_0")
91 pub chunk_id: String,
92 /// Line offset within chunk (NDJSON line number)
93 pub line_offset: u64,
94 /// Number of articles processed so far
95 pub articles_processed: u64,
96 /// Checkpoint timestamp
97 pub timestamp: DateTime<Utc>,
98}
99
100impl ResumeCheckpoint {
101 /// Create a new checkpoint with current timestamp.
102 ///
103 /// # Arguments
104 ///
105 /// * `snapshot_id` - The snapshot being processed
106 /// * `chunk_id` - The current chunk
107 /// * `line_offset` - Current line position in NDJSON file
108 /// * `articles_processed` - Total articles processed
109 ///
110 /// # Example
111 ///
112 /// ```rust
113 /// use wme_stream::ResumeCheckpoint;
114 ///
115 /// let checkpoint = ResumeCheckpoint::new(
116 /// "enwiki_namespace_0",
117 /// "chunk_0",
118 /// 5000,
119 /// 1000,
120 /// );
121 /// ```
122 pub fn new(
123 snapshot_id: impl Into<String>,
124 chunk_id: impl Into<String>,
125 line_offset: u64,
126 articles_processed: u64,
127 ) -> Self {
128 Self {
129 snapshot_id: snapshot_id.into(),
130 chunk_id: chunk_id.into(),
131 line_offset,
132 articles_processed,
133 timestamp: Utc::now(),
134 }
135 }
136
137 /// Save checkpoint to disk as JSON.
138 ///
139 /// The file is named `{snapshot_id}.checkpoint.json` and stored
140 /// in the specified directory.
141 ///
142 /// # Arguments
143 ///
144 /// * `path` - Directory to save checkpoint (or full file path)
145 ///
146 /// # Errors
147 ///
148 /// Returns `StreamError::Io` if file write fails.
149 ///
150 /// # Example
151 ///
152 /// ```rust,no_run
153 /// use wme_stream::ResumeCheckpoint;
154 ///
155 /// # async fn example() -> Result<(), wme_stream::StreamError> {
156 /// let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 100, 50);
157 /// checkpoint.save("/data/checkpoints/").await?;
158 /// # Ok(())
159 /// # }
160 /// ```
161 pub async fn save(&self, path: impl Into<PathBuf>) -> Result<(), crate::StreamError> {
162 let path = path.into();
163 let json = serde_json::to_string_pretty(self)
164 .map_err(|e| crate::StreamError::Io(e.to_string()))?;
165 tokio::fs::write(&path, json)
166 .await
167 .map_err(|e| crate::StreamError::Io(e.to_string()))?;
168 Ok(())
169 }
170
171 /// Load checkpoint from disk.
172 ///
173 /// # Arguments
174 ///
175 /// * `path` - Path to checkpoint JSON file
176 ///
177 /// # Errors
178 ///
179 /// Returns `StreamError::Resume` if:
180 /// - File not found
181 /// - Invalid JSON
182 /// - Missing fields
183 ///
184 /// # Example
185 ///
186 /// ```rust,no_run
187 /// use wme_stream::ResumeCheckpoint;
188 ///
189 /// # async fn example() -> Result<(), wme_stream::StreamError> {
190 /// let checkpoint = ResumeCheckpoint::load(
191 /// "/data/checkpoints/enwiki.checkpoint.json"
192 /// ).await?;
193 ///
194 /// println!("Resuming from line {}", checkpoint.line_offset);
195 /// # Ok(())
196 /// # }
197 /// ```
198 pub async fn load(path: impl Into<PathBuf>) -> Result<Self, crate::StreamError> {
199 let path = path.into();
200 let json = tokio::fs::read_to_string(&path)
201 .await
202 .map_err(|e| crate::StreamError::Resume(e.to_string()))?;
203 let checkpoint =
204 serde_json::from_str(&json).map_err(|e| crate::StreamError::Resume(e.to_string()))?;
205 Ok(checkpoint)
206 }
207
208 /// Get checkpoint file path for a snapshot.
209 ///
210 /// Generates a standard checkpoint filename in the specified directory.
211 ///
212 /// # Arguments
213 ///
214 /// * `base_dir` - Directory for checkpoint files
215 /// * `snapshot_id` - Snapshot identifier
216 ///
217 /// # Returns
218 ///
219 /// Path like `{base_dir}/{snapshot_id}.checkpoint.json`
220 ///
221 /// # Example
222 ///
223 /// ```rust
224 /// use wme_stream::ResumeCheckpoint;
225 /// use std::path::PathBuf;
226 ///
227 /// let path = ResumeCheckpoint::checkpoint_path("/data/checkpoints", "enwiki");
228 /// assert_eq!(path, PathBuf::from("/data/checkpoints/enwiki.checkpoint.json"));
229 /// ```
230 pub fn checkpoint_path(base_dir: impl Into<PathBuf>, snapshot_id: &str) -> PathBuf {
231 let mut path = base_dir.into();
232 path.push(format!("{}.checkpoint.json", snapshot_id));
233 path
234 }
235}
236
237#[cfg(test)]
238mod tests {
239 use super::*;
240 use std::path::PathBuf;
241
242 #[test]
243 fn test_checkpoint_creation() {
244 let checkpoint = ResumeCheckpoint::new("enwiki_namespace_0", "chunk_0", 5000, 1000);
245
246 assert_eq!(checkpoint.snapshot_id, "enwiki_namespace_0");
247 assert_eq!(checkpoint.chunk_id, "chunk_0");
248 assert_eq!(checkpoint.line_offset, 5000);
249 assert_eq!(checkpoint.articles_processed, 1000);
250 assert!(checkpoint.timestamp <= Utc::now());
251 }
252
253 #[test]
254 fn test_checkpoint_path() {
255 let path = ResumeCheckpoint::checkpoint_path("/tmp", "enwiki");
256 assert_eq!(path, PathBuf::from("/tmp/enwiki.checkpoint.json"));
257
258 let path = ResumeCheckpoint::checkpoint_path(PathBuf::from("/data"), "dewiki_namespace_0");
259 assert_eq!(
260 path,
261 PathBuf::from("/data/dewiki_namespace_0.checkpoint.json")
262 );
263 }
264
265 #[tokio::test]
266 async fn test_checkpoint_save_and_load() {
267 let temp_dir = std::env::temp_dir();
268 let checkpoint = ResumeCheckpoint::new("test_snapshot", "chunk_0", 100, 50);
269
270 // Save
271 let path = temp_dir.join("test.checkpoint.json");
272 checkpoint.save(&path).await.unwrap();
273
274 // Load
275 let loaded = ResumeCheckpoint::load(&path).await.unwrap();
276 assert_eq!(loaded.snapshot_id, checkpoint.snapshot_id);
277 assert_eq!(loaded.chunk_id, checkpoint.chunk_id);
278 assert_eq!(loaded.line_offset, checkpoint.line_offset);
279 assert_eq!(loaded.articles_processed, checkpoint.articles_processed);
280
281 // Cleanup
282 tokio::fs::remove_file(&path).await.unwrap();
283 }
284
285 #[tokio::test]
286 async fn test_checkpoint_load_not_found() {
287 let result = ResumeCheckpoint::load("/nonexistent/path.json").await;
288 assert!(result.is_err());
289 }
290
291 #[tokio::test]
292 async fn test_checkpoint_load_invalid_json() {
293 let temp_dir = std::env::temp_dir();
294 let path = temp_dir.join("invalid.checkpoint.json");
295
296 // Write invalid JSON
297 tokio::fs::write(&path, "not valid json").await.unwrap();
298
299 let result = ResumeCheckpoint::load(&path).await;
300 assert!(result.is_err());
301
302 // Cleanup
303 tokio::fs::remove_file(&path).await.unwrap();
304 }
305
306 #[test]
307 fn test_checkpoint_serialization() {
308 let checkpoint = ResumeCheckpoint::new("enwiki", "chunk_0", 5000, 1000);
309
310 let json = serde_json::to_string(&checkpoint).unwrap();
311 assert!(json.contains("enwiki"));
312 assert!(json.contains("5000"));
313 assert!(json.contains("1000"));
314
315 let deserialized: ResumeCheckpoint = serde_json::from_str(&json).unwrap();
316 assert_eq!(deserialized.snapshot_id, checkpoint.snapshot_id);
317 assert_eq!(deserialized.line_offset, checkpoint.line_offset);
318 }
319
320 #[test]
321 fn test_checkpoint_clone() {
322 let checkpoint = ResumeCheckpoint::new("test", "chunk", 100, 50);
323 let cloned = checkpoint.clone();
324 assert_eq!(checkpoint, cloned);
325 }
326}