
reddb_server/storage/wal/recovery.rs

//! Point-in-Time Recovery (PITR) built on top of logical WAL segments.

use std::path::Path;
use std::sync::Arc;

use super::{load_backup_head, load_snapshot_manifest, load_wal_segment_manifest};
use crate::replication::logical::{ApplyMode, ApplyOutcome, LogicalChangeApplier};
use crate::storage::backend::{BackendError, RemoteBackend};
use crate::storage::RedDB;

/// A point to which the database can be restored.
#[derive(Debug, Clone)]
pub struct RestorePoint {
    pub snapshot_id: u64,
    pub snapshot_time: u64,
    pub wal_segment_count: usize,
    pub latest_recoverable_time: u64,
}

/// Result of a PITR operation.
#[derive(Debug, Clone)]
pub struct RecoveryResult {
    pub snapshot_used: u64,
    pub wal_segments_replayed: usize,
    pub records_applied: u64,
    pub recovered_to_lsn: u64,
    pub recovered_to_time: u64,
}

#[derive(Debug, Clone)]
pub struct RestorePlan {
    pub timeline_id: String,
    pub snapshot_key: String,
    pub snapshot_id: u64,
    pub snapshot_time: u64,
    pub base_lsn: u64,
    pub target_time: u64,
    pub wal_segments: Vec<String>,
    /// Hex-encoded SHA-256 of the snapshot bytes carried forward
    /// from the manifest. `None` when the manifest predates the
    /// checksum field (legacy backups). Restore-side verification
    /// fails closed when the value is `Some` and the recomputed hash
    /// doesn't match.
    pub snapshot_sha256: Option<String>,
}

#[derive(Debug, Clone)]
struct SnapshotDescriptor {
    key: String,
    snapshot_id: u64,
    snapshot_time: u64,
    timeline_id: String,
    base_lsn: u64,
    snapshot_sha256: Option<String>,
}

#[derive(Debug, Clone)]
struct WalSegmentDescriptor {
    key: String,
    lsn_start: u64,
    lsn_end: u64,
}

/// Point-in-Time Recovery engine.
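///
/// A minimal usage sketch (illustrative only: the backend value, object
/// prefixes, target timestamp, and destination path are placeholders):
///
/// ```ignore
/// use std::path::Path;
/// use std::sync::Arc;
///
/// // `backend` is any `Arc<dyn RemoteBackend>`, e.g. `Arc::new(LocalBackend)`.
/// let recovery = PointInTimeRecovery::new(backend, "backups/snapshots/", "backups/wal/");
///
/// // Inspect what can be restored, then roll the database forward.
/// let points = recovery.list_restore_points()?;
/// println!("{} restore point(s) available", points.len());
///
/// let result = recovery.restore_to(1_700_000_000, Path::new("/tmp/restored.rdb"))?;
/// println!("recovered to lsn {}", result.recovered_to_lsn);
/// ```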
pub struct PointInTimeRecovery {
    backend: Arc<dyn RemoteBackend>,
    snapshot_prefix: String,
    wal_prefix: String,
}

impl PointInTimeRecovery {
    pub fn new(
        backend: Arc<dyn RemoteBackend>,
        snapshot_prefix: impl Into<String>,
        wal_prefix: impl Into<String>,
    ) -> Self {
        Self {
            backend,
            snapshot_prefix: snapshot_prefix.into(),
            wal_prefix: wal_prefix.into(),
        }
    }

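    /// Build a [`RestorePlan`] for `target_time`: the newest snapshot taken at
    /// or before that timestamp is selected, together with every archived WAL
    /// segment whose LSN range extends past the snapshot's base LSN. A
    /// `target_time` of 0 means "latest": the newest snapshot wins and replay
    /// is not bounded by timestamp.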
    pub fn plan_restore(&self, target_time: u64) -> Result<RestorePlan, BackendError> {
        let snapshots = self.list_snapshots()?;
        let selected = snapshots
            .iter()
            .filter(|snapshot| snapshot.snapshot_time <= target_time || target_time == 0)
            .max_by_key(|snapshot| snapshot.snapshot_time)
            .ok_or_else(|| {
                BackendError::NotFound(format!(
                    "no snapshot available at or before target timestamp {target_time}"
                ))
            })?;

        let wal_segments = self
            .list_wal_segments()?
            .into_iter()
            .filter(|segment| segment.lsn_end > selected.base_lsn)
            .map(|segment| segment.key)
            .collect();

        Ok(RestorePlan {
            timeline_id: selected.timeline_id.clone(),
            snapshot_key: selected.key.clone(),
            snapshot_id: selected.snapshot_id,
            snapshot_time: selected.snapshot_time,
            base_lsn: selected.base_lsn,
            target_time,
            wal_segments,
            snapshot_sha256: selected.snapshot_sha256.clone(),
        })
    }

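    /// Plan a restore to `target_time` and execute it at `dest_path` in one
    /// call. Equivalent to `plan_restore` followed by `execute_restore`.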
    pub fn restore_to(
        &self,
        target_time: u64,
        dest_path: &Path,
    ) -> Result<RecoveryResult, BackendError> {
        let plan = self.plan_restore(target_time)?;
        self.execute_restore(&plan, dest_path)
    }

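    /// Execute a previously computed [`RestorePlan`]: download the snapshot to
    /// `dest_path`, verify its integrity, then replay the plan's WAL segments
    /// on top of it.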
    pub fn execute_restore(
        &self,
        plan: &RestorePlan,
        dest_path: &Path,
    ) -> Result<RecoveryResult, BackendError> {
        if let Some(parent) = dest_path.parent() {
            std::fs::create_dir_all(parent).map_err(|err| {
                BackendError::Transport(format!(
                    "create restore destination directory failed: {err}"
                ))
            })?;
        }

        let downloaded = self.backend.download(&plan.snapshot_key, dest_path)?;
        if !downloaded {
            return Err(BackendError::NotFound(format!(
                "snapshot '{}' disappeared during restore",
                plan.snapshot_key
            )));
        }

        // Snapshot integrity check (PLAN.md Phase 4 — restore validation).
        //
        // When the manifest carries a `snapshot_sha256`, recompute it
        // against the downloaded file and refuse to open the database
        // if they disagree. The downloaded file is left in place for
        // operator forensics — they can re-run with `--ignore-checksum`
        // (not yet implemented) once the corruption source is known.
        //
        // When the manifest predates the checksum field (`None`),
        // proceed with a warning. Old backups stay restorable; new
        // backups get fail-closed protection.
        match &plan.snapshot_sha256 {
            Some(expected) => {
                let computed =
                    crate::storage::wal::SnapshotManifest::compute_snapshot_sha256(dest_path)?;
                if !computed.eq_ignore_ascii_case(expected) {
                    return Err(BackendError::Internal(format!(
                        "snapshot integrity check failed for '{}': manifest sha256 {} != computed sha256 {}; \
                         downloaded file kept at {} for forensics",
                        plan.snapshot_key,
                        expected,
                        computed,
                        dest_path.display(),
                    )));
                }
            }
            None => {
                tracing::warn!(
                    target: "reddb::backup::restore",
                    snapshot_key = %plan.snapshot_key,
                    "manifest predates snapshot_sha256 field; restore proceeding without integrity check"
                );
            }
        }

        let db = RedDB::open(dest_path).map_err(|err| {
            BackendError::Internal(format!("open restore database failed: {err}"))
        })?;

        let mut wal_segments_replayed = 0usize;
        let mut records_applied = 0u64;
        let mut recovered_to_lsn = plan.base_lsn;
        let mut recovered_to_time = plan.snapshot_time;

        // PLAN.md Phase 11.5 — stateful applier so restore enforces the
        // same LSN monotonicity guarantees a replica fetcher does.
        // Starting LSN is the snapshot's base_lsn; first applied record
        // must be `base_lsn + 1` (or any positive LSN if base_lsn == 0).
        let applier = LogicalChangeApplier::new(plan.base_lsn);

        // PLAN.md Phase 11.3 — track the previous segment's sha256 so
        // every iteration can verify `segment[i].prev_hash == segment[i-1].sha256`.
        // The first segment is allowed `prev_hash = None`; subsequent
        // segments must link explicitly. A break aborts restore.
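        // Concretely, an intact three-segment chain looks like:
        //   seg[0]: sha256 = A, prev_hash = None
        //   seg[1]: sha256 = B, prev_hash = Some(A)
        //   seg[2]: sha256 = C, prev_hash = Some(B)
        // Removing, reordering, or replacing any segment breaks a link.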
        let mut prev_segment_sha: Option<String> = None;

        for (segment_idx, segment_key) in plan.wal_segments.iter().enumerate() {
            // PLAN.md Phase 2.4 — verify segment integrity via the
            // sidecar manifest's sha256 before applying its records.
            // Fail-closed parity with snapshot verification: a present-
            // but-wrong digest aborts restore so we don't ingest a
            // tampered tail; an absent manifest (legacy archive) logs
            // a warning and proceeds.
            let manifest = load_wal_segment_manifest(self.backend.as_ref(), segment_key)?;
            let (records, computed_sha) =
                super::archiver::load_archived_change_records_with_sha256(
                    self.backend.as_ref(),
                    segment_key,
                )?;
            match manifest.as_ref().and_then(|m| m.sha256.as_deref()) {
                Some(expected) => match computed_sha.as_deref() {
                    Some(actual) if actual.eq_ignore_ascii_case(expected) => {}
                    Some(actual) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment integrity check failed for '{segment_key}': \
                             manifest sha256 {expected} != computed sha256 {actual}",
                        )));
                    }
                    None => {
                        return Err(BackendError::Internal(format!(
                            "wal segment integrity check failed for '{segment_key}': \
                             expected sha256 {expected} but segment was empty / unreadable",
                        )));
                    }
                },
                None => {
                    tracing::warn!(
                        target: "reddb::backup::restore",
                        segment_key = %segment_key,
                        "wal segment manifest absent or sha256-less; restore proceeding without integrity check"
                    );
                }
            }

            // PLAN.md Phase 11.3 — hash chain validation. The first
            // segment in the restore plan may have `prev_hash = None`
            // (fresh timeline). Every subsequent segment must point to
            // the prior segment's sha256. A break detects:
            //   * a segment removed from the middle (next segment's
            //     prev_hash refers to the missing one)
            //   * a tampered/replaced segment (prev_hash mismatches)
            //   * reordering (prev_hash refers to a non-adjacent peer)
            // Legacy segments (no manifest at all) skip the chain check
            // with a warning, same as the sha256 case.
            if let Some(m) = manifest.as_ref() {
                match (&m.prev_hash, &prev_segment_sha) {
                    (Some(declared), Some(actual)) => {
                        if !declared.eq_ignore_ascii_case(actual) {
                            return Err(BackendError::Internal(format!(
                                "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                                 declared prev_hash {declared} != prior segment sha256 {actual}; \
                                 a segment was removed, reordered, or replaced",
                            )));
                        }
                    }
                    (Some(declared), None) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                             segment declares prev_hash {declared} but no prior segment was loaded; \
                             the first segment of the chain is missing",
                        )));
                    }
                    (None, Some(actual)) => {
                        return Err(BackendError::Internal(format!(
                            "wal segment hash chain broken at '{segment_key}' (index {segment_idx}): \
                             segment claims to be the first in its timeline but a prior segment \
                             (sha256 {actual}) was already replayed; reorder or merge of two timelines",
                        )));
                    }
                    (None, None) => {
                        // First segment of a fresh timeline — accepted.
                    }
                }
                // Advance the chain head only when this segment carries
                // a sha256. Without one we can't link the chain forward;
                // keep the previous head so a later segment can still
                // bridge over the gap (legacy archives mid-chain).
                if let Some(sha) = m.sha256.clone() {
                    prev_segment_sha = Some(sha);
                }
            } else {
                // No manifest at all — already warned above. Reset
                // chain tracking conservatively: the next segment
                // can't reasonably claim a prev_hash if we don't know
                // this one's sha256.
                prev_segment_sha = None;
            }

            let mut segment_applied = false;
            for record in records {
                if record.lsn <= plan.base_lsn {
                    continue;
                }
                if plan.target_time != 0 && record.timestamp > plan.target_time {
                    continue;
                }
                match applier.apply(&db, &record, ApplyMode::Restore) {
                    Ok(ApplyOutcome::Applied) => {
                        recovered_to_lsn = recovered_to_lsn.max(record.lsn);
                        recovered_to_time = recovered_to_time.max(record.timestamp);
                        records_applied += 1;
                        segment_applied = true;
                    }
                    Ok(ApplyOutcome::Idempotent) | Ok(ApplyOutcome::Skipped) => {}
                    Err(err) => {
                        return Err(BackendError::Internal(format!(
                            "restore apply failed at lsn {} in segment '{}': {}",
                            record.lsn, segment_key, err
                        )));
                    }
                }
            }
            if segment_applied {
                wal_segments_replayed += 1;
            }
        }

        db.flush().map_err(|err| {
            BackendError::Internal(format!("flush restored database failed: {err}"))
        })?;

        Ok(RecoveryResult {
            snapshot_used: plan.snapshot_id,
            wal_segments_replayed,
            records_applied,
            recovered_to_lsn,
            recovered_to_time,
        })
    }

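    /// Enumerate restorable snapshots (oldest first), each paired with the
    /// number of archived WAL segments that extend past its base LSN.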
    pub fn list_restore_points(&self) -> Result<Vec<RestorePoint>, BackendError> {
        let snapshots = self.list_snapshots()?;
        let wal_segments = self.list_wal_segments()?;
        let mut out = Vec::new();

        for snapshot in snapshots {
            let wal_segment_count = wal_segments
                .iter()
                .filter(|segment| segment.lsn_end > snapshot.base_lsn)
                .count();
            out.push(RestorePoint {
                snapshot_id: snapshot.snapshot_id,
                snapshot_time: snapshot.snapshot_time,
                wal_segment_count,
                latest_recoverable_time: snapshot.snapshot_time,
            });
        }

        out.sort_by_key(|point| point.snapshot_time);
        Ok(out)
    }

    fn list_snapshots(&self) -> Result<Vec<SnapshotDescriptor>, BackendError> {
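        // Snapshot object keys are expected to end in
        // "<snapshot_id>-<snapshot_time>.snapshot" (e.g. "snapshots/1-100.snapshot");
        // keys that don't parse are skipped.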
        let snapshots = self.backend.list(&self.snapshot_prefix)?;
        let mut out = Vec::new();
        for key in snapshots {
            let Some(file_name) = std::path::Path::new(&key)
                .file_name()
                .and_then(|s| s.to_str())
            else {
                continue;
            };
            let Some(base) = file_name.strip_suffix(".snapshot") else {
                continue;
            };
            let Some((snapshot_id, snapshot_time)) = base.split_once('-') else {
                continue;
            };
            let (Ok(snapshot_id), Ok(snapshot_time)) =
                (snapshot_id.parse::<u64>(), snapshot_time.parse::<u64>())
            else {
                continue;
            };
            let manifest = load_snapshot_manifest(self.backend.as_ref(), &key)?;
            let (snapshot_id, snapshot_time, timeline_id, base_lsn, snapshot_sha256) =
                if let Some(manifest) = manifest {
                    (
                        manifest.snapshot_id,
                        manifest.snapshot_time,
                        manifest.timeline_id,
                        manifest.base_lsn,
                        manifest.snapshot_sha256,
                    )
                } else {
                    let (timeline_id, base_lsn) = self
                        .load_current_head()
                        .filter(|head| head.snapshot_id == snapshot_id)
                        .map(|head| (head.timeline_id, head.current_lsn))
                        .unwrap_or_else(|| ("main".to_string(), 0));
                    (snapshot_id, snapshot_time, timeline_id, base_lsn, None)
                };

            out.push(SnapshotDescriptor {
                key,
                snapshot_id,
                snapshot_time,
                timeline_id,
                base_lsn,
                snapshot_sha256,
            });
        }
        out.sort_by_key(|snapshot| snapshot.snapshot_time);
        Ok(out)
    }

    fn list_wal_segments(&self) -> Result<Vec<WalSegmentDescriptor>, BackendError> {
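        // WAL segment keys are expected to end in "<lsn_start>-<lsn_end>.wal";
        // keys that don't parse are skipped.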
        let keys = self.backend.list(&self.wal_prefix)?;
        let mut out = Vec::new();
        for key in keys {
            let Some(file_name) = std::path::Path::new(&key)
                .file_name()
                .and_then(|s| s.to_str())
            else {
                continue;
            };
            let Some((start, end)) = file_name
                .strip_suffix(".wal")
                .and_then(|base| base.split_once('-'))
            else {
                continue;
            };
            let (Ok(lsn_start), Ok(lsn_end)) = (start.parse::<u64>(), end.parse::<u64>()) else {
                continue;
            };
            out.push(WalSegmentDescriptor {
                key,
                lsn_start,
                lsn_end,
            });
        }
        out.sort_by_key(|segment| segment.lsn_start);
        Ok(out)
    }

    fn load_current_head(&self) -> Option<super::BackupHead> {
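        // The head manifest lives beside the snapshot prefix's parent, e.g.
        // a snapshot prefix of "backups/snapshots/" yields the head key
        // "backups/manifests/head.json" (or just "manifests/head.json" when
        // the prefix has no parent directory).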
        let snapshot_root = self.snapshot_prefix.trim_end_matches('/');
        let parent = Path::new(snapshot_root).parent()?;
        let parent = parent.to_string_lossy().trim_end_matches('/').to_string();
        let head_key = if parent.is_empty() {
            "manifests/head.json".to_string()
        } else {
            format!("{parent}/manifests/head.json")
        };
        load_backup_head(self.backend.as_ref(), &head_key)
            .ok()
            .flatten()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;
    use crate::storage::wal::{publish_snapshot_manifest, SnapshotManifest};

    #[test]
    fn restore_to_downloads_latest_snapshot_before_target() {
        let temp_dir =
            std::env::temp_dir().join(format!("reddb_pitr_restore_{}_{}", std::process::id(), 1));
        let snapshot_dir = temp_dir.join("snapshots");
        let restore_path = temp_dir.join("restore").join("data.rdb");
        let _ = std::fs::remove_dir_all(&temp_dir);
        std::fs::create_dir_all(&snapshot_dir).unwrap();

        let snapshot1 = snapshot_dir.join("1-100.snapshot");
        let snapshot2 = snapshot_dir.join("2-200.snapshot");
        RedDB::open(&snapshot1).unwrap().flush().unwrap();
        RedDB::open(&snapshot2).unwrap().flush().unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot1.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot2.to_string_lossy().to_string(),
                snapshot_id: 2,
                snapshot_time: 200,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            temp_dir.join("wal").to_string_lossy().to_string(),
        );

        let result = recovery.restore_to(150, &restore_path).unwrap();
        assert_eq!(result.snapshot_used, 1);
        assert_eq!(result.recovered_to_time, 100);
        assert!(restore_path.exists());

        let _ = std::fs::remove_dir_all(&temp_dir);
    }

    /// Helper: build a snapshot+wal layout, archive `n` segments
    /// linked by sha256 prev_hash, then run the restore loop and
    /// return its result. Lets the chain tests share boilerplate.
    fn run_chain_restore(
        tag: &str,
        mutate: impl FnOnce(&LocalBackend, &[crate::storage::wal::WalSegmentMeta]),
    ) -> Result<RecoveryResult, BackendError> {
        use crate::replication::cdc::ChangeRecord;
        use crate::storage::schema::Value;
        use crate::storage::{EntityData, EntityId, EntityKind, RowData, UnifiedEntity};
        let temp_dir = std::env::temp_dir().join(format!(
            "reddb_chain_{}_{}_{}",
            tag,
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        let _ = std::fs::remove_dir_all(&temp_dir);
        let snapshot_dir = temp_dir.join("snapshots");
        let wal_dir = temp_dir.join("wal");
        let restore_path = temp_dir.join("restore").join("data.rdb");
        std::fs::create_dir_all(&snapshot_dir).unwrap();
        std::fs::create_dir_all(&wal_dir).unwrap();

        let snapshot_path = snapshot_dir.join("1-100.snapshot");
        RedDB::open(&snapshot_path).unwrap().flush().unwrap();
        publish_snapshot_manifest(
            &LocalBackend,
            &SnapshotManifest {
                timeline_id: "main".to_string(),
                snapshot_key: snapshot_path.to_string_lossy().to_string(),
                snapshot_id: 1,
                snapshot_time: 100,
                base_lsn: 0,
                schema_version: crate::api::REDDB_FORMAT_VERSION,
                format_version: crate::api::REDDB_FORMAT_VERSION,
                snapshot_sha256: None,
            },
        )
        .unwrap();

        let wal_prefix = format!("{}/", wal_dir.to_string_lossy());
        let mk = |lsn: u64| {
            let timestamp = 100 + lsn;
            let mut entity = UnifiedEntity::new(
                EntityId::new(lsn),
                EntityKind::TableRow {
                    table: Arc::from("users"),
                    row_id: lsn,
                },
                EntityData::Row(RowData::with_names(
                    vec![
                        Value::UnsignedInteger(lsn),
                        Value::Text(format!("payload-{lsn}").into()),
                    ],
                    vec!["id".to_string(), "payload".to_string()],
                )),
            );
            entity.created_at = timestamp;
            entity.updated_at = timestamp;
            entity.sequence_id = lsn;
            ChangeRecord::from_entity(
                lsn,
                timestamp,
                crate::replication::cdc::ChangeOperation::Insert,
                "users",
                "row",
                &entity,
                crate::api::REDDB_FORMAT_VERSION,
                None,
            )
        };

        let mut metas = Vec::new();
        let mut prev: Option<String> = None;
        for lsn in [1u64, 2, 3] {
            let r = mk(lsn);
            let m = crate::storage::wal::archive_change_records(
                &LocalBackend,
                &wal_prefix,
                &[(r.lsn, r.encode())],
                prev.clone(),
            )
            .unwrap()
            .expect("archived");
            prev = m.sha256.clone();
            metas.push(m);
        }

        mutate(&LocalBackend, &metas);

        let recovery = PointInTimeRecovery::new(
            Arc::new(LocalBackend),
            snapshot_dir.to_string_lossy().to_string(),
            wal_prefix,
        );
        let result = recovery.restore_to(0, &restore_path);
        let _ = std::fs::remove_dir_all(&temp_dir);
        result
    }

    #[test]
    fn restore_succeeds_with_intact_chain() {
        let result = run_chain_restore("intact", |_, _| {});
        let r = result.expect("intact chain restore must succeed");
        assert_eq!(r.wal_segments_replayed, 3);
    }

    #[test]
    fn restore_fails_closed_on_chain_break() {
        // Corrupt segment 2's sidecar manifest by overwriting the
        // declared prev_hash with a value that doesn't match segment 1.
        let result = run_chain_restore("chainbreak", |backend, metas| {
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            bad.prev_hash = Some("00".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
        });
        let err = result.expect_err("chain break must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("chain"),
            "error must mention chain; got: {msg}"
        );
    }

    #[test]
    fn restore_fails_closed_on_sha256_corruption() {
        // Tamper with segment 2's sidecar sha256 so the integrity check
        // (Phase 2.4) fires before the chain check (Phase 11.3).
        let result = run_chain_restore("shacorrupt", |backend, metas| {
            let mut bad = crate::storage::wal::load_wal_segment_manifest(backend, &metas[1].key)
                .unwrap()
                .unwrap();
            bad.sha256 = Some("ff".repeat(32));
            crate::storage::wal::publish_wal_segment_manifest(backend, &bad).unwrap();
        });
        let err = result.expect_err("sha mismatch must fail closed");
        let msg = err.to_string();
        assert!(
            msg.contains("integrity") || msg.contains("sha256"),
            "error must mention integrity/sha256; got: {msg}"
        );
    }
}