Skip to main content

reddb_server/storage/wal/
recovery.rs

1//! Point-in-Time Recovery (PITR) built on top of logical WAL segments.
2
3use std::path::Path;
4use std::sync::Arc;
5
6use super::{
7    load_archived_change_records, load_backup_head, load_snapshot_manifest,
8    load_wal_segment_manifest,
9};
10use crate::replication::logical::{ApplyMode, ApplyOutcome, LogicalChangeApplier};
11use crate::storage::backend::{BackendError, RemoteBackend};
12use crate::storage::RedDB;
13
/// A point to which the database can be restored.
///
/// One entry is produced per archived snapshot by
/// `PointInTimeRecovery::list_restore_points`.
#[derive(Debug, Clone)]
pub struct RestorePoint {
    /// Identifier of the archived snapshot this point starts from.
    pub snapshot_id: u64,
    /// Timestamp recorded for the snapshot (same clock as WAL record
    /// timestamps; units not fixed by this module).
    pub snapshot_time: u64,
    /// Number of archived WAL segments whose `lsn_end` is past the
    /// snapshot's `base_lsn`, i.e. segments a restore from this point
    /// would have to read.
    pub wal_segment_count: usize,
    /// Latest timestamp this point is known to reach. Note: the
    /// listing currently reports the snapshot time here and does not
    /// inspect WAL record timestamps.
    pub latest_recoverable_time: u64,
}
22
/// Result of a PITR operation.
///
/// Returned by `PointInTimeRecovery::execute_restore` / `restore_to`.
#[derive(Debug, Clone)]
pub struct RecoveryResult {
    /// `snapshot_id` of the snapshot the restore started from.
    pub snapshot_used: u64,
    /// Number of WAL segments from which at least one record was
    /// actually applied (segments whose records were all skipped or
    /// idempotent are not counted).
    pub wal_segments_replayed: usize,
    /// Number of records the applier reported as `Applied`.
    pub records_applied: u64,
    /// Highest LSN applied, or the snapshot's `base_lsn` when no
    /// record was applied.
    pub recovered_to_lsn: u64,
    /// Highest record timestamp applied, or the snapshot time when no
    /// record was applied.
    pub recovered_to_time: u64,
}
32
/// Everything needed to perform a restore, computed by
/// `PointInTimeRecovery::plan_restore` and consumed by
/// `PointInTimeRecovery::execute_restore`.
#[derive(Debug, Clone)]
pub struct RestorePlan {
    /// Timeline of the selected snapshot (from its manifest, the backup
    /// head, or `"main"` as a fallback).
    pub timeline_id: String,
    /// Backend key of the snapshot file to download.
    pub snapshot_key: String,
    /// Identifier of the selected snapshot.
    pub snapshot_id: u64,
    /// Timestamp of the selected snapshot.
    pub snapshot_time: u64,
    /// Last LSN contained in the snapshot; WAL records at or below it
    /// are skipped during replay.
    pub base_lsn: u64,
    /// Recovery target timestamp; `0` means "replay everything".
    pub target_time: u64,
    /// Backend keys of the WAL segments to replay, in ascending
    /// `lsn_start` order.
    pub wal_segments: Vec<String>,
    /// Hex-encoded SHA-256 of the snapshot bytes carried forward
    /// from the manifest. `None` when the manifest predates the
    /// checksum field (legacy backups). Restore-side verification
    /// fails closed when the value is `Some` and the recomputed hash
    /// doesn't match.
    pub snapshot_sha256: Option<String>,
}
49
/// Internal view of one archived snapshot, merged from its parsed key
/// and (when present) its sidecar manifest.
#[derive(Debug, Clone)]
struct SnapshotDescriptor {
    /// Backend key of the snapshot file.
    key: String,
    snapshot_id: u64,
    snapshot_time: u64,
    timeline_id: String,
    /// Last LSN contained in the snapshot (0 when unknown).
    base_lsn: u64,
    /// Hex SHA-256 from the manifest; `None` for legacy snapshots.
    snapshot_sha256: Option<String>,
}
59
/// Internal view of one archived WAL segment, parsed from its key.
#[derive(Debug, Clone)]
struct WalSegmentDescriptor {
    /// Backend key of the segment.
    key: String,
    /// First LSN covered by the segment (used for ordering).
    lsn_start: u64,
    /// Last LSN covered by the segment (used to decide whether the
    /// segment is needed on top of a snapshot).
    lsn_end: u64,
}
66
/// Point-in-Time Recovery engine.
///
/// Combines archived snapshots and archived logical WAL segments stored
/// in a [`RemoteBackend`] to rebuild a database as of a target time.
pub struct PointInTimeRecovery {
    /// Storage holding snapshots, WAL segments and their manifests.
    backend: Arc<dyn RemoteBackend>,
    /// Listing prefix for archived snapshots; also used to locate the
    /// backup root (and its head record).
    snapshot_prefix: String,
    /// Listing prefix for archived WAL segments.
    wal_prefix: String,
}
73
74impl PointInTimeRecovery {
75    pub fn new(
76        backend: Arc<dyn RemoteBackend>,
77        snapshot_prefix: impl Into<String>,
78        wal_prefix: impl Into<String>,
79    ) -> Self {
80        Self {
81            backend,
82            snapshot_prefix: snapshot_prefix.into(),
83            wal_prefix: wal_prefix.into(),
84        }
85    }
86
87    pub fn plan_restore(&self, target_time: u64) -> Result<RestorePlan, BackendError> {
88        let snapshots = self.list_snapshots()?;
89        let selected = snapshots
90            .iter()
91            .filter(|snapshot| snapshot.snapshot_time <= target_time || target_time == 0)
92            .max_by_key(|snapshot| snapshot.snapshot_time)
93            .ok_or_else(|| {
94                BackendError::NotFound(format!(
95                    "no snapshot available at or before target timestamp {target_time}"
96                ))
97            })?;
98
99        let wal_segments = self
100            .list_wal_segments()?
101            .into_iter()
102            .filter(|segment| segment.lsn_end > selected.base_lsn)
103            .map(|segment| segment.key)
104            .collect();
105
106        Ok(RestorePlan {
107            timeline_id: selected.timeline_id.clone(),
108            snapshot_key: selected.key.clone(),
109            snapshot_id: selected.snapshot_id,
110            snapshot_time: selected.snapshot_time,
111            base_lsn: selected.base_lsn,
112            target_time,
113            wal_segments,
114            snapshot_sha256: selected.snapshot_sha256.clone(),
115        })
116    }
117
118    pub fn restore_to(
119        &self,
120        target_time: u64,
121        dest_path: &Path,
122    ) -> Result<RecoveryResult, BackendError> {
123        let plan = self.plan_restore(target_time)?;
124        self.execute_restore(&plan, dest_path)
125    }
126
127    pub fn execute_restore(
128        &self,
129        plan: &RestorePlan,
130        dest_path: &Path,
131    ) -> Result<RecoveryResult, BackendError> {
132        if let Some(parent) = dest_path.parent() {
133            std::fs::create_dir_all(parent).map_err(|err| {
134                BackendError::Transport(format!(
135                    "create restore destination directory failed: {err}"
136                ))
137            })?;
138        }
139
140        let downloaded = self.backend.download(&plan.snapshot_key, dest_path)?;
141        if !downloaded {
142            return Err(BackendError::NotFound(format!(
143                "snapshot '{}' disappeared during restore",
144                plan.snapshot_key
145            )));
146        }
147
148        // Snapshot integrity check (PLAN.md Phase 4 — restore validation).
149        //
150        // When the manifest carries a `snapshot_sha256`, recompute it
151        // against the downloaded file and refuse to open the database
152        // if they disagree. The downloaded file is left in place for
153        // operator forensics — they can re-run with `--ignore-checksum`
154        // (not yet implemented) once the corruption source is known.
155        //
156        // When the manifest predates the checksum field (`None`),
157        // proceed with a warning. Old backups stay restorable; new
158        // backups get fail-closed protection.
159        match &plan.snapshot_sha256 {
160            Some(expected) => {
161                let computed = reddb_file::SnapshotManifest::compute_snapshot_sha256(dest_path)
162                    .map_err(|err| {
163                        BackendError::Internal(format!(
164                            "snapshot integrity hash failed for '{}': {err}",
165                            plan.snapshot_key
166                        ))
167                    })?;
168                if !computed.eq_ignore_ascii_case(expected) {
169                    return Err(BackendError::Internal(format!(
170                        "snapshot integrity check failed for '{}': manifest sha256 {} != computed sha256 {}; \
171                         downloaded file kept at {} for forensics",
172                        plan.snapshot_key,
173                        expected,
174                        computed,
175                        dest_path.display(),
176                    )));
177                }
178            }
179            None => {
180                tracing::warn!(
181                    target: "reddb::backup::restore",
182                    snapshot_key = %plan.snapshot_key,
183                    "manifest predates snapshot_sha256 field; restore proceeding without integrity check"
184                );
185            }
186        }
187
188        let db = RedDB::open(dest_path).map_err(|err| {
189            BackendError::Internal(format!("open restore database failed: {err}"))
190        })?;
191
192        let mut wal_segments_replayed = 0usize;
193        let mut records_applied = 0u64;
194        let mut recovered_to_lsn = plan.base_lsn;
195        let mut recovered_to_time = plan.snapshot_time;
196
197        // PLAN.md Phase 11.5 — stateful applier so restore enforces the
198        // same LSN monotonicity guarantees a replica fetcher does.
199        // Starting LSN is the snapshot's base_lsn; first applied record
200        // must be `base_lsn + 1` (or any positive LSN if base_lsn == 0).
201        let applier = LogicalChangeApplier::new(plan.base_lsn);
202
203        // PLAN.md Phase 11.3 — track the previous segment's sha256 so
204        // every iteration can verify `segment[i].prev_hash == segment[i-1].sha256`.
205        // The first segment is allowed `prev_hash = None`; subsequent
206        // segments must link explicitly. A break aborts restore.
207        let mut prev_segment_sha: Option<String> = None;
208
209        for (segment_idx, segment_key) in plan.wal_segments.iter().enumerate() {
210            // PLAN.md Phase 2.4 — verify segment integrity via the
211            // sidecar manifest's sha256 before applying its records.
212            // Fail-closed parity with snapshot verification: a present-
213            // but-wrong digest aborts restore so we don't ingest a
214            // tampered tail; an absent manifest (legacy archive) logs
215            // a warning and proceeds.
216            let manifest = super::load_wal_segment_manifest(self.backend.as_ref(), segment_key)?;
217            let (records, computed_sha) =
218                super::archiver::load_archived_change_records_with_sha256(
219                    self.backend.as_ref(),
220                    segment_key,
221                )?;
222            match manifest.as_ref().and_then(|m| m.sha256.as_deref()) {
223                Some(expected) => match computed_sha.as_deref() {
224                    Some(actual) if actual.eq_ignore_ascii_case(expected) => {}
225                    Some(actual) => {
226                        return Err(BackendError::Internal(format!(
227                            "wal segment integrity check failed for '{segment_key}': \
228                             manifest sha256 {expected} != computed sha256 {actual}",
229                        )));
230                    }
231                    None => {
232                        return Err(BackendError::Internal(format!(
233                            "wal segment integrity check failed for '{segment_key}': \
234                             expected sha256 {expected} but segment was empty / unreadable",
235                        )));
236                    }
237                },
238                None => {
239                    tracing::warn!(
240                        target: "reddb::backup::restore",
241                        segment_key = %segment_key,
242                        "wal segment manifest absent or sha256-less; restore proceeding without integrity check"
243                    );
244                }
245            }
246
247            // PLAN.md Phase 11.3 — hash chain validation. The first
248            // segment in the restore plan may have `prev_hash = None`
249            // (fresh timeline). Every subsequent segment must point to
250            // the prior segment's sha256. A break detects:
251            //   * a segment removed from the middle (next segment's
252            //     prev_hash refers to the missing one)
253            //   * a tampered/replaced segment (prev_hash mismatches)
254            //   * reordering (prev_hash refers to a non-adjacent peer)
255            // Legacy segments (no manifest at all) skip the chain check
256            // with a warning, same as the sha256 case.
257            if let Some(m) = manifest.as_ref() {
258                match (&m.prev_hash, &prev_segment_sha) {
259                    (Some(declared), Some(actual)) => {
260                        if !declared.eq_ignore_ascii_case(actual) {
261                            return Err(BackendError::Internal(format!(
262                                "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
263                                 declared prev_hash {declared} != prior segment sha256 {actual}; \
264                                 a segment was removed, reordered, or replaced",
265                            )));
266                        }
267                    }
268                    (Some(declared), None) => {
269                        return Err(BackendError::Internal(format!(
270                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
271                             segment declares prev_hash {declared} but no prior segment was loaded; \
272                             the first segment of the chain is missing",
273                        )));
274                    }
275                    (None, Some(actual)) => {
276                        return Err(BackendError::Internal(format!(
277                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
278                             segment claims to be the first in its timeline but a prior segment \
279                             (sha256 {actual}) was already replayed; reorder or merge of two timelines",
280                        )));
281                    }
282                    (None, None) => {
283                        // First segment of a fresh timeline — accepted.
284                    }
285                }
286                // Advance the chain head only when this segment carries
287                // a sha256. Without one we can't link the chain forward;
288                // keep the previous head so a later segment can still
289                // bridge over the gap (legacy archives mid-chain).
290                if let Some(sha) = m.sha256.clone() {
291                    prev_segment_sha = Some(sha);
292                }
293            } else {
294                // No manifest at all — already warned above. Reset
295                // chain tracking conservatively: the next segment
296                // can't reasonably claim a prev_hash if we don't know
297                // this one's sha256.
298                prev_segment_sha = None;
299            }
300
301            let mut segment_applied = false;
302            for record in records {
303                if record.lsn <= plan.base_lsn {
304                    continue;
305                }
306                if plan.target_time != 0 && record.timestamp > plan.target_time {
307                    continue;
308                }
309                match applier.apply(&db, &record, ApplyMode::Restore) {
310                    Ok(ApplyOutcome::Applied) => {
311                        recovered_to_lsn = recovered_to_lsn.max(record.lsn);
312                        recovered_to_time = recovered_to_time.max(record.timestamp);
313                        records_applied += 1;
314                        segment_applied = true;
315                    }
316                    Ok(ApplyOutcome::Idempotent) | Ok(ApplyOutcome::Skipped) => {}
317                    Err(err) => {
318                        return Err(BackendError::Internal(format!(
319                            "restore apply failed at lsn {} in segment '{}': {}",
320                            record.lsn, segment_key, err
321                        )));
322                    }
323                }
324            }
325            if segment_applied {
326                wal_segments_replayed += 1;
327            }
328        }
329
330        db.flush().map_err(|err| {
331            BackendError::Internal(format!("flush restored database failed: {err}"))
332        })?;
333
334        Ok(RecoveryResult {
335            snapshot_used: plan.snapshot_id,
336            wal_segments_replayed,
337            records_applied,
338            recovered_to_lsn,
339            recovered_to_time,
340        })
341    }
342
343    pub fn list_restore_points(&self) -> Result<Vec<RestorePoint>, BackendError> {
344        let snapshots = self.list_snapshots()?;
345        let wal_segments = self.list_wal_segments()?;
346        let mut out = Vec::new();
347
348        for snapshot in snapshots {
349            let wal_segment_count = wal_segments
350                .iter()
351                .filter(|segment| segment.lsn_end > snapshot.base_lsn)
352                .count();
353            out.push(RestorePoint {
354                snapshot_id: snapshot.snapshot_id,
355                snapshot_time: snapshot.snapshot_time,
356                wal_segment_count,
357                latest_recoverable_time: snapshot.snapshot_time,
358            });
359        }
360
361        out.sort_by_key(|point| point.snapshot_time);
362        Ok(out)
363    }
364
365    fn list_snapshots(&self) -> Result<Vec<SnapshotDescriptor>, BackendError> {
366        let snapshots = self.backend.list(&self.snapshot_prefix)?;
367        let mut out = Vec::new();
368        for key in snapshots {
369            let Some((snapshot_id, snapshot_time)) = reddb_file::parse_archived_snapshot_key(&key)
370            else {
371                continue;
372            };
373            let manifest = load_snapshot_manifest(self.backend.as_ref(), &key)?;
374            let (snapshot_id, snapshot_time, timeline_id, base_lsn, snapshot_sha256) =
375                if let Some(manifest) = manifest {
376                    (
377                        manifest.snapshot_id,
378                        manifest.snapshot_time,
379                        manifest.timeline_id,
380                        manifest.base_lsn,
381                        manifest.snapshot_sha256,
382                    )
383                } else {
384                    let (timeline_id, base_lsn) = self
385                        .load_current_head()
386                        .filter(|head| head.snapshot_id == snapshot_id)
387                        .map(|head| (head.timeline_id, head.current_lsn))
388                        .unwrap_or_else(|| ("main".to_string(), 0));
389                    (snapshot_id, snapshot_time, timeline_id, base_lsn, None)
390                };
391
392            out.push(SnapshotDescriptor {
393                key,
394                snapshot_id,
395                snapshot_time,
396                timeline_id,
397                base_lsn,
398                snapshot_sha256,
399            });
400        }
401        out.sort_by_key(|snapshot| snapshot.snapshot_time);
402        Ok(out)
403    }
404
405    fn list_wal_segments(&self) -> Result<Vec<WalSegmentDescriptor>, BackendError> {
406        let keys = self.backend.list(&self.wal_prefix)?;
407        let mut out = Vec::new();
408        for key in keys {
409            let Some((lsn_start, lsn_end)) = reddb_file::parse_archived_wal_segment_key(&key)
410            else {
411                continue;
412            };
413            out.push(WalSegmentDescriptor {
414                key,
415                lsn_start,
416                lsn_end,
417            });
418        }
419        out.sort_by_key(|segment| segment.lsn_start);
420        Ok(out)
421    }
422
423    fn load_current_head(&self) -> Option<reddb_file::BackupHead> {
424        let root = reddb_file::backup_root_from_snapshot_prefix(&self.snapshot_prefix);
425        let head_key = reddb_file::backup_head_key(&root);
426        load_backup_head(self.backend.as_ref(), &head_key)
427            .ok()
428            .flatten()
429    }
430}
431
#[cfg(test)]
mod tests {
    //! Tests for snapshot selection and for the WAL integrity / hash-chain
    //! checks performed during restore. All tests use `LocalBackend`
    //! against a scratch directory under `std::env::temp_dir()`.
    use super::*;
    use crate::storage::backend::LocalBackend;
    use crate::storage::wal::publish_snapshot_manifest;

    /// Two snapshots (times 100 and 200) with no WAL: restoring to 150
    /// must pick snapshot 1 and report its time as the recovered time.
    #[test]
    fn restore_to_downloads_latest_snapshot_before_target() {
        let temp_dir =
            std::env::temp_dir().join(format!("reddb_pitr_restore_{}_{}", std::process::id(), 1));
        let snapshot_dir = reddb_file::backup_snapshot_dir(&temp_dir);
        let restore_path = temp_dir.join("restore").join("data.rdb");
        let _ = std::fs::remove_dir_all(&temp_dir);
        std::fs::create_dir_all(&snapshot_dir).unwrap();

        let snapshot1 = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
            &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
            1,
            100,
        ));
        let snapshot2 = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
            &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
            2,
            200,
        ));
        // Create empty databases directly at the archived snapshot paths.
        RedDB::open(&snapshot1).unwrap().flush().unwrap();
        RedDB::open(&snapshot2).unwrap().flush().unwrap();
        // Manifests without sha256: exercises the legacy "no integrity
        // check" path of restore.
        publish_snapshot_manifest(
            &LocalBackend,
            &reddb_file::SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot1.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &reddb_file::SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot2.to_string_lossy().to_string(),
                snapshot_id: 2,
                snapshot_time: 200,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            reddb_file::backup_wal_dir(&temp_dir)
                .to_string_lossy()
                .to_string(),
        );

        let result = recovery.restore_to(150, &restore_path).unwrap();
        assert_eq!(result.snapshot_used, 1);
        assert_eq!(result.recovered_to_time, 100);
        assert!(restore_path.exists());

        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    /// Helper: build a snapshot+wal layout, archive `n` segments
    /// linked by sha256 prev_hash, then run the restore loop and
    /// return its result. Lets the chain tests share boilerplate.
    ///
    /// Concretely: one snapshot (id 1, time 100, base_lsn 0) and three
    /// single-record segments for LSNs 1, 2, 3 (timestamps 101..=103),
    /// each archived with the previous segment's sha256 as `prev_hash`.
    /// `mutate` runs after archiving so a test can tamper with the
    /// sidecar manifests; restore is then run with target 0 (latest).
    fn run_chain_restore(
        tag: &str,
        mutate: impl FnOnce(&LocalBackend, &[reddb_file::WalSegmentMeta]),
    ) -> Result<RecoveryResult, BackendError> {
        use crate::replication::cdc::{change_record_from_entity, ChangeRecord};
        use crate::storage::schema::Value;
        use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
        // Nanosecond suffix keeps directories unique across tests that
        // run concurrently in the same process.
        let temp_dir = std::env::temp_dir().join(format!(
            "reddb_chain_{}_{}_{}",
            tag,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _ = std::fs::remove_dir_all(&temp_dir);
        let snapshot_dir = reddb_file::backup_snapshot_dir(&temp_dir);
        let wal_dir = reddb_file::backup_wal_dir(&temp_dir);
        let restore_path = temp_dir.join("restore").join("data.rdb");
        std::fs::create_dir_all(&snapshot_dir).unwrap();
        std::fs::create_dir_all(&wal_dir).unwrap();

        let snapshot_path = std::path::PathBuf::from(reddb_file::archived_snapshot_key(
            &reddb_file::backup_snapshot_dir_prefix(&temp_dir),
            1,
            100,
        ));
        RedDB::open(&snapshot_path).unwrap().flush().unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &reddb_file::SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot_path.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let wal_prefix = reddb_file::backup_wal_dir_prefix(&temp_dir);
        // Builds an Insert change record for a `users` row whose id,
        // entity id and LSN are all `lsn`, timestamped `100 + lsn`.
        let mk = |lsn: u64| {
            let timestamp = 100 + lsn;
            let mut entity = UnifiedEntity::new(
                EntityId::new(lsn),
                EntityKind::TableRow {
                    table: Arc::from("users"),
                    row_id: lsn,
                },
                EntityData::Row(RowData::with_names(
                    vec![
                        Value::UnsignedInteger(lsn),
                        Value::Text(format!("payload-{lsn}").into()),
                    ],
                    vec!["id".to_string(), "payload".to_string()],
                )),
            );
            entity.created_at = timestamp;
            entity.updated_at = timestamp;
            entity.sequence_id = lsn;
            change_record_from_entity(
                lsn,
                timestamp,
                crate::replication::cdc::ChangeOperation::Insert,
                "users",
                "row",
                &entity,
                crate::api::REDDB_FORMAT_VERSION,
                None,
            )
        };

        // Archive one segment per LSN, threading each segment's sha256
        // into the next one's prev_hash to form the hash chain.
        let mut metas = Vec::new();
        let mut prev: Option<String> = None;
        for lsn in [1u64, 2, 3] {
            let r = mk(lsn);
            let m = crate::storage::wal::archive_change_records(
                &LocalBackend,
                &wal_prefix,
                &[(r.lsn, r.encode())],
                prev.clone(),
            )
            .unwrap()
            .expect("archived");
            prev = m.sha256.clone();
            metas.push(m);
        }

        mutate(&LocalBackend, &metas);

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            wal_prefix,
        );
        let result = recovery.restore_to(0, &restore_path);
        let _ = std::fs::remove_dir_all(&temp_dir);
        result
    }

    /// Untampered chain: all three segments must be replayed.
    #[test]
    fn restore_succeeds_with_intact_chain() {
        let result = run_chain_restore("intact", |_, _| {});
        let r = result.expect("intact chain restore must succeed");
        assert_eq!(r.wal_segments_replayed, 3);
    }

    #[test]
    fn restore_fails_closed_on_chain_break() {
        // Corrupt segment 2's sidecar manifest by overwriting the
        // declared prev_hash with a value that doesn't match segment 1.
        let result = run_chain_restore("chainbreak", |backend, metas| {
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            bad.prev_hash = Some("00".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
        });
        let err = result.expect_err("chain break must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("chain"),
            "error must mention chain; got: {msg}"
        );
    }

    #[test]
    fn restore_fails_closed_on_sha256_corruption() {
        // Tamper segment 2's sidecar sha256 so the integrity check
        // (Phase 2.4) fires before the chain check (Phase 11.3).
        let result = run_chain_restore("shacorrupt", |backend, metas| {
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            bad.sha256 = Some("ff".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
        });
        let err = result.expect_err("sha mismatch must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("integrity") || msg.contains("sha256"),
            "error must mention integrity/sha256; got: {msg}"
        );
    }
}