1use std::path::Path;
4use std::sync::Arc;
5
6use super::{
7 load_archived_change_records, load_backup_head, load_snapshot_manifest,
8 load_wal_segment_manifest,
9};
10use crate::replication::logical::{ApplyMode, ApplyOutcome, LogicalChangeApplier};
11use crate::storage::backend::{BackendError, RemoteBackend};
12use crate::storage::RedDB;
13
/// Summary of one archived snapshot that a restore can start from.
///
/// Values are produced by `PointInTimeRecovery::list_restore_points`.
#[derive(Clone, Debug)]
pub struct RestorePoint {
    /// Identifier of the snapshot.
    pub snapshot_id: u64,
    /// Timestamp recorded for the snapshot.
    pub snapshot_time: u64,
    /// Number of archived WAL segments whose end LSN lies past the
    /// snapshot's base LSN.
    pub wal_segment_count: usize,
    /// Latest time this point can be recovered to. The listing code
    /// currently fills this with the snapshot time itself; WAL contents are
    /// not inspected.
    pub latest_recoverable_time: u64,
}
22
/// Outcome of a completed restore.
#[derive(Clone, Debug)]
pub struct RecoveryResult {
    /// Identifier of the snapshot the restore started from.
    pub snapshot_used: u64,
    /// Number of WAL segments from which at least one record was applied.
    pub wal_segments_replayed: usize,
    /// Total number of change records that were applied.
    pub records_applied: u64,
    /// Highest LSN among applied records, or the plan's base LSN when no
    /// record was applied.
    pub recovered_to_lsn: u64,
    /// Highest timestamp among applied records, or the snapshot time when no
    /// record was applied.
    pub recovered_to_time: u64,
}
32
/// Everything needed to carry out a restore: the snapshot to download and
/// the WAL segments to replay on top of it.
#[derive(Clone, Debug)]
pub struct RestorePlan {
    /// Timeline the selected snapshot belongs to.
    pub timeline_id: String,
    /// Backend key of the snapshot to download.
    pub snapshot_key: String,
    /// Identifier of the selected snapshot.
    pub snapshot_id: u64,
    /// Timestamp recorded for the selected snapshot.
    pub snapshot_time: u64,
    /// LSN covered by the snapshot; records at or below it are skipped
    /// during replay.
    pub base_lsn: u64,
    /// Requested recovery time. `0` means "no upper bound".
    pub target_time: u64,
    /// Backend keys of the WAL segments to replay, in the order they must be
    /// applied.
    pub wal_segments: Vec<String>,
    /// Expected SHA-256 of the snapshot file, when the manifest carries one.
    /// With `None` the restore proceeds without a snapshot integrity check.
    pub snapshot_sha256: Option<String>,
}
49
/// Internal view of one archived snapshot, assembled from its backend key
/// and, when available, its manifest.
#[derive(Clone, Debug)]
struct SnapshotDescriptor {
    /// Backend key the snapshot is stored under.
    key: String,
    /// Identifier of the snapshot.
    snapshot_id: u64,
    /// Timestamp recorded for the snapshot.
    snapshot_time: u64,
    /// Timeline the snapshot belongs to.
    timeline_id: String,
    /// LSN the snapshot covers; WAL replay starts after it.
    base_lsn: u64,
    /// SHA-256 of the snapshot file, if the manifest recorded one.
    snapshot_sha256: Option<String>,
}
59
/// Internal view of one archived WAL segment, derived from its backend key.
#[derive(Clone, Debug)]
struct WalSegmentDescriptor {
    /// Backend key the segment is stored under.
    key: String,
    /// First LSN encoded in the segment key.
    lsn_start: u64,
    /// Last LSN encoded in the segment key.
    lsn_end: u64,
}
66
67pub struct PointInTimeRecovery {
69 backend: Arc<dyn RemoteBackend>,
70 snapshot_prefix: String,
71 wal_prefix: String,
72}
73
74impl PointInTimeRecovery {
75 pub fn new(
76 backend: Arc<dyn RemoteBackend>,
77 snapshot_prefix: impl Into<String>,
78 wal_prefix: impl Into<String>,
79 ) -> Self {
80 Self {
81 backend,
82 snapshot_prefix: snapshot_prefix.into(),
83 wal_prefix: wal_prefix.into(),
84 }
85 }
86
87 pub fn plan_restore(&self, target_time: u64) -> Result<RestorePlan, BackendError> {
88 let snapshots = self.list_snapshots()?;
89 let selected = snapshots
90 .iter()
91 .filter(|snapshot| snapshot.snapshot_time <= target_time || target_time == 0)
92 .max_by_key(|snapshot| snapshot.snapshot_time)
93 .ok_or_else(|| {
94 BackendError::NotFound(format!(
95 "no snapshot available at or before target timestamp {target_time}"
96 ))
97 })?;
98
99 let wal_segments = self
100 .list_wal_segments()?
101 .into_iter()
102 .filter(|segment| segment.lsn_end > selected.base_lsn)
103 .map(|segment| segment.key)
104 .collect();
105
106 Ok(RestorePlan {
107 timeline_id: selected.timeline_id.clone(),
108 snapshot_key: selected.key.clone(),
109 snapshot_id: selected.snapshot_id,
110 snapshot_time: selected.snapshot_time,
111 base_lsn: selected.base_lsn,
112 target_time,
113 wal_segments,
114 snapshot_sha256: selected.snapshot_sha256.clone(),
115 })
116 }
117
118 pub fn restore_to(
119 &self,
120 target_time: u64,
121 dest_path: &Path,
122 ) -> Result<RecoveryResult, BackendError> {
123 let plan = self.plan_restore(target_time)?;
124 self.execute_restore(&plan, dest_path)
125 }
126
127 pub fn execute_restore(
128 &self,
129 plan: &RestorePlan,
130 dest_path: &Path,
131 ) -> Result<RecoveryResult, BackendError> {
132 if let Some(parent) = dest_path.parent() {
133 std::fs::create_dir_all(parent).map_err(|err| {
134 BackendError::Transport(format!(
135 "create restore destination directory failed: {err}"
136 ))
137 })?;
138 }
139
140 let downloaded = self.backend.download(&plan.snapshot_key, dest_path)?;
141 if !downloaded {
142 return Err(BackendError::NotFound(format!(
143 "snapshot '{}' disappeared during restore",
144 plan.snapshot_key
145 )));
146 }
147
148 match &plan.snapshot_sha256 {
160 Some(expected) => {
161 let computed = reddb_file::SnapshotManifest::compute_snapshot_sha256(dest_path)
162 .map_err(|err| {
163 BackendError::Internal(format!(
164 "snapshot integrity hash failed for '{}': {err}",
165 plan.snapshot_key
166 ))
167 })?;
168 if !computed.eq_ignore_ascii_case(expected) {
169 return Err(BackendError::Internal(format!(
170 "snapshot integrity check failed for '{}': manifest sha256 {} != computed sha256 {}; \
171 downloaded file kept at {} for forensics",
172 plan.snapshot_key,
173 expected,
174 computed,
175 dest_path.display(),
176 )));
177 }
178 }
179 None => {
180 tracing::warn!(
181 target: "reddb::backup::restore",
182 snapshot_key = %plan.snapshot_key,
183 "manifest predates snapshot_sha256 field; restore proceeding without integrity check"
184 );
185 }
186 }
187
188 let db = RedDB::open(dest_path).map_err(|err| {
189 BackendError::Internal(format!("open restore database failed: {err}"))
190 })?;
191
192 let mut wal_segments_replayed = 0usize;
193 let mut records_applied = 0u64;
194 let mut recovered_to_lsn = plan.base_lsn;
195 let mut recovered_to_time = plan.snapshot_time;
196
197 let applier = LogicalChangeApplier::new(plan.base_lsn);
202
203 let mut prev_segment_sha: Option<String> = None;
208
209 for (segment_idx, segment_key) in plan.wal_segments.iter().enumerate() {
210 let manifest = super::load_wal_segment_manifest(self.backend.as_ref(), segment_key)?;
217 let (records, computed_sha) =
218 super::archiver::load_archived_change_records_with_sha256(
219 self.backend.as_ref(),
220 segment_key,
221 )?;
222 match manifest.as_ref().and_then(|m| m.sha256.as_deref()) {
223 Some(expected) => match computed_sha.as_deref() {
224 Some(actual) if actual.eq_ignore_ascii_case(expected) => {}
225 Some(actual) => {
226 return Err(BackendError::Internal(format!(
227 "wal segment integrity check failed for '{segment_key}': \
228 manifest sha256 {expected} != computed sha256 {actual}",
229 )));
230 }
231 None => {
232 return Err(BackendError::Internal(format!(
233 "wal segment integrity check failed for '{segment_key}': \
234 expected sha256 {expected} but segment was empty / unreadable",
235 )));
236 }
237 },
238 None => {
239 tracing::warn!(
240 target: "reddb::backup::restore",
241 segment_key = %segment_key,
242 "wal segment manifest absent or sha256-less; restore proceeding without integrity check"
243 );
244 }
245 }
246
247 if let Some(m) = manifest.as_ref() {
258 match (&m.prev_hash, &prev_segment_sha) {
259 (Some(declared), Some(actual)) => {
260 if !declared.eq_ignore_ascii_case(actual) {
261 return Err(BackendError::Internal(format!(
262 "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
263 declared prev_hash {declared} != prior segment sha256 {actual}; \
264 a segment was removed, reordered, or replaced",
265 )));
266 }
267 }
268 (Some(declared), None) => {
269 return Err(BackendError::Internal(format!(
270 "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
271 segment declares prev_hash {declared} but no prior segment was loaded; \
272 the first segment of the chain is missing",
273 )));
274 }
275 (None, Some(actual)) => {
276 return Err(BackendError::Internal(format!(
277 "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
278 segment claims to be the first in its timeline but a prior segment \
279 (sha256 {actual}) was already replayed; reorder or merge of two timelines",
280 )));
281 }
282 (None, None) => {
283 }
285 }
286 if let Some(sha) = m.sha256.clone() {
291 prev_segment_sha = Some(sha);
292 }
293 } else {
294 prev_segment_sha = None;
299 }
300
301 let mut segment_applied = false;
302 for record in records {
303 if record.lsn <= plan.base_lsn {
304 continue;
305 }
306 if plan.target_time != 0 && record.timestamp > plan.target_time {
307 continue;
308 }
309 match applier.apply(&db, &record, ApplyMode::Restore) {
310 Ok(ApplyOutcome::Applied) => {
311 recovered_to_lsn = recovered_to_lsn.max(record.lsn);
312 recovered_to_time = recovered_to_time.max(record.timestamp);
313 records_applied += 1;
314 segment_applied = true;
315 }
316 Ok(ApplyOutcome::Idempotent) | Ok(ApplyOutcome::Skipped) => {}
317 Err(err) => {
318 return Err(BackendError::Internal(format!(
319 "restore apply failed at lsn {} in segment '{}': {}",
320 record.lsn, segment_key, err
321 )));
322 }
323 }
324 }
325 if segment_applied {
326 wal_segments_replayed += 1;
327 }
328 }
329
330 db.flush().map_err(|err| {
331 BackendError::Internal(format!("flush restored database failed: {err}"))
332 })?;
333
334 Ok(RecoveryResult {
335 snapshot_used: plan.snapshot_id,
336 wal_segments_replayed,
337 records_applied,
338 recovered_to_lsn,
339 recovered_to_time,
340 })
341 }
342
343 pub fn list_restore_points(&self) -> Result<Vec<RestorePoint>, BackendError> {
344 let snapshots = self.list_snapshots()?;
345 let wal_segments = self.list_wal_segments()?;
346 let mut out = Vec::new();
347
348 for snapshot in snapshots {
349 let wal_segment_count = wal_segments
350 .iter()
351 .filter(|segment| segment.lsn_end > snapshot.base_lsn)
352 .count();
353 out.push(RestorePoint {
354 snapshot_id: snapshot.snapshot_id,
355 snapshot_time: snapshot.snapshot_time,
356 wal_segment_count,
357 latest_recoverable_time: snapshot.snapshot_time,
358 });
359 }
360
361 out.sort_by_key(|point| point.snapshot_time);
362 Ok(out)
363 }
364
365 fn list_snapshots(&self) -> Result<Vec<SnapshotDescriptor>, BackendError> {
366 let snapshots = self.backend.list(&self.snapshot_prefix)?;
367 let mut out = Vec::new();
368 for key in snapshots {
369 let Some((snapshot_id, snapshot_time)) = reddb_file::parse_archived_snapshot_key(&key)
370 else {
371 continue;
372 };
373 let manifest = load_snapshot_manifest(self.backend.as_ref(), &key)?;
374 let (snapshot_id, snapshot_time, timeline_id, base_lsn, snapshot_sha256) =
375 if let Some(manifest) = manifest {
376 (
377 manifest.snapshot_id,
378 manifest.snapshot_time,
379 manifest.timeline_id,
380 manifest.base_lsn,
381 manifest.snapshot_sha256,
382 )
383 } else {
384 let (timeline_id, base_lsn) = self
385 .load_current_head()
386 .filter(|head| head.snapshot_id == snapshot_id)
387 .map(|head| (head.timeline_id, head.current_lsn))
388 .unwrap_or_else(|| ("main".to_string(), 0));
389 (snapshot_id, snapshot_time, timeline_id, base_lsn, None)
390 };
391
392 out.push(SnapshotDescriptor {
393 key,
394 snapshot_id,
395 snapshot_time,
396 timeline_id,
397 base_lsn,
398 snapshot_sha256,
399 });
400 }
401 out.sort_by_key(|snapshot| snapshot.snapshot_time);
402 Ok(out)
403 }
404
405 fn list_wal_segments(&self) -> Result<Vec<WalSegmentDescriptor>, BackendError> {
406 let keys = self.backend.list(&self.wal_prefix)?;
407 let mut out = Vec::new();
408 for key in keys {
409 let Some((lsn_start, lsn_end)) = reddb_file::parse_archived_wal_segment_key(&key)
410 else {
411 continue;
412 };
413 out.push(WalSegmentDescriptor {
414 key,
415 lsn_start,
416 lsn_end,
417 });
418 }
419 out.sort_by_key(|segment| segment.lsn_start);
420 Ok(out)
421 }
422
423 fn load_current_head(&self) -> Option<reddb_file::BackupHead> {
424 let root = reddb_file::backup_root_from_snapshot_prefix(&self.snapshot_prefix);
425 let head_key = reddb_file::backup_head_key(&root);
426 load_backup_head(self.backend.as_ref(), &head_key)
427 .ok()
428 .flatten()
429 }
430}
431
432#[cfg(test)]
433mod tests {
434 use super::*;
435 use crate::storage::backend::LocalBackend;
436 use crate::storage::wal::publish_snapshot_manifest;
437
438 #[test]
439 fn restore_to_downloads_latest_snapshot_before_target() {
440 let temp_dir =
441 std::env::temp_dir().join(format!("reddb_pitr_restore_{}_{}", std::process::id(), 1));
442 let snapshot_dir = reddb_file::backup_snapshot_dir(&temp_dir);
443 let restore_path = temp_dir.join("restore").join("data.rdb");
444 let _ = std::fs::remove_dir_all(&temp_dir);
445 std::fs::create_dir_all(&snapshot_dir).unwrap();
446
447 let snapshot1 = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
448 &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
449 1,
450 100,
451 ));
452 let snapshot2 = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
453 &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
454 2,
455 200,
456 ));
457 RedDB::open(&snapshot1).unwrap().flush().unwrap();
458 RedDB::open(&snapshot2).unwrap().flush().unwrap();
459 publish_snapshot_manifest(
460 &LocalBackend,
461 &reddb_file::SnapshotManifest {
462 timeline_id: "main".to_string(),
463 snapshot_key: snapshot1.to_string_lossy().to_string(),
464 snapshot_id: 1,
465 snapshot_time: 100,
466 base_lsn: 0,
467 schema_version: crate::api::REDDB_FORMAT_VERSION,
468 format_version: crate::api::REDDB_FORMAT_VERSION,
469 snapshot_sha256: None,
470 },
471 )
472 .unwrap();
473 publish_snapshot_manifest(
474 &LocalBackend,
475 &reddb_file::SnapshotManifest {
476 timeline_id: "main".to_string(),
477 snapshot_key: snapshot2.to_string_lossy().to_string(),
478 snapshot_id: 2,
479 snapshot_time: 200,
480 base_lsn: 0,
481 schema_version: crate::api::REDDB_FORMAT_VERSION,
482 format_version: crate::api::REDDB_FORMAT_VERSION,
483 snapshot_sha256: None,
484 },
485 )
486 .unwrap();
487
488 let recovery = PointInTimeRecovery::new(
489 Arc::new(LocalBackend),
490 snapshot_dir.to_string_lossy().to_string(),
491 reddb_file::backup_wal_dir(&temp_dir)
492 .to_string_lossy()
493 .to_string(),
494 );
495
496 let result = recovery.restore_to(150, &restore_path).unwrap();
497 assert_eq!(result.snapshot_used, 1);
498 assert_eq!(result.recovered_to_time, 100);
499 assert!(restore_path.exists());
500
501 let _ = std::fs::remove_dir_all(&temp_dir);
502 }
503
504 fn run_chain_restore(
508 tag: &str,
509 mutate: impl FnOnce(&LocalBackend, &[reddb_file::WalSegmentMeta]),
510 ) -> Result<RecoveryResult, BackendError> {
511 use crate::replication::cdc::{change_record_from_entity, ChangeRecord};
512 use crate::storage::schema::Value;
513 use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
514 let temp_dir = std::env::temp_dir().join(format!(
515 "reddb_chain_{}_{}_{}",
516 tag,
517 std::process::id(),
518 std::time::SystemTime::now()
519 .duration_since(std::time::UNIX_EPOCH)
520 .unwrap()
521 .as_nanos()
522 ));
523 let _ = std::fs::remove_dir_all(&temp_dir);
524 let snapshot_dir = reddb_file::backup_snapshot_dir(&temp_dir);
525 let wal_dir = reddb_file::backup_wal_dir(&temp_dir);
526 let restore_path = temp_dir.join("restore").join("data.rdb");
527 std::fs::create_dir_all(&snapshot_dir).unwrap();
528 std::fs::create_dir_all(&wal_dir).unwrap();
529
530 let snapshot_path = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
531 &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
532 1,
533 100,
534 ));
535 RedDB::open(&snapshot_path).unwrap().flush().unwrap();
536 publish_snapshot_manifest(
537 &LocalBackend,
538 &reddb_file::SnapshotManifest {
539 timeline_id: "main".to_string(),
540 snapshot_key: snapshot_path.to_string_lossy().to_string(),
541 snapshot_id: 1,
542 snapshot_time: 100,
543 base_lsn: 0,
544 schema_version: crate::api::REDDB_FORMAT_VERSION,
545 format_version: crate::api::REDDB_FORMAT_VERSION,
546 snapshot_sha256: None,
547 },
548 )
549 .unwrap();
550
551 let wal_prefix = reddb_file::backup_wal_dir_prefix(&temp_dir);
552 let mk = |lsn: u64| {
553 let timestamp = 100 + lsn;
554 let mut entity = UnifiedEntity::new(
555 EntityId::new(lsn),
556 EntityKind::TableRow {
557 table: Arc::from("users"),
558 row_id: lsn,
559 },
560 EntityData::Row(RowData::with_names(
561 vec![
562 Value::UnsignedInteger(lsn),
563 Value::Text(format!("payload-{lsn}").into()),
564 ],
565 vec!["id".to_string(), "payload".to_string()],
566 )),
567 );
568 entity.created_at = timestamp;
569 entity.updated_at = timestamp;
570 entity.sequence_id = lsn;
571 change_record_from_entity(
572 lsn,
573 timestamp,
574 crate::replication::cdc::ChangeOperation::Insert,
575 "users",
576 "row",
577 &entity,
578 crate::api::REDDB_FORMAT_VERSION,
579 None,
580 )
581 };
582
583 let mut metas = Vec::new();
584 let mut prev: Option<String> = None;
585 for lsn in [1u64, 2, 3] {
586 let r = mk(lsn);
587 let m = crate::storage::wal::archive_change_records(
588 &LocalBackend,
589 &wal_prefix,
590 &[(r.lsn, r.encode())],
591 prev.clone(),
592 )
593 .unwrap()
594 .expect("archived");
595 prev = m.sha256.clone();
596 metas.push(m);
597 }
598
599 mutate(&LocalBackend, &metas);
600
601 let recovery = PointInTimeRecovery::new(
602 Arc::new(LocalBackend),
603 snapshot_dir.to_string_lossy().to_string(),
604 wal_prefix,
605 );
606 let result = recovery.restore_to(0, &restore_path);
607 let _ = std::fs::remove_dir_all(&temp_dir);
608 result
609 }
610
611 #[test]
612 fn restore_succeeds_with_intact_chain() {
613 let result = run_chain_restore("intact", |_, _| {});
614 let r = result.expect("intact chain restore must succeed");
615 assert_eq!(r.wal_segments_replayed, 3);
616 }
617
618 #[test]
619 fn restore_fails_closed_on_chain_break() {
620 let result = run_chain_restore("chainbreak", |backend, metas| {
623 let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
624 .unwrap()
625 .unwrap();
626 bad.prev_hash = Some("00".repeat(32));
627 crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
628 });
629 let err = result.expect_err("chain break must fail closed");
630 let msg = err.to_string();
631 assert!(
632 msg.contains("chain"),
633 "error must mention chain; got: {msg}"
634 );
635 }
636
637 #[test]
638 fn restore_fails_closed_on_sha256_corruption() {
639 let result = run_chain_restore("shacorrupt", |backend, metas| {
642 let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
643 .unwrap()
644 .unwrap();
645 bad.sha256 = Some("ff".repeat(32));
646 crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
647 });
648 let err = result.expect_err("sha mismatch must fail closed");
649 let msg = err.to_string();
650 assert!(
651 msg.contains("integrity") || msg.contains("sha256"),
652 "error must mention integrity/sha256; got: {msg}"
653 );
654 }
655}