Skip to main content

raft_hpc_core/
backup.rs

1//! Backup export, verify, and restore for Raft state.
2//!
3//! Backup format: tar.gz containing:
4//! ```text
5//! backup-{timestamp}/
6//!   metadata.json     — backup metadata (timestamp, term, index, app-specific)
7//!   snapshot.json     — application state serialized as JSON
8//! ```
9//!
10//! The backup functions are generic over the application state type `S` and
11//! metadata type `M`, allowing each application to store its own metadata.
12
13use std::io::{self, Read};
14use std::path::Path;
15use std::sync::Arc;
16
17use chrono::{DateTime, Utc};
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use tokio::sync::RwLock;
21use tracing::debug;
22
23use openraft::{RaftTypeConfig, StoredMembership};
24
25use crate::BackupMetadataSource;
26
27/// Core backup metadata, present in every backup regardless of application.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct BackupMetadata<M> {
30    pub timestamp: DateTime<Utc>,
31    pub snapshot_term: u64,
32    pub snapshot_index: u64,
33    /// Application-specific metadata (e.g., node count, entry count).
34    pub app: M,
35}
36
37/// Export the current state to a tar.gz backup at the given path.
38pub async fn export_backup<S>(
39    state: &Arc<RwLock<S>>,
40    path: &Path,
41) -> io::Result<BackupMetadata<S::Metadata>>
42where
43    S: Serialize + Send + Sync + BackupMetadataSource,
44{
45    let state_guard = state.read().await;
46    let state_json = serde_json::to_vec_pretty(&*state_guard).map_err(io::Error::other)?;
47    let app_metadata = state_guard.backup_metadata();
48    drop(state_guard);
49
50    let metadata = BackupMetadata {
51        timestamp: Utc::now(),
52        snapshot_term: 0,
53        snapshot_index: 0,
54        app: app_metadata,
55    };
56    let metadata_json = serde_json::to_vec_pretty(&metadata).map_err(io::Error::other)?;
57
58    let prefix = format!("backup-{}", metadata.timestamp.format("%Y%m%dT%H%M%SZ"));
59
60    // Create tar.gz
61    let file = std::fs::File::create(path)?;
62    let enc = flate2::write::GzEncoder::new(file, flate2::Compression::default());
63    let mut tar = tar::Builder::new(enc);
64
65    // Add metadata.json
66    let mut header = tar::Header::new_gnu();
67    header.set_size(metadata_json.len() as u64);
68    header.set_mode(0o644);
69    header.set_cksum();
70    tar.append_data(
71        &mut header,
72        format!("{prefix}/metadata.json"),
73        metadata_json.as_slice(),
74    )?;
75
76    // Add snapshot.json
77    let mut header = tar::Header::new_gnu();
78    header.set_size(state_json.len() as u64);
79    header.set_mode(0o644);
80    header.set_cksum();
81    tar.append_data(
82        &mut header,
83        format!("{prefix}/snapshot.json"),
84        state_json.as_slice(),
85    )?;
86
87    tar.into_inner()?.finish()?;
88
89    debug!("Exported backup to {}", path.display());
90    Ok(metadata)
91}
92
93/// Verify a backup file's integrity and return its metadata.
94pub fn verify_backup<S, M>(path: &Path) -> io::Result<BackupMetadata<M>>
95where
96    S: DeserializeOwned,
97    M: DeserializeOwned + Serialize,
98{
99    let file = std::fs::File::open(path)?;
100    let dec = flate2::read::GzDecoder::new(file);
101    let mut archive = tar::Archive::new(dec);
102
103    let mut found_metadata = false;
104    let mut found_snapshot = false;
105    let mut metadata: Option<BackupMetadata<M>> = None;
106
107    for entry in archive.entries()? {
108        let mut entry = entry?;
109        let path = entry.path()?.to_path_buf();
110        let name = path
111            .file_name()
112            .map(|n| n.to_string_lossy().to_string())
113            .unwrap_or_default();
114
115        match name.as_str() {
116            "metadata.json" => {
117                let mut buf = Vec::new();
118                entry.read_to_end(&mut buf)?;
119                metadata = Some(
120                    serde_json::from_slice(&buf)
121                        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
122                );
123                found_metadata = true;
124            }
125            "snapshot.json" => {
126                let mut buf = Vec::new();
127                entry.read_to_end(&mut buf)?;
128                let _state: S = serde_json::from_slice(&buf)
129                    .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
130                found_snapshot = true;
131            }
132            _ => {}
133        }
134    }
135
136    if !found_metadata {
137        return Err(io::Error::new(
138            io::ErrorKind::InvalidData,
139            "backup missing metadata.json",
140        ));
141    }
142    if !found_snapshot {
143        return Err(io::Error::new(
144            io::ErrorKind::InvalidData,
145            "backup missing snapshot.json",
146        ));
147    }
148
149    Ok(metadata.unwrap())
150}
151
152/// Restore a backup into the given `data_dir` for Raft to load on restart.
153///
154/// Extracts the snapshot from the backup and places it in the snapshots
155/// directory so the state machine will load it on next startup. The snapshot
156/// is written in `PersistedSnapshot { meta, state }` format that
157/// `load_latest_snapshot` expects.
158pub fn restore_backup<C, S, M>(backup_path: &Path, data_dir: &Path) -> io::Result<BackupMetadata<M>>
159where
160    C: RaftTypeConfig,
161    S: Serialize + DeserializeOwned,
162    M: Serialize + DeserializeOwned,
163    StoredMembership<C>: Serialize + Default,
164{
165    // First verify the backup
166    let metadata = verify_backup::<S, M>(backup_path)?;
167
168    let snapshot_dir = data_dir.join("raft").join("snapshots");
169    std::fs::create_dir_all(&snapshot_dir)?;
170
171    // Extract the snapshot data
172    let file = std::fs::File::open(backup_path)?;
173    let dec = flate2::read::GzDecoder::new(file);
174    let mut archive = tar::Archive::new(dec);
175
176    for entry in archive.entries()? {
177        let mut entry = entry?;
178        let path = entry.path()?.to_path_buf();
179        let name = path
180            .file_name()
181            .map(|n| n.to_string_lossy().to_string())
182            .unwrap_or_default();
183
184        if name == "snapshot.json" {
185            let mut state_data = Vec::new();
186            entry.read_to_end(&mut state_data)?;
187
188            // Validate it's parseable
189            let state: S = serde_json::from_slice(&state_data)
190                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
191
192            let snap_filename = format!(
193                "snap-{}-{}.json",
194                metadata.snapshot_term, metadata.snapshot_index
195            );
196            let snap_path = snapshot_dir.join(&snap_filename);
197
198            // Write in PersistedSnapshot { meta, state } format that
199            // load_latest_snapshot expects.
200            let persisted = serde_json::json!({
201                "meta": {
202                    "last_log_id": null,
203                    "last_membership": StoredMembership::<C>::default(),
204                    "snapshot_id": format!(
205                        "restored-{}",
206                        metadata.timestamp.format("%Y%m%dT%H%M%SZ")
207                    ),
208                },
209                "state": state,
210            });
211            let json = serde_json::to_vec_pretty(&persisted).map_err(io::Error::other)?;
212            std::fs::write(&snap_path, &json)?;
213
214            // Update "current" pointer
215            let current = snapshot_dir.join("current");
216            std::fs::write(&current, snap_filename.as_bytes())?;
217
218            debug!("Restored backup to {}", snap_path.display());
219            break;
220        }
221    }
222
223    // Clean up WAL since we're restoring from a snapshot
224    let wal_dir = data_dir.join("raft").join("wal");
225    if wal_dir.exists() {
226        for entry in std::fs::read_dir(&wal_dir)? {
227            let entry = entry?;
228            let _ = std::fs::remove_file(entry.path());
229        }
230    }
231
232    // Clean up vote/committed files
233    let vote_path = data_dir.join("raft").join("vote.json");
234    let committed_path = data_dir.join("raft").join("committed.json");
235    let _ = std::fs::remove_file(&vote_path);
236    let _ = std::fs::remove_file(&committed_path);
237
238    Ok(metadata)
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_types::TestTypeConfig;

    /// Minimal application state used to exercise the backup functions.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestState {
        items: Vec<String>,
    }

    /// App-specific metadata produced from [`TestState`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMetadata {
        item_count: usize,
    }

    impl crate::StateMachineState<TestTypeConfig> for TestState {
        fn apply(
            &mut self,
            _cmd: crate::test_types::TestCommand,
        ) -> crate::test_types::TestResponse {
            crate::test_types::TestResponse::Ok
        }

        fn blank_response() -> crate::test_types::TestResponse {
            crate::test_types::TestResponse::Ok
        }
    }

    impl BackupMetadataSource for TestState {
        type Metadata = TestMetadata;

        fn backup_metadata(&self) -> TestMetadata {
            let item_count = self.items.len();
            TestMetadata { item_count }
        }
    }

    /// Shared state holding three items.
    fn test_state() -> Arc<RwLock<TestState>> {
        let items = ["one", "two", "three"]
            .iter()
            .map(|item| item.to_string())
            .collect();
        Arc::new(RwLock::new(TestState { items }))
    }

    /// Metadata with the current time and zeroed term/index.
    fn zeroed_metadata(app: TestMetadata) -> BackupMetadata<TestMetadata> {
        BackupMetadata {
            timestamp: Utc::now(),
            snapshot_term: 0,
            snapshot_index: 0,
            app,
        }
    }

    /// Write a tar.gz at `archive_path` containing `entries` in the given order.
    fn write_archive(archive_path: &Path, entries: &[(&str, &[u8])]) {
        let file = std::fs::File::create(archive_path).unwrap();
        let enc = flate2::write::GzEncoder::new(file, flate2::Compression::default());
        let mut tar_builder = tar::Builder::new(enc);

        for &(entry_name, bytes) in entries {
            let mut header = tar::Header::new_gnu();
            header.set_size(bytes.len() as u64);
            header.set_mode(0o644);
            header.set_cksum();
            tar_builder
                .append_data(&mut header, entry_name, bytes)
                .unwrap();
        }

        tar_builder.into_inner().unwrap().finish().unwrap();
    }

    /// `verify_backup` pinned to the test types.
    fn verify(path: &Path) -> io::Result<BackupMetadata<TestMetadata>> {
        verify_backup::<TestState, TestMetadata>(path)
    }

    /// `restore_backup` pinned to the test types.
    fn restore(backup: &Path, data_dir: &Path) -> io::Result<BackupMetadata<TestMetadata>> {
        restore_backup::<TestTypeConfig, TestState, TestMetadata>(backup, data_dir)
    }

    #[tokio::test]
    async fn export_and_verify_roundtrip() {
        let state = test_state();
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("test-backup.tar.gz");

        let exported = export_backup(&state, &backup_path).await.unwrap();
        assert_eq!(exported.app.item_count, 3);

        let verified = verify(&backup_path).unwrap();
        assert_eq!(verified.app.item_count, 3);
    }

    #[tokio::test]
    async fn verify_corrupt_backup_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("corrupt.tar.gz");
        std::fs::write(&backup_path, b"not a valid tar.gz").unwrap();

        assert!(verify(&backup_path).is_err());
    }

    #[tokio::test]
    async fn verify_missing_snapshot_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("incomplete.tar.gz");

        // Archive holding only the metadata entry.
        let metadata = zeroed_metadata(TestMetadata { item_count: 0 });
        let metadata_json = serde_json::to_vec(&metadata).unwrap();
        write_archive(
            &backup_path,
            &[("backup/metadata.json", metadata_json.as_slice())],
        );

        let result = verify(&backup_path);
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("missing snapshot.json"));
    }

    #[tokio::test]
    async fn restore_writes_snapshot_files() {
        let state = test_state();
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");

        export_backup(&state, &backup_path).await.unwrap();
        let meta = restore(&backup_path, &data_dir).unwrap();
        assert_eq!(meta.app.item_count, 3);

        // The snapshot directory and its "current" pointer must exist.
        let snap_dir = data_dir.join("raft").join("snapshots");
        let pointer = snap_dir.join("current");
        assert!(snap_dir.exists());
        assert!(pointer.exists());

        // The pointer must name a file that is actually present.
        let pointed_name = std::fs::read_to_string(&pointer).unwrap();
        assert!(snap_dir.join(pointed_name.trim()).exists());
    }

    #[test]
    fn verify_nonexistent_backup_fails() {
        let missing = Path::new("/nonexistent/backup.tar.gz");
        assert!(verify(missing).is_err());
    }

    #[tokio::test]
    async fn restore_cleans_up_wal_and_vote() {
        let state = test_state();
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");

        export_backup(&state, &backup_path).await.unwrap();

        // Seed pre-existing WAL, vote, and committed files.
        let raft_dir = data_dir.join("raft");
        let wal_dir = raft_dir.join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        for wal_name in ["1.json", "2.json"] {
            std::fs::write(wal_dir.join(wal_name), b"old entry").unwrap();
        }
        std::fs::write(raft_dir.join("vote.json"), b"old vote").unwrap();
        std::fs::write(raft_dir.join("committed.json"), b"old committed").unwrap();

        restore(&backup_path, &data_dir).unwrap();

        // WAL entries are gone.
        assert!(!wal_dir.join("1.json").exists());
        assert!(!wal_dir.join("2.json").exists());
        // So are the vote and committed files.
        assert!(!raft_dir.join("vote.json").exists());
        assert!(!raft_dir.join("committed.json").exists());
    }

    #[tokio::test]
    async fn verify_ignores_unknown_entries() {
        let state = test_state();
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("extra-files.tar.gz");

        let (state_json, app_metadata) = {
            let guard = state.read().await;
            (
                serde_json::to_vec_pretty(&*guard).unwrap(),
                guard.backup_metadata(),
            )
        };
        let metadata_json = serde_json::to_vec(&zeroed_metadata(app_metadata)).unwrap();
        let extra = b"some extra data";

        // Metadata, then an unrelated file, then the snapshot.
        write_archive(
            &backup_path,
            &[
                ("backup/metadata.json", metadata_json.as_slice()),
                ("backup/extra-info.txt", extra.as_slice()),
                ("backup/snapshot.json", state_json.as_slice()),
            ],
        );

        // The unrelated entry must not cause verification to fail.
        let verified = verify(&backup_path).unwrap();
        assert_eq!(verified.app.item_count, 3);
    }

    #[tokio::test]
    async fn restore_snapshot_loadable_by_state_machine() {
        use crate::state_machine::HpcStateMachine;

        let state = test_state();
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");

        export_backup(&state, &backup_path).await.unwrap();
        restore(&backup_path, &data_dir).unwrap();

        // Building a state machine over the restored directory should
        // populate the initially empty state.
        let snap_dir = data_dir.join("raft").join("snapshots");
        let loaded = tokio::task::spawn_blocking(move || {
            let target = Arc::new(tokio::sync::RwLock::new(TestState { items: vec![] }));
            let _sm = HpcStateMachine::<TestTypeConfig, TestState>::with_snapshot_dir(
                Arc::clone(&target),
                snap_dir,
            )
            .unwrap();
            target
        })
        .await
        .unwrap();

        let guard = loaded.read().await;
        assert_eq!(guard.items.len(), 3);
        assert_eq!(guard.items[0], "one");
    }

    #[tokio::test]
    async fn verify_missing_metadata_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let backup_path = tmp.path().join("no-metadata.tar.gz");

        // Archive holding only the snapshot entry.
        let snapshot_json = serde_json::to_vec(&TestState::default()).unwrap();
        write_archive(
            &backup_path,
            &[("backup/snapshot.json", snapshot_json.as_slice())],
        );

        let result = verify(&backup_path);
        assert!(result.is_err());
        let message = result.unwrap_err().to_string();
        assert!(message.contains("missing metadata.json"));
    }
}