1use std::io::{self, Read};
14use std::path::Path;
15use std::sync::Arc;
16
17use chrono::{DateTime, Utc};
18use serde::de::DeserializeOwned;
19use serde::{Deserialize, Serialize};
20use tokio::sync::RwLock;
21use tracing::debug;
22
23use openraft::{RaftTypeConfig, StoredMembership};
24
25use crate::BackupMetadataSource;
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct BackupMetadata<M> {
30 pub timestamp: DateTime<Utc>,
31 pub snapshot_term: u64,
32 pub snapshot_index: u64,
33 pub app: M,
35}
36
37pub async fn export_backup<S>(
39 state: &Arc<RwLock<S>>,
40 path: &Path,
41) -> io::Result<BackupMetadata<S::Metadata>>
42where
43 S: Serialize + Send + Sync + BackupMetadataSource,
44{
45 let state_guard = state.read().await;
46 let state_json = serde_json::to_vec_pretty(&*state_guard).map_err(io::Error::other)?;
47 let app_metadata = state_guard.backup_metadata();
48 drop(state_guard);
49
50 let metadata = BackupMetadata {
51 timestamp: Utc::now(),
52 snapshot_term: 0,
53 snapshot_index: 0,
54 app: app_metadata,
55 };
56 let metadata_json = serde_json::to_vec_pretty(&metadata).map_err(io::Error::other)?;
57
58 let prefix = format!("backup-{}", metadata.timestamp.format("%Y%m%dT%H%M%SZ"));
59
60 let file = std::fs::File::create(path)?;
62 let enc = flate2::write::GzEncoder::new(file, flate2::Compression::default());
63 let mut tar = tar::Builder::new(enc);
64
65 let mut header = tar::Header::new_gnu();
67 header.set_size(metadata_json.len() as u64);
68 header.set_mode(0o644);
69 header.set_cksum();
70 tar.append_data(
71 &mut header,
72 format!("{prefix}/metadata.json"),
73 metadata_json.as_slice(),
74 )?;
75
76 let mut header = tar::Header::new_gnu();
78 header.set_size(state_json.len() as u64);
79 header.set_mode(0o644);
80 header.set_cksum();
81 tar.append_data(
82 &mut header,
83 format!("{prefix}/snapshot.json"),
84 state_json.as_slice(),
85 )?;
86
87 tar.into_inner()?.finish()?;
88
89 debug!("Exported backup to {}", path.display());
90 Ok(metadata)
91}
92
93pub fn verify_backup<S, M>(path: &Path) -> io::Result<BackupMetadata<M>>
95where
96 S: DeserializeOwned,
97 M: DeserializeOwned + Serialize,
98{
99 let file = std::fs::File::open(path)?;
100 let dec = flate2::read::GzDecoder::new(file);
101 let mut archive = tar::Archive::new(dec);
102
103 let mut found_metadata = false;
104 let mut found_snapshot = false;
105 let mut metadata: Option<BackupMetadata<M>> = None;
106
107 for entry in archive.entries()? {
108 let mut entry = entry?;
109 let path = entry.path()?.to_path_buf();
110 let name = path
111 .file_name()
112 .map(|n| n.to_string_lossy().to_string())
113 .unwrap_or_default();
114
115 match name.as_str() {
116 "metadata.json" => {
117 let mut buf = Vec::new();
118 entry.read_to_end(&mut buf)?;
119 metadata = Some(
120 serde_json::from_slice(&buf)
121 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
122 );
123 found_metadata = true;
124 }
125 "snapshot.json" => {
126 let mut buf = Vec::new();
127 entry.read_to_end(&mut buf)?;
128 let _state: S = serde_json::from_slice(&buf)
129 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
130 found_snapshot = true;
131 }
132 _ => {}
133 }
134 }
135
136 if !found_metadata {
137 return Err(io::Error::new(
138 io::ErrorKind::InvalidData,
139 "backup missing metadata.json",
140 ));
141 }
142 if !found_snapshot {
143 return Err(io::Error::new(
144 io::ErrorKind::InvalidData,
145 "backup missing snapshot.json",
146 ));
147 }
148
149 Ok(metadata.unwrap())
150}
151
152pub fn restore_backup<C, S, M>(backup_path: &Path, data_dir: &Path) -> io::Result<BackupMetadata<M>>
159where
160 C: RaftTypeConfig,
161 S: Serialize + DeserializeOwned,
162 M: Serialize + DeserializeOwned,
163 StoredMembership<C>: Serialize + Default,
164{
165 let metadata = verify_backup::<S, M>(backup_path)?;
167
168 let snapshot_dir = data_dir.join("raft").join("snapshots");
169 std::fs::create_dir_all(&snapshot_dir)?;
170
171 let file = std::fs::File::open(backup_path)?;
173 let dec = flate2::read::GzDecoder::new(file);
174 let mut archive = tar::Archive::new(dec);
175
176 for entry in archive.entries()? {
177 let mut entry = entry?;
178 let path = entry.path()?.to_path_buf();
179 let name = path
180 .file_name()
181 .map(|n| n.to_string_lossy().to_string())
182 .unwrap_or_default();
183
184 if name == "snapshot.json" {
185 let mut state_data = Vec::new();
186 entry.read_to_end(&mut state_data)?;
187
188 let state: S = serde_json::from_slice(&state_data)
190 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
191
192 let snap_filename = format!(
193 "snap-{}-{}.json",
194 metadata.snapshot_term, metadata.snapshot_index
195 );
196 let snap_path = snapshot_dir.join(&snap_filename);
197
198 let persisted = serde_json::json!({
201 "meta": {
202 "last_log_id": null,
203 "last_membership": StoredMembership::<C>::default(),
204 "snapshot_id": format!(
205 "restored-{}",
206 metadata.timestamp.format("%Y%m%dT%H%M%SZ")
207 ),
208 },
209 "state": state,
210 });
211 let json = serde_json::to_vec_pretty(&persisted).map_err(io::Error::other)?;
212 std::fs::write(&snap_path, &json)?;
213
214 let current = snapshot_dir.join("current");
216 std::fs::write(¤t, snap_filename.as_bytes())?;
217
218 debug!("Restored backup to {}", snap_path.display());
219 break;
220 }
221 }
222
223 let wal_dir = data_dir.join("raft").join("wal");
225 if wal_dir.exists() {
226 for entry in std::fs::read_dir(&wal_dir)? {
227 let entry = entry?;
228 let _ = std::fs::remove_file(entry.path());
229 }
230 }
231
232 let vote_path = data_dir.join("raft").join("vote.json");
234 let committed_path = data_dir.join("raft").join("committed.json");
235 let _ = std::fs::remove_file(&vote_path);
236 let _ = std::fs::remove_file(&committed_path);
237
238 Ok(metadata)
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_types::TestTypeConfig;

    /// Writer type used when a test assembles an archive by hand.
    type ArchiveWriter = tar::Builder<flate2::write::GzEncoder<std::fs::File>>;

    /// Minimal state: a list of strings.
    #[derive(Debug, Clone, Default, Serialize, Deserialize)]
    struct TestState {
        items: Vec<String>,
    }

    /// Application metadata recorded for [`TestState`].
    #[derive(Debug, Clone, Serialize, Deserialize)]
    struct TestMetadata {
        item_count: usize,
    }

    impl crate::StateMachineState<TestTypeConfig> for TestState {
        fn apply(
            &mut self,
            _cmd: crate::test_types::TestCommand,
        ) -> crate::test_types::TestResponse {
            crate::test_types::TestResponse::Ok
        }

        fn blank_response() -> crate::test_types::TestResponse {
            crate::test_types::TestResponse::Ok
        }
    }

    impl BackupMetadataSource for TestState {
        type Metadata = TestMetadata;

        fn backup_metadata(&self) -> TestMetadata {
            let item_count = self.items.len();
            TestMetadata { item_count }
        }
    }

    /// Shared state holding the three items "one", "two", "three".
    fn test_state() -> Arc<RwLock<TestState>> {
        let items = ["one", "two", "three"]
            .iter()
            .map(|s| s.to_string())
            .collect();
        Arc::new(RwLock::new(TestState { items }))
    }

    /// Metadata with zero term and index and the given application payload.
    fn metadata_with(app: TestMetadata) -> BackupMetadata<TestMetadata> {
        BackupMetadata {
            timestamp: Utc::now(),
            snapshot_term: 0,
            snapshot_index: 0,
            app,
        }
    }

    /// Creates an empty gzip-compressed tar archive at `path`.
    fn start_archive(path: &Path) -> ArchiveWriter {
        let file = std::fs::File::create(path).unwrap();
        let gz = flate2::write::GzEncoder::new(file, flate2::Compression::default());
        tar::Builder::new(gz)
    }

    /// Adds a regular file (mode `0644`) named `name` with the given contents.
    fn add_file(archive: &mut ArchiveWriter, name: &str, contents: &[u8]) {
        let mut header = tar::Header::new_gnu();
        header.set_size(contents.len() as u64);
        header.set_mode(0o644);
        header.set_cksum();
        archive.append_data(&mut header, name, contents).unwrap();
    }

    /// Completes the tar and gzip streams.
    fn finish_archive(archive: ArchiveWriter) {
        archive.into_inner().unwrap().finish().unwrap();
    }

    #[tokio::test]
    async fn export_and_verify_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("test-backup.tar.gz");
        let state = test_state();

        let exported = export_backup(&state, &archive_path).await.unwrap();
        assert_eq!(exported.app.item_count, 3);

        let verified = verify_backup::<TestState, TestMetadata>(&archive_path).unwrap();
        assert_eq!(verified.app.item_count, 3);
    }

    #[tokio::test]
    async fn verify_corrupt_backup_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("corrupt.tar.gz");
        std::fs::write(&archive_path, b"not a valid tar.gz").unwrap();

        assert!(verify_backup::<TestState, TestMetadata>(&archive_path).is_err());
    }

    #[tokio::test]
    async fn verify_missing_snapshot_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("incomplete.tar.gz");

        // Archive with metadata only.
        let mut archive = start_archive(&archive_path);
        let metadata_json =
            serde_json::to_vec(&metadata_with(TestMetadata { item_count: 0 })).unwrap();
        add_file(&mut archive, "backup/metadata.json", &metadata_json);
        finish_archive(archive);

        let err = verify_backup::<TestState, TestMetadata>(&archive_path).unwrap_err();
        assert!(err.to_string().contains("missing snapshot.json"));
    }

    #[tokio::test]
    async fn restore_writes_snapshot_files() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");
        let state = test_state();

        export_backup(&state, &archive_path).await.unwrap();
        let restored =
            restore_backup::<TestTypeConfig, TestState, TestMetadata>(&archive_path, &data_dir)
                .unwrap();
        assert_eq!(restored.app.item_count, 3);

        let snap_dir = data_dir.join("raft").join("snapshots");
        let pointer = snap_dir.join("current");
        assert!(snap_dir.exists());
        assert!(pointer.exists());

        // The pointer file must name a snapshot that is actually on disk.
        let target = std::fs::read_to_string(&pointer).unwrap();
        assert!(snap_dir.join(target.trim()).exists());
    }

    #[test]
    fn verify_nonexistent_backup_fails() {
        let missing = Path::new("/nonexistent/backup.tar.gz");
        assert!(verify_backup::<TestState, TestMetadata>(missing).is_err());
    }

    #[tokio::test]
    async fn restore_cleans_up_wal_and_vote() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");
        let state = test_state();

        export_backup(&state, &archive_path).await.unwrap();

        // Seed the data directory with leftovers from an earlier run.
        let raft_dir = data_dir.join("raft");
        let wal_dir = raft_dir.join("wal");
        std::fs::create_dir_all(&wal_dir).unwrap();
        for wal_file in ["1.json", "2.json"].iter() {
            std::fs::write(wal_dir.join(wal_file), b"old entry").unwrap();
        }
        std::fs::write(raft_dir.join("vote.json"), b"old vote").unwrap();
        std::fs::write(raft_dir.join("committed.json"), b"old committed").unwrap();

        restore_backup::<TestTypeConfig, TestState, TestMetadata>(&archive_path, &data_dir)
            .unwrap();

        assert!(!wal_dir.join("1.json").exists());
        assert!(!wal_dir.join("2.json").exists());
        assert!(!raft_dir.join("vote.json").exists());
        assert!(!raft_dir.join("committed.json").exists());
    }

    #[tokio::test]
    async fn verify_ignores_unknown_entries() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("extra-files.tar.gz");
        let state = test_state();

        let (state_json, app_metadata) = {
            let guard = state.read().await;
            (
                serde_json::to_vec_pretty(&*guard).unwrap(),
                guard.backup_metadata(),
            )
        };
        let metadata_json = serde_json::to_vec(&metadata_with(app_metadata)).unwrap();

        // An unrelated file sits between the two entries the verifier looks for.
        let mut archive = start_archive(&archive_path);
        add_file(&mut archive, "backup/metadata.json", &metadata_json);
        add_file(&mut archive, "backup/extra-info.txt", b"some extra data");
        add_file(&mut archive, "backup/snapshot.json", &state_json);
        finish_archive(archive);

        let verified = verify_backup::<TestState, TestMetadata>(&archive_path).unwrap();
        assert_eq!(verified.app.item_count, 3);
    }

    #[tokio::test]
    async fn restore_snapshot_loadable_by_state_machine() {
        use crate::state_machine::HpcStateMachine;

        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("backup.tar.gz");
        let data_dir = tmp.path().join("restored");
        let state = test_state();

        export_backup(&state, &archive_path).await.unwrap();
        restore_backup::<TestTypeConfig, TestState, TestMetadata>(&archive_path, &data_dir)
            .unwrap();

        let snap_dir = data_dir.join("raft").join("snapshots");
        // The state machine is constructed on a blocking thread, as in the
        // original test, and the state it was given is handed back.
        let loaded = tokio::task::spawn_blocking(move || {
            let empty = Arc::new(RwLock::new(TestState::default()));
            let _sm = HpcStateMachine::<TestTypeConfig, TestState>::with_snapshot_dir(
                Arc::clone(&empty),
                snap_dir,
            )
            .unwrap();
            empty
        })
        .await
        .unwrap();

        let guard = loaded.read().await;
        assert_eq!(guard.items.len(), 3);
        assert_eq!(guard.items[0], "one");
    }

    #[tokio::test]
    async fn verify_missing_metadata_fails() {
        let tmp = tempfile::tempdir().unwrap();
        let archive_path = tmp.path().join("no-metadata.tar.gz");

        // Archive with a snapshot only.
        let mut archive = start_archive(&archive_path);
        let snapshot_json = serde_json::to_vec(&TestState::default()).unwrap();
        add_file(&mut archive, "backup/snapshot.json", &snapshot_json);
        finish_archive(archive);

        let err = verify_backup::<TestState, TestMetadata>(&archive_path).unwrap_err();
        assert!(err.to_string().contains("missing metadata.json"));
    }
}