1use std::path::Path;
4use std::sync::Arc;
5
6use super::{
7 load_archived_change_records, load_backup_head, load_snapshot_manifest,
8 load_wal_segment_manifest,
9};
10use crate::replication::logical::{ApplyMode, ApplyOutcome, LogicalChangeApplier};
11use crate::storage::backend::{BackendError, RemoteBackend};
12use crate::storage::RedDB;
13
#[derive(Debug, Clone)]
/// A restorable moment in time, derived from one snapshot plus the WAL
/// segments that extend past it. Returned by
/// `PointInTimeRecovery::list_restore_points`.
pub struct RestorePoint {
    /// Identifier of the snapshot backing this restore point.
    pub snapshot_id: u64,
    /// Timestamp at which the snapshot was taken (same unit as WAL record
    /// timestamps; epoch/resolution is defined by the backup writer).
    pub snapshot_time: u64,
    /// Number of archived WAL segments whose end LSN lies past this
    /// snapshot's base LSN (i.e. segments that would be replayed).
    pub wal_segment_count: usize,
    /// Latest point in time recoverable from this snapshot.
    /// NOTE(review): currently mirrors `snapshot_time` even when WAL
    /// segments exist — confirm whether WAL coverage should extend this.
    pub latest_recoverable_time: u64,
}
22
#[derive(Debug, Clone)]
/// Outcome of an executed restore: which snapshot was used and how far the
/// WAL replay advanced the recovered database.
pub struct RecoveryResult {
    /// Identifier of the snapshot the restore started from.
    pub snapshot_used: u64,
    /// Count of WAL segments from which at least one record was applied.
    pub wal_segments_replayed: usize,
    /// Total change records applied on top of the snapshot.
    pub records_applied: u64,
    /// Highest LSN applied; equals the snapshot's base LSN when no WAL
    /// records were replayed.
    pub recovered_to_lsn: u64,
    /// Highest record timestamp applied; equals the snapshot time when no
    /// WAL records were replayed.
    pub recovered_to_time: u64,
}
32
#[derive(Debug, Clone)]
/// An executable restore plan produced by `PointInTimeRecovery::plan_restore`:
/// the chosen snapshot plus the ordered WAL segment keys to replay over it.
pub struct RestorePlan {
    /// Timeline the chosen snapshot belongs to.
    pub timeline_id: String,
    /// Remote key of the snapshot object to download.
    pub snapshot_key: String,
    /// Identifier of the chosen snapshot.
    pub snapshot_id: u64,
    /// Timestamp at which the chosen snapshot was taken.
    pub snapshot_time: u64,
    /// LSN the snapshot covers up to; replay starts strictly after this.
    pub base_lsn: u64,
    /// Requested recovery target timestamp; `0` means "latest available".
    pub target_time: u64,
    /// Remote keys of WAL segments whose end LSN exceeds `base_lsn`,
    /// in listing (LSN-start) order.
    pub wal_segments: Vec<String>,
    /// Expected SHA-256 of the snapshot file, when the manifest recorded one.
    /// `None` for manifests predating the integrity field.
    pub snapshot_sha256: Option<String>,
}
49
#[derive(Debug, Clone)]
/// Internal view of one remote snapshot, merged from its key name and (when
/// available) its published manifest.
struct SnapshotDescriptor {
    /// Remote object key of the snapshot.
    key: String,
    /// Snapshot identifier (manifest value, else parsed from the file name).
    snapshot_id: u64,
    /// Snapshot timestamp (manifest value, else parsed from the file name).
    snapshot_time: u64,
    /// Owning timeline; falls back to "main" when no manifest or head exists.
    timeline_id: String,
    /// LSN the snapshot covers up to; `0` when unknown.
    base_lsn: u64,
    /// Expected SHA-256 of the snapshot file, if the manifest recorded one.
    snapshot_sha256: Option<String>,
}
59
#[derive(Debug, Clone)]
/// Internal view of one archived WAL segment, parsed from its
/// `<start>-<end>.wal` key name.
struct WalSegmentDescriptor {
    /// Remote object key of the segment.
    key: String,
    /// First LSN contained in the segment.
    lsn_start: u64,
    /// Last LSN contained in the segment.
    lsn_end: u64,
}
66
/// Point-in-time recovery driver: plans and executes restores from a remote
/// backend holding snapshots (under `snapshot_prefix`) and archived WAL
/// segments (under `wal_prefix`).
pub struct PointInTimeRecovery {
    /// Remote storage the snapshots and WAL archives live in.
    backend: Arc<dyn RemoteBackend>,
    /// Key prefix under which snapshot objects are listed.
    snapshot_prefix: String,
    /// Key prefix under which WAL segment objects are listed.
    wal_prefix: String,
}
73
impl PointInTimeRecovery {
    /// Creates a recovery handle over `backend`, reading snapshots from
    /// `snapshot_prefix` and WAL segment archives from `wal_prefix`.
    pub fn new(
        backend: Arc<dyn RemoteBackend>,
        snapshot_prefix: impl Into<String>,
        wal_prefix: impl Into<String>,
    ) -> Self {
        Self {
            backend,
            snapshot_prefix: snapshot_prefix.into(),
            wal_prefix: wal_prefix.into(),
        }
    }

    /// Builds a restore plan for `target_time`.
    ///
    /// Selects the newest snapshot taken at or before `target_time`
    /// (`target_time == 0` is a sentinel meaning "latest available"), then
    /// includes every WAL segment whose end LSN lies strictly past the
    /// snapshot's base LSN.
    ///
    /// # Errors
    /// `BackendError::NotFound` when no snapshot qualifies; otherwise any
    /// listing/manifest error from the backend.
    pub fn plan_restore(&self, target_time: u64) -> Result<RestorePlan, BackendError> {
        let snapshots = self.list_snapshots()?;
        // With target_time == 0 every snapshot passes the filter, so
        // max_by_key picks the most recent one overall.
        let selected = snapshots
            .iter()
            .filter(|snapshot| snapshot.snapshot_time <= target_time || target_time == 0)
            .max_by_key(|snapshot| snapshot.snapshot_time)
            .ok_or_else(|| {
                BackendError::NotFound(format!(
                    "no snapshot available at or before target timestamp {target_time}"
                ))
            })?;

        // Only segments that can contain records past the snapshot's base
        // LSN are worth replaying; fully-covered segments are dropped.
        let wal_segments = self
            .list_wal_segments()?
            .into_iter()
            .filter(|segment| segment.lsn_end > selected.base_lsn)
            .map(|segment| segment.key)
            .collect();

        Ok(RestorePlan {
            timeline_id: selected.timeline_id.clone(),
            snapshot_key: selected.key.clone(),
            snapshot_id: selected.snapshot_id,
            snapshot_time: selected.snapshot_time,
            base_lsn: selected.base_lsn,
            target_time,
            wal_segments,
            snapshot_sha256: selected.snapshot_sha256.clone(),
        })
    }

    /// Convenience wrapper: plans a restore for `target_time` and executes it
    /// into `dest_path`. See [`Self::plan_restore`] and
    /// [`Self::execute_restore`].
    pub fn restore_to(
        &self,
        target_time: u64,
        dest_path: &Path,
    ) -> Result<RecoveryResult, BackendError> {
        let plan = self.plan_restore(target_time)?;
        self.execute_restore(&plan, dest_path)
    }

    /// Executes `plan`: downloads the snapshot to `dest_path`, verifies its
    /// SHA-256 when the manifest recorded one, then replays the plan's WAL
    /// segments in order, verifying each segment's content hash and its
    /// `prev_hash` chain link before applying records.
    ///
    /// Integrity failures (content hash mismatch, broken hash chain) fail
    /// closed with `BackendError::Internal`; manifests without hashes only
    /// produce warnings.
    ///
    /// # Errors
    /// Backend/download errors, integrity failures, or apply/flush failures
    /// from the restored database.
    pub fn execute_restore(
        &self,
        plan: &RestorePlan,
        dest_path: &Path,
    ) -> Result<RecoveryResult, BackendError> {
        if let Some(parent) = dest_path.parent() {
            std::fs::create_dir_all(parent).map_err(|err| {
                BackendError::Transport(format!(
                    "create restore destination directory failed: {err}"
                ))
            })?;
        }

        // download() returning false means the key vanished between planning
        // and execution (e.g. concurrent retention pruning).
        let downloaded = self.backend.download(&plan.snapshot_key, dest_path)?;
        if !downloaded {
            return Err(BackendError::NotFound(format!(
                "snapshot '{}' disappeared during restore",
                plan.snapshot_key
            )));
        }

        // Snapshot integrity: verify against the manifest hash when present.
        // The corrupt file is intentionally left on disk for inspection.
        match &plan.snapshot_sha256 {
            Some(expected) => {
                let computed =
                    crate::storage::wal::SnapshotManifest::compute_snapshot_sha256(dest_path)?;
                if !computed.eq_ignore_ascii_case(expected) {
                    return Err(BackendError::Internal(format!(
                        "snapshot integrity check failed for '{}': manifest sha256 {} != computed sha256 {}; \
                         downloaded file kept at {} for forensics",
                        plan.snapshot_key,
                        expected,
                        computed,
                        dest_path.display(),
                    )));
                }
            }
            None => {
                tracing::warn!(
                    target: "reddb::backup::restore",
                    snapshot_key = %plan.snapshot_key,
                    "manifest predates snapshot_sha256 field; restore proceeding without integrity check"
                );
            }
        }

        let db = RedDB::open(dest_path).map_err(|err| {
            BackendError::Internal(format!("open restore database failed: {err}"))
        })?;

        let mut wal_segments_replayed = 0usize;
        let mut records_applied = 0u64;
        // Start the recovery watermark at the snapshot's position; WAL
        // replay only moves it forward.
        let mut recovered_to_lsn = plan.base_lsn;
        let mut recovered_to_time = plan.snapshot_time;

        let applier = LogicalChangeApplier::new(plan.base_lsn);

        // SHA-256 of the previously replayed segment, used to verify each
        // segment's declared prev_hash chain link.
        let mut prev_segment_sha: Option<String> = None;

        for (segment_idx, segment_key) in plan.wal_segments.iter().enumerate() {
            let manifest = super::load_wal_segment_manifest(self.backend.as_ref(), segment_key)?;
            let (records, computed_sha) =
                super::archiver::load_archived_change_records_with_sha256(
                    self.backend.as_ref(),
                    segment_key,
                )?;
            // Segment content integrity: manifest hash vs. hash computed
            // while loading. Fail closed on any mismatch or unreadable data.
            match manifest.as_ref().and_then(|m| m.sha256.as_deref()) {
                Some(expected) => match computed_sha.as_deref() {
                    Some(actual) if actual.eq_ignore_ascii_case(expected) => {}
                    Some(actual) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment integrity check failed for '{segment_key}': \
                             manifest sha256 {expected} != computed sha256 {actual}",
                        )));
                    }
                    None => {
                        return Err(BackendError::Internal(format!(
                            "wal segment integrity check failed for '{segment_key}': \
                             expected sha256 {expected} but segment was empty / unreadable",
                        )));
                    }
                },
                None => {
                    tracing::warn!(
                        target: "reddb::backup::restore",
                        segment_key = %segment_key,
                        "wal segment manifest absent or sha256-less; restore proceeding without integrity check"
                    );
                }
            }

            // Hash-chain verification: every (declared prev_hash, observed
            // prior sha) combination other than a match is a chain break and
            // fails closed.
            if let Some(m) = manifest.as_ref() {
                match (&m.prev_hash, &prev_segment_sha) {
                    (Some(declared), Some(actual)) => {
                        if !declared.eq_ignore_ascii_case(actual) {
                            return Err(BackendError::Internal(format!(
                                "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                                 declared prev_hash {declared} != prior segment sha256 {actual}; \
                                 a segment was removed, reordered, or replaced",
                            )));
                        }
                    }
                    (Some(declared), None) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                             segment declares prev_hash {declared} but no prior segment was loaded; \
                             the first segment of the chain is missing",
                        )));
                    }
                    (None, Some(actual)) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                             segment claims to be the first in its timeline but a prior segment \
                             (sha256 {actual}) was already replayed; reorder or merge of two timelines",
                        )));
                    }
                    (None, None) => {
                        // First segment of the chain with nothing replayed yet.
                    }
                }
                // NOTE(review): when a manifest exists but has no sha256,
                // prev_segment_sha keeps the PRIOR segment's hash, so the
                // next chain comparison is made against a stale value —
                // confirm this is the intended semantics.
                if let Some(sha) = m.sha256.clone() {
                    prev_segment_sha = Some(sha);
                }
            } else {
                // No manifest at all: the chain is interrupted; a later
                // segment declaring a prev_hash will then fail closed.
                prev_segment_sha = None;
            }

            let mut segment_applied = false;
            for record in records {
                // Skip records already covered by the snapshot.
                if record.lsn <= plan.base_lsn {
                    continue;
                }
                // Skip records past the requested target time (0 = no bound).
                if plan.target_time != 0 && record.timestamp > plan.target_time {
                    continue;
                }
                match applier.apply(&db, &record, ApplyMode::Restore) {
                    Ok(ApplyOutcome::Applied) => {
                        recovered_to_lsn = recovered_to_lsn.max(record.lsn);
                        recovered_to_time = recovered_to_time.max(record.timestamp);
                        records_applied += 1;
                        segment_applied = true;
                    }
                    Ok(ApplyOutcome::Idempotent) | Ok(ApplyOutcome::Skipped) => {}
                    Err(err) => {
                        return Err(BackendError::Internal(format!(
                            "restore apply failed at lsn {} in segment '{}': {}",
                            record.lsn, segment_key, err
                        )));
                    }
                }
            }
            // A segment counts as replayed only if it contributed at least
            // one applied record.
            if segment_applied {
                wal_segments_replayed += 1;
            }
        }

        db.flush().map_err(|err| {
            BackendError::Internal(format!("flush restored database failed: {err}"))
        })?;

        Ok(RecoveryResult {
            snapshot_used: plan.snapshot_id,
            wal_segments_replayed,
            records_applied,
            recovered_to_lsn,
            recovered_to_time,
        })
    }

    /// Lists every available restore point (one per snapshot), sorted by
    /// snapshot time ascending, with the count of WAL segments that would be
    /// replayed on top of each.
    pub fn list_restore_points(&self) -> Result<Vec<RestorePoint>, BackendError> {
        let snapshots = self.list_snapshots()?;
        let wal_segments = self.list_wal_segments()?;
        let mut out = Vec::new();

        for snapshot in snapshots {
            let wal_segment_count = wal_segments
                .iter()
                .filter(|segment| segment.lsn_end > snapshot.base_lsn)
                .count();
            out.push(RestorePoint {
                snapshot_id: snapshot.snapshot_id,
                snapshot_time: snapshot.snapshot_time,
                wal_segment_count,
                // NOTE(review): reported as the snapshot time only; WAL
                // coverage is not folded in — confirm whether this should
                // reflect the newest replayable record instead.
                latest_recoverable_time: snapshot.snapshot_time,
            });
        }

        out.sort_by_key(|point| point.snapshot_time);
        Ok(out)
    }

    /// Lists snapshots under `snapshot_prefix`, sorted by snapshot time.
    ///
    /// Keys must be named `<id>-<time>.snapshot`; anything else is silently
    /// skipped. When a snapshot manifest exists its values override the
    /// filename-derived id/time; otherwise timeline/base LSN fall back to
    /// the backup head (if it matches this snapshot) or `("main", 0)`.
    fn list_snapshots(&self) -> Result<Vec<SnapshotDescriptor>, BackendError> {
        let snapshots = self.backend.list(&self.snapshot_prefix)?;
        let mut out = Vec::new();
        for key in snapshots {
            let Some(file_name) = std::path::Path::new(&key)
                .file_name()
                .and_then(|s| s.to_str())
            else {
                continue;
            };
            let Some(base) = file_name.strip_suffix(".snapshot") else {
                continue;
            };
            let Some((snapshot_id, snapshot_time)) = base.split_once('-') else {
                continue;
            };
            let (Ok(snapshot_id), Ok(snapshot_time)) =
                (snapshot_id.parse::<u64>(), snapshot_time.parse::<u64>())
            else {
                continue;
            };
            let manifest = load_snapshot_manifest(self.backend.as_ref(), &key)?;
            // Manifest values win over filename-derived ones when available.
            let (snapshot_id, snapshot_time, timeline_id, base_lsn, snapshot_sha256) =
                if let Some(manifest) = manifest {
                    (
                        manifest.snapshot_id,
                        manifest.snapshot_time,
                        manifest.timeline_id,
                        manifest.base_lsn,
                        manifest.snapshot_sha256,
                    )
                } else {
                    // Legacy snapshot without manifest: use the backup head
                    // only if it refers to this exact snapshot.
                    let (timeline_id, base_lsn) = self
                        .load_current_head()
                        .filter(|head| head.snapshot_id == snapshot_id)
                        .map(|head| (head.timeline_id, head.current_lsn))
                        .unwrap_or_else(|| ("main".to_string(), 0));
                    (snapshot_id, snapshot_time, timeline_id, base_lsn, None)
                };

            out.push(SnapshotDescriptor {
                key,
                snapshot_id,
                snapshot_time,
                timeline_id,
                base_lsn,
                snapshot_sha256,
            });
        }
        out.sort_by_key(|snapshot| snapshot.snapshot_time);
        Ok(out)
    }

    /// Lists WAL segments under `wal_prefix`, sorted by starting LSN.
    ///
    /// Keys must be named `<lsn_start>-<lsn_end>.wal`; anything else is
    /// silently skipped.
    fn list_wal_segments(&self) -> Result<Vec<WalSegmentDescriptor>, BackendError> {
        let keys = self.backend.list(&self.wal_prefix)?;
        let mut out = Vec::new();
        for key in keys {
            let Some(file_name) = std::path::Path::new(&key)
                .file_name()
                .and_then(|s| s.to_str())
            else {
                continue;
            };
            let Some((start, end)) = file_name
                .strip_suffix(".wal")
                .and_then(|base| base.split_once('-'))
            else {
                continue;
            };
            let (Ok(lsn_start), Ok(lsn_end)) = (start.parse::<u64>(), end.parse::<u64>()) else {
                continue;
            };
            out.push(WalSegmentDescriptor {
                key,
                lsn_start,
                lsn_end,
            });
        }
        out.sort_by_key(|segment| segment.lsn_start);
        Ok(out)
    }

    /// Loads the backup head manifest, expected at
    /// `<parent-of-snapshot-prefix>/manifests/head.json`. Returns `None` on
    /// any error or when the head does not exist (best-effort lookup).
    fn load_current_head(&self) -> Option<super::BackupHead> {
        let snapshot_root = self.snapshot_prefix.trim_end_matches('/');
        let parent = Path::new(snapshot_root).parent()?;
        let parent = parent.to_string_lossy().trim_end_matches('/').to_string();
        let head_key = if parent.is_empty() {
            "manifests/head.json".to_string()
        } else {
            format!("{parent}/manifests/head.json")
        };
        load_backup_head(self.backend.as_ref(), &head_key)
            .ok()
            .flatten()
    }
}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;
    use crate::storage::wal::{publish_snapshot_manifest, SnapshotManifest};

    /// With two snapshots (t=100 and t=200) and a target of 150, planning
    /// must pick snapshot 1 and restore must land at time 100.
    #[test]
    fn restore_to_downloads_latest_snapshot_before_target() {
        let temp_dir =
            std::env::temp_dir().join(format!("reddb_pitr_restore_{}_{}", std::process::id(), 1));
        let snapshot_dir = temp_dir.join("snapshots");
        let restore_path = temp_dir.join("restore").join("data.rdb");
        let _ = std::fs::remove_dir_all(&temp_dir);
        std::fs::create_dir_all(&snapshot_dir).unwrap();

        // Create two empty-but-valid snapshot files named <id>-<time>.snapshot.
        let snapshot1 = snapshot_dir.join("1-100.snapshot");
        let snapshot2 = snapshot_dir.join("2-200.snapshot");
        RedDB::open(&snapshot1).unwrap().flush().unwrap();
        RedDB::open(&snapshot2).unwrap().flush().unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot1.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot2.to_string_lossy().to_string(),
                snapshot_id: 2,
                snapshot_time: 200,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            temp_dir.join("wal").to_string_lossy().to_string(),
        );

        // Target 150: snapshot 2 (t=200) must be excluded.
        let result = recovery.restore_to(150, &restore_path).unwrap();
        assert_eq!(result.snapshot_used, 1);
        assert_eq!(result.recovered_to_time, 100);
        assert!(restore_path.exists());

        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    /// Shared fixture for the hash-chain tests: builds one snapshot plus a
    /// three-segment hash-chained WAL archive (LSNs 1..=3), lets `mutate`
    /// tamper with the archived segment manifests, then runs a full restore
    /// (target_time = 0, i.e. latest) and returns the outcome.
    fn run_chain_restore(
        tag: &str,
        mutate: impl FnOnce(&LocalBackend, &[crate::storage::wal::WalSegmentMeta]),
    ) -> Result<RecoveryResult, BackendError> {
        use crate::replication::cdc::ChangeRecord;
        use crate::storage::schema::Value;
        use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
        // Unique temp dir per test run (tag + pid + nanos) so parallel tests
        // don't collide.
        let temp_dir = std::env::temp_dir().join(format!(
            "reddb_chain_{}_{}_{}",
            tag,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _ = std::fs::remove_dir_all(&temp_dir);
        let snapshot_dir = temp_dir.join("snapshots");
        let wal_dir = temp_dir.join("wal");
        let restore_path = temp_dir.join("restore").join("data.rdb");
        std::fs::create_dir_all(&snapshot_dir).unwrap();
        std::fs::create_dir_all(&wal_dir).unwrap();

        let snapshot_path = snapshot_dir.join("1-100.snapshot");
        RedDB::open(&snapshot_path).unwrap().flush().unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot_path.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let wal_prefix = format!("{}/", wal_dir.to_string_lossy());
        // Builds one insert ChangeRecord for `users` with lsn-derived values.
        let mk = |lsn: u64| {
            let timestamp = 100 + lsn;
            let mut entity = UnifiedEntity::new(
                EntityId::new(lsn),
                EntityKind::TableRow {
                    table: Arc::from("users"),
                    row_id: lsn,
                },
                EntityData::Row(RowData::with_names(
                    vec![
                        Value::UnsignedInteger(lsn),
                        Value::Text(format!("payload-{lsn}").into()),
                    ],
                    vec!["id".to_string(), "payload".to_string()],
                )),
            );
            entity.created_at = timestamp;
            entity.updated_at = timestamp;
            entity.sequence_id = lsn;
            ChangeRecord::from_entity(
                lsn,
                timestamp,
                crate::replication::cdc::ChangeOperation::Insert,
                "users",
                "row",
                &entity,
                crate::api::REDDB_FORMAT_VERSION,
                None,
            )
        };

        // Archive one record per segment, threading each segment's sha256
        // into the next as prev_hash to form the chain.
        let mut metas = Vec::new();
        let mut prev: Option<String> = None;
        for lsn in [1u64, 2, 3] {
            let r = mk(lsn);
            let m = crate::storage::wal::archive_change_records(
                &LocalBackend,
                &wal_prefix,
                &[(r.lsn, r.encode())],
                prev.clone(),
            )
            .unwrap()
            .expect("archived");
            prev = m.sha256.clone();
            metas.push(m);
        }

        // Let the caller corrupt the archive before restoring.
        mutate(&LocalBackend, &metas);

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            wal_prefix,
        );
        let result = recovery.restore_to(0, &restore_path);
        let _ = std::fs::remove_dir_all(&temp_dir);
        result
    }

    /// Baseline: an untampered chain restores successfully and replays all
    /// three segments.
    #[test]
    fn restore_succeeds_with_intact_chain() {
        let result = run_chain_restore("intact", |_, _| {});
        let r = result.expect("intact chain restore must succeed");
        assert_eq!(r.wal_segments_replayed, 3);
    }

    /// Tampering with the middle segment's prev_hash must fail the restore
    /// with a chain-break error.
    #[test]
    fn restore_fails_closed_on_chain_break() {
        let result = run_chain_restore("chainbreak", |backend, metas| {
            let sidecar_key = crate::storage::wal::wal_segment_manifest_key(&metas[1].key);
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            // Replace the declared prev_hash with a bogus all-zero digest.
            bad.prev_hash = Some("00".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
            let _ = sidecar_key;
        });
        let err = result.expect_err("chain break must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("chain"),
            "error must mention chain; got: {msg}"
        );
    }

    /// Tampering with the middle segment's content hash must fail the
    /// restore with an integrity error.
    #[test]
    fn restore_fails_closed_on_sha256_corruption() {
        let result = run_chain_restore("shacorrupt", |backend, metas| {
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            // Replace the declared content hash with a bogus all-ff digest.
            bad.sha256 = Some("ff".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
        });
        let err = result.expect_err("sha mismatch must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("integrity") || msg.contains("sha256"),
            "error must mention integrity/sha256; got: {msg}"
        );
    }
}