Skip to main content

reddb_server/storage/wal/
archiver.rs

1//! WAL Archiver — copies WAL segments to remote backend before truncation.
2//!
3//! Enables Point-in-Time Recovery (PITR) by preserving WAL history.
4//! Integrates with the checkpoint flow to archive segments before they are truncated.
5
6use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11use crate::replication::cdc::ChangeRecord;
12use crate::storage::backend::{BackendError, RemoteBackend};
13
/// Description of one WAL segment after it has been archived.
#[derive(Clone, Debug)]
pub struct WalSegmentMeta {
    /// Key of the segment in the remote backend
    /// (e.g., "wal/000000000008-000000050432.wal").
    pub key: String,
    /// First LSN covered by the segment.
    pub lsn_start: u64,
    /// Last LSN covered by the segment.
    pub lsn_end: u64,
    /// Archive time, in milliseconds since the Unix epoch.
    pub created_at: u64,
    /// Segment size in bytes.
    pub size_bytes: u64,
    /// Hex-encoded SHA-256 of the uploaded payload bytes (PLAN.md
    /// Phase 2.4). The restore path is expected to recompute the
    /// digest after download and fail closed on a mismatch, mirroring
    /// `SnapshotManifest::snapshot_sha256`. `None` marks a legacy
    /// segment archived before the field existed; restore tolerates
    /// the absence with a warning.
    pub sha256: Option<String>,
}
35
/// Pointer record describing the most recent backup state: which
/// snapshot is current and how far WAL archival has progressed.
///
/// Serialized to and from JSON by `to_json_value` / `from_json_value`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BackupHead {
    /// Timeline identifier; `from_json_value` falls back to "main".
    pub timeline_id: String,
    /// Remote key of the snapshot this head refers to.
    pub snapshot_key: String,
    /// Numeric id of that snapshot.
    pub snapshot_id: u64,
    /// Snapshot timestamp (unit not visible here — presumably unix ms
    /// like the other timestamps in this module; confirm).
    pub snapshot_time: u64,
    /// Current LSN recorded when the head was written.
    pub current_lsn: u64,
    /// LSN of the last archived WAL position.
    pub last_archived_lsn: u64,
    /// Key prefix for WAL segments; `from_json_value` falls back to
    /// "wal/".
    pub wal_prefix: String,
}
46
/// Sidecar metadata describing one archived snapshot.
///
/// Serialized to and from JSON by `to_json_value` / `from_json_value`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SnapshotManifest {
    /// Timeline identifier; `from_json_value` falls back to "main".
    pub timeline_id: String,
    /// Remote key of the snapshot payload.
    pub snapshot_key: String,
    /// Numeric id of the snapshot.
    pub snapshot_id: u64,
    /// Snapshot timestamp (unit not visible here — presumably unix ms;
    /// confirm).
    pub snapshot_time: u64,
    /// LSN the snapshot is based on; `0` when absent from the JSON.
    pub base_lsn: u64,
    /// Schema version recorded with the snapshot.
    pub schema_version: u32,
    /// On-disk format version recorded with the snapshot.
    pub format_version: u32,
    /// Hex-encoded SHA-256 of the snapshot bytes, computed at upload
    /// time. Restore is expected to download the snapshot, recompute
    /// the hash and refuse to continue on a mismatch. `None` marks a
    /// legacy manifest written before the field existed — restore
    /// tolerates absence (with a warning) but rejects a value that is
    /// present and wrong.
    pub snapshot_sha256: Option<String>,
}
64
65impl BackupHead {
66    pub fn to_json_value(&self) -> JsonValue {
67        let mut object = Map::new();
68        object.insert(
69            "timeline_id".to_string(),
70            JsonValue::String(self.timeline_id.clone()),
71        );
72        object.insert(
73            "snapshot_key".to_string(),
74            JsonValue::String(self.snapshot_key.clone()),
75        );
76        object.insert(
77            "snapshot_id".to_string(),
78            JsonValue::Number(self.snapshot_id as f64),
79        );
80        object.insert(
81            "snapshot_time".to_string(),
82            JsonValue::Number(self.snapshot_time as f64),
83        );
84        object.insert(
85            "current_lsn".to_string(),
86            JsonValue::Number(self.current_lsn as f64),
87        );
88        object.insert(
89            "last_archived_lsn".to_string(),
90            JsonValue::Number(self.last_archived_lsn as f64),
91        );
92        object.insert(
93            "wal_prefix".to_string(),
94            JsonValue::String(self.wal_prefix.clone()),
95        );
96        JsonValue::Object(object)
97    }
98
99    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
100        Ok(Self {
101            timeline_id: value
102                .get("timeline_id")
103                .and_then(JsonValue::as_str)
104                .unwrap_or("main")
105                .to_string(),
106            snapshot_key: value
107                .get("snapshot_key")
108                .and_then(JsonValue::as_str)
109                .ok_or_else(|| {
110                    BackendError::Internal("backup head missing snapshot_key".to_string())
111                })?
112                .to_string(),
113            snapshot_id: value
114                .get("snapshot_id")
115                .and_then(JsonValue::as_u64)
116                .ok_or_else(|| {
117                    BackendError::Internal("backup head missing snapshot_id".to_string())
118                })?,
119            snapshot_time: value
120                .get("snapshot_time")
121                .and_then(JsonValue::as_u64)
122                .ok_or_else(|| {
123                    BackendError::Internal("backup head missing snapshot_time".to_string())
124                })?,
125            current_lsn: value
126                .get("current_lsn")
127                .and_then(JsonValue::as_u64)
128                .unwrap_or(0),
129            last_archived_lsn: value
130                .get("last_archived_lsn")
131                .and_then(JsonValue::as_u64)
132                .unwrap_or(0),
133            wal_prefix: value
134                .get("wal_prefix")
135                .and_then(JsonValue::as_str)
136                .unwrap_or("wal/")
137                .to_string(),
138        })
139    }
140}
141
142impl SnapshotManifest {
143    pub fn to_json_value(&self) -> JsonValue {
144        let mut object = Map::new();
145        object.insert(
146            "timeline_id".to_string(),
147            JsonValue::String(self.timeline_id.clone()),
148        );
149        object.insert(
150            "snapshot_key".to_string(),
151            JsonValue::String(self.snapshot_key.clone()),
152        );
153        object.insert(
154            "snapshot_id".to_string(),
155            JsonValue::Number(self.snapshot_id as f64),
156        );
157        object.insert(
158            "snapshot_time".to_string(),
159            JsonValue::Number(self.snapshot_time as f64),
160        );
161        object.insert(
162            "base_lsn".to_string(),
163            JsonValue::Number(self.base_lsn as f64),
164        );
165        object.insert(
166            "schema_version".to_string(),
167            JsonValue::Number(self.schema_version as f64),
168        );
169        object.insert(
170            "format_version".to_string(),
171            JsonValue::Number(self.format_version as f64),
172        );
173        if let Some(ref sha) = self.snapshot_sha256 {
174            object.insert(
175                "snapshot_sha256".to_string(),
176                JsonValue::String(sha.clone()),
177            );
178        }
179        JsonValue::Object(object)
180    }
181
182    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
183        Ok(Self {
184            timeline_id: value
185                .get("timeline_id")
186                .and_then(JsonValue::as_str)
187                .unwrap_or("main")
188                .to_string(),
189            snapshot_key: value
190                .get("snapshot_key")
191                .and_then(JsonValue::as_str)
192                .ok_or_else(|| {
193                    BackendError::Internal("snapshot manifest missing snapshot_key".to_string())
194                })?
195                .to_string(),
196            snapshot_id: value
197                .get("snapshot_id")
198                .and_then(JsonValue::as_u64)
199                .ok_or_else(|| {
200                    BackendError::Internal("snapshot manifest missing snapshot_id".to_string())
201                })?,
202            snapshot_time: value
203                .get("snapshot_time")
204                .and_then(JsonValue::as_u64)
205                .ok_or_else(|| {
206                    BackendError::Internal("snapshot manifest missing snapshot_time".to_string())
207                })?,
208            base_lsn: value
209                .get("base_lsn")
210                .and_then(JsonValue::as_u64)
211                .unwrap_or(0),
212            schema_version: value
213                .get("schema_version")
214                .and_then(JsonValue::as_u64)
215                .unwrap_or(crate::api::REDDB_FORMAT_VERSION as u64)
216                as u32,
217            format_version: value
218                .get("format_version")
219                .and_then(JsonValue::as_u64)
220                .unwrap_or(crate::api::REDDB_FORMAT_VERSION as u64)
221                as u32,
222            snapshot_sha256: value
223                .get("snapshot_sha256")
224                .and_then(JsonValue::as_str)
225                .map(|s| s.to_string()),
226        })
227    }
228
229    /// Compute SHA-256 over the local snapshot file. Used at archive
230    /// time so the manifest can carry the digest for restore-side
231    /// verification. Streamed (8 KiB chunks) so very large snapshots
232    /// don't peak memory.
233    pub fn compute_snapshot_sha256(snapshot_path: &Path) -> Result<String, BackendError> {
234        sha256_file_hex(snapshot_path)
235    }
236}
237
238/// Stream-hash a local file to a hex SHA-256. Shared by snapshot and
239/// WAL segment archival. Streamed in 8 KiB chunks so multi-GiB files
240/// don't peak memory.
241pub fn sha256_file_hex(path: &Path) -> Result<String, BackendError> {
242    use std::fs::File;
243    use std::io::Read;
244    let mut hasher = crate::crypto::sha256::Sha256::new();
245    let mut file = File::open(path)
246        .map_err(|err| BackendError::Internal(format!("open file for hash {path:?}: {err}")))?;
247    let mut buf = vec![0u8; 8 * 1024];
248    loop {
249        let n = file
250            .read(&mut buf)
251            .map_err(|err| BackendError::Internal(format!("read file for hash {path:?}: {err}")))?;
252        if n == 0 {
253            break;
254        }
255        hasher.update(&buf[..n]);
256    }
257    Ok(crate::utils::to_hex(&hasher.finalize()))
258}
259
260/// Compute SHA-256 over a byte slice and return the hex digest.
261/// Convenience for in-memory payloads (logical WAL segment buffer
262/// before upload).
263pub fn sha256_bytes_hex(bytes: &[u8]) -> String {
264    crate::utils::to_hex(&crate::crypto::sha256::sha256(bytes))
265}
266
267/// WAL Archiver — copies WAL segments to a remote backend.
268pub struct WalArchiver {
269    backend: Arc<dyn RemoteBackend>,
270    prefix: String,
271}
272
273impl WalArchiver {
274    /// Create a new archiver with a remote backend and key prefix.
275    pub fn new(backend: Arc<dyn RemoteBackend>, prefix: impl Into<String>) -> Self {
276        Self {
277            backend,
278            prefix: prefix.into(),
279        }
280    }
281
282    /// Archive a WAL file as a named segment.
283    /// Call this BEFORE truncating the WAL.
284    /// `prev_hash` links this segment to the prior one in the
285    /// timeline (PLAN.md Phase 11.3) — pass the sha256 of the last
286    /// successfully archived segment, or `None` if this is the first
287    /// segment in a fresh timeline.
288    pub fn archive_segment(
289        &self,
290        wal_path: &Path,
291        lsn_start: u64,
292        lsn_end: u64,
293        prev_hash: Option<String>,
294    ) -> Result<WalSegmentMeta, BackendError> {
295        let size_bytes = std::fs::metadata(wal_path).map(|m| m.len()).unwrap_or(0);
296
297        // Hash *before* upload so a torn upload (partial PUT) is
298        // caught by the post-restore verification. The digest covers
299        // the on-disk bytes, not whatever the backend ends up holding.
300        let sha = sha256_file_hex(wal_path).ok();
301
302        let key = format!("{}{:012}-{:012}.wal", self.prefix, lsn_start, lsn_end);
303
304        self.backend.upload(wal_path, &key)?;
305
306        let created_at = SystemTime::now()
307            .duration_since(UNIX_EPOCH)
308            .unwrap_or_default()
309            .as_millis() as u64;
310
311        let meta = WalSegmentMeta {
312            key,
313            lsn_start,
314            lsn_end,
315            created_at,
316            size_bytes,
317            sha256: sha,
318        };
319        if let Err(err) = publish_wal_segment_manifest(
320            self.backend.as_ref(),
321            &WalSegmentManifest::from_meta(&meta, prev_hash),
322        ) {
323            tracing::warn!(
324                target: "reddb::backup",
325                error = %err,
326                segment_key = %meta.key,
327                "wal segment manifest publish failed; segment archived without checksum sidecar"
328            );
329        }
330        Ok(meta)
331    }
332
333    /// Download an archived WAL segment to a local path.
334    pub fn download_segment(&self, segment_key: &str, dest: &Path) -> Result<bool, BackendError> {
335        self.backend.download(segment_key, dest)
336    }
337
338    /// Delete archived segments older than the given LSN.
339    /// Returns the number of segments deleted.
340    pub fn cleanup_before(&self, lsn: u64) -> Result<usize, BackendError> {
341        let keys = self.backend.list(&self.prefix)?;
342        let mut deleted = 0usize;
343        for key in keys {
344            let path = PathBuf::from(&key);
345            let Some(file_name) = path.file_name().and_then(|s| s.to_str()) else {
346                continue;
347            };
348            let Some((start, _end)) = file_name
349                .strip_suffix(".wal")
350                .and_then(|base| base.split_once('-'))
351            else {
352                continue;
353            };
354            let Ok(lsn_start) = start.parse::<u64>() else {
355                continue;
356            };
357            if lsn_start < lsn {
358                self.backend.delete(&key)?;
359                deleted += 1;
360            }
361        }
362        Ok(deleted)
363    }
364
365    /// Check if a segment exists in the remote backend.
366    pub fn segment_exists(&self, segment_key: &str) -> Result<bool, BackendError> {
367        self.backend.exists(segment_key)
368    }
369
370    /// Get the backend name for logging.
371    pub fn backend_name(&self) -> &str {
372        self.backend.name()
373    }
374}
375
376/// Archive a snapshot file to a remote backend.
377pub fn archive_snapshot(
378    backend: &dyn RemoteBackend,
379    snapshot_path: &Path,
380    snapshot_id: u64,
381    prefix: &str,
382) -> Result<String, BackendError> {
383    let timestamp = SystemTime::now()
384        .duration_since(UNIX_EPOCH)
385        .unwrap_or_default()
386        .as_millis() as u64;
387
388    let key = format!("{}{:012}-{}.snapshot", prefix, snapshot_id, timestamp);
389
390    backend.upload(snapshot_path, &key)?;
391    Ok(key)
392}
393
/// Key of the sidecar manifest for a snapshot: the snapshot key with
/// `.manifest.json` appended.
pub fn snapshot_manifest_key(snapshot_key: &str) -> String {
    [snapshot_key, ".manifest.json"].concat()
}
397
/// Sidecar manifest written for each archived WAL segment (PLAN.md
/// Phase 2.4 + 11.3). It carries the digest the restore side needs to
/// verify the segment bytes after download, plus a `prev_hash` that
/// links the segment to its predecessor in the timeline so a missing,
/// reordered or replaced middle segment can be detected. Stored at
/// `<segment_key>.manifest.json`, i.e. next to the segment itself.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct WalSegmentManifest {
    /// Remote key of the segment this manifest describes.
    pub key: String,
    /// First LSN covered by the segment.
    pub lsn_start: u64,
    /// Last LSN covered by the segment.
    pub lsn_end: u64,
    /// Segment size in bytes.
    pub size_bytes: u64,
    /// Archive time (copied from `WalSegmentMeta::created_at`).
    pub created_at: u64,
    /// Hex SHA-256 of *this* segment's payload bytes.
    pub sha256: Option<String>,
    /// Hex SHA-256 of the segment immediately before this one in the
    /// timeline. `None` only for the very first segment after a fresh
    /// snapshot / PITR restore. Restore is expected to check that
    /// segment[i].prev_hash == segment[i-1].sha256 and fail closed on
    /// any break (PLAN.md Phase 11.3).
    pub prev_hash: Option<String>,
}
421
422impl WalSegmentManifest {
423    pub fn from_meta(meta: &WalSegmentMeta, prev_hash: Option<String>) -> Self {
424        Self {
425            key: meta.key.clone(),
426            lsn_start: meta.lsn_start,
427            lsn_end: meta.lsn_end,
428            size_bytes: meta.size_bytes,
429            created_at: meta.created_at,
430            sha256: meta.sha256.clone(),
431            prev_hash,
432        }
433    }
434
435    pub fn to_json_value(&self) -> JsonValue {
436        let mut object = Map::new();
437        object.insert("key".to_string(), JsonValue::String(self.key.clone()));
438        object.insert(
439            "lsn_start".to_string(),
440            JsonValue::Number(self.lsn_start as f64),
441        );
442        object.insert(
443            "lsn_end".to_string(),
444            JsonValue::Number(self.lsn_end as f64),
445        );
446        object.insert(
447            "size_bytes".to_string(),
448            JsonValue::Number(self.size_bytes as f64),
449        );
450        object.insert(
451            "created_at".to_string(),
452            JsonValue::Number(self.created_at as f64),
453        );
454        if let Some(sha) = &self.sha256 {
455            object.insert("sha256".to_string(), JsonValue::String(sha.clone()));
456        }
457        if let Some(prev) = &self.prev_hash {
458            object.insert("prev_hash".to_string(), JsonValue::String(prev.clone()));
459        }
460        JsonValue::Object(object)
461    }
462
463    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
464        Ok(Self {
465            key: value
466                .get("key")
467                .and_then(JsonValue::as_str)
468                .ok_or_else(|| {
469                    BackendError::Internal("wal segment manifest missing key".to_string())
470                })?
471                .to_string(),
472            lsn_start: value
473                .get("lsn_start")
474                .and_then(JsonValue::as_u64)
475                .unwrap_or(0),
476            lsn_end: value
477                .get("lsn_end")
478                .and_then(JsonValue::as_u64)
479                .unwrap_or(0),
480            size_bytes: value
481                .get("size_bytes")
482                .and_then(JsonValue::as_u64)
483                .unwrap_or(0),
484            created_at: value
485                .get("created_at")
486                .and_then(JsonValue::as_u64)
487                .unwrap_or(0),
488            sha256: value
489                .get("sha256")
490                .and_then(JsonValue::as_str)
491                .map(|s| s.to_string()),
492            prev_hash: value
493                .get("prev_hash")
494                .and_then(JsonValue::as_str)
495                .map(|s| s.to_string()),
496        })
497    }
498}
499
/// Key of the sidecar manifest for a WAL segment: the segment key with
/// `.manifest.json` appended.
pub fn wal_segment_manifest_key(segment_key: &str) -> String {
    let mut manifest_key = String::from(segment_key);
    manifest_key.push_str(".manifest.json");
    manifest_key
}
503
504/// Top-level backup catalog (PLAN.md Phase 2.4). One JSON file at
505/// `<prefix>MANIFEST.json` lists every snapshot and WAL segment in a
506/// stable shape that external tooling can parse without sniffing
507/// directory listings.
508///
509/// Spec lives in `docs/spec/manifest-format.md`. Versioned via the
510/// `version` field — incompatible schema changes bump the major. The
511/// engine's own restore code reads the per-snapshot and per-segment
512/// sidecars *first*; the unified catalog is for human / orchestrator
513/// inspection, manual disaster recovery, and third-party verifiers.
514#[derive(Debug, Clone, PartialEq, Eq)]
515pub struct UnifiedManifest {
516    /// Schema version. Currently `1.0`.
517    pub version: String,
518    /// `CARGO_PKG_VERSION` of the engine that wrote the manifest.
519    pub engine_version: String,
520    /// Highest LSN known across all archived WAL segments. `0` when
521    /// no WAL has been archived yet.
522    pub latest_lsn: u64,
523    /// All snapshots known to this prefix, freshest first.
524    pub snapshots: Vec<UnifiedSnapshotEntry>,
525    /// All WAL segments known to this prefix, ordered by `lsn_start`.
526    pub wal_segments: Vec<UnifiedWalEntry>,
527}
528
/// One snapshot as listed in the unified manifest.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnifiedSnapshotEntry {
    /// Snapshot id.
    pub id: u64,
    /// LSN associated with the snapshot.
    pub lsn: u64,
    /// Snapshot timestamp.
    pub ts: u64,
    /// Snapshot size in bytes.
    pub bytes: u64,
    /// Remote key of the snapshot payload.
    pub key: String,
    /// Bare hex SHA-256 of the snapshot, if known. The JSON form adds
    /// a `sha256:` prefix on write and strips it on read.
    pub checksum: Option<String>,
}
538
/// One WAL segment as listed in the unified manifest.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UnifiedWalEntry {
    /// First LSN covered by the segment.
    pub lsn_start: u64,
    /// Last LSN covered by the segment.
    pub lsn_end: u64,
    /// Remote key of the segment.
    pub key: String,
    /// Segment size in bytes.
    pub bytes: u64,
    /// Bare hex SHA-256 of the segment, if known. The JSON form adds a
    /// `sha256:` prefix on write and strips it on read.
    pub checksum: Option<String>,
    /// PLAN.md Phase 11.3 — sha256 of the previous segment in the
    /// timeline. Exposing it here lets external verifiers validate the
    /// whole chain from the catalog alone, without per-segment GETs.
    pub prev_hash: Option<String>,
}
552
553impl UnifiedManifest {
554    pub const VERSION: &'static str = "1.0";
555
556    pub fn new(snapshots: Vec<UnifiedSnapshotEntry>, wal_segments: Vec<UnifiedWalEntry>) -> Self {
557        let latest_lsn = wal_segments
558            .iter()
559            .map(|w| w.lsn_end)
560            .chain(snapshots.iter().map(|s| s.lsn))
561            .max()
562            .unwrap_or(0);
563        Self {
564            version: Self::VERSION.to_string(),
565            engine_version: env!("CARGO_PKG_VERSION").to_string(),
566            latest_lsn,
567            snapshots,
568            wal_segments,
569        }
570    }
571
572    pub fn to_json_value(&self) -> JsonValue {
573        let mut obj = Map::new();
574        obj.insert(
575            "version".to_string(),
576            JsonValue::String(self.version.clone()),
577        );
578        obj.insert(
579            "engine_version".to_string(),
580            JsonValue::String(self.engine_version.clone()),
581        );
582        obj.insert(
583            "latest_lsn".to_string(),
584            JsonValue::Number(self.latest_lsn as f64),
585        );
586        obj.insert(
587            "snapshots".to_string(),
588            JsonValue::Array(
589                self.snapshots
590                    .iter()
591                    .map(UnifiedSnapshotEntry::to_json_value)
592                    .collect(),
593            ),
594        );
595        obj.insert(
596            "wal_segments".to_string(),
597            JsonValue::Array(
598                self.wal_segments
599                    .iter()
600                    .map(UnifiedWalEntry::to_json_value)
601                    .collect(),
602            ),
603        );
604        JsonValue::Object(obj)
605    }
606
607    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
608        let obj = value.as_object().ok_or_else(|| {
609            BackendError::Internal("unified manifest must be a JSON object".to_string())
610        })?;
611        Ok(Self {
612            version: obj
613                .get("version")
614                .and_then(JsonValue::as_str)
615                .unwrap_or("1.0")
616                .to_string(),
617            engine_version: obj
618                .get("engine_version")
619                .and_then(JsonValue::as_str)
620                .unwrap_or("unknown")
621                .to_string(),
622            latest_lsn: obj
623                .get("latest_lsn")
624                .and_then(JsonValue::as_u64)
625                .unwrap_or(0),
626            snapshots: obj
627                .get("snapshots")
628                .and_then(JsonValue::as_array)
629                .map(|arr| {
630                    arr.iter()
631                        .filter_map(|v| UnifiedSnapshotEntry::from_json_value(v).ok())
632                        .collect()
633                })
634                .unwrap_or_default(),
635            wal_segments: obj
636                .get("wal_segments")
637                .and_then(JsonValue::as_array)
638                .map(|arr| {
639                    arr.iter()
640                        .filter_map(|v| UnifiedWalEntry::from_json_value(v).ok())
641                        .collect()
642                })
643                .unwrap_or_default(),
644        })
645    }
646}
647
648impl UnifiedSnapshotEntry {
649    pub fn to_json_value(&self) -> JsonValue {
650        let mut obj = Map::new();
651        obj.insert("id".to_string(), JsonValue::Number(self.id as f64));
652        obj.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
653        obj.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
654        obj.insert("bytes".to_string(), JsonValue::Number(self.bytes as f64));
655        obj.insert("key".to_string(), JsonValue::String(self.key.clone()));
656        if let Some(c) = &self.checksum {
657            obj.insert(
658                "checksum".to_string(),
659                JsonValue::String(format!("sha256:{c}")),
660            );
661        }
662        JsonValue::Object(obj)
663    }
664
665    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
666        let obj = value.as_object().ok_or_else(|| {
667            BackendError::Internal("snapshot entry must be a JSON object".to_string())
668        })?;
669        Ok(Self {
670            id: obj.get("id").and_then(JsonValue::as_u64).unwrap_or(0),
671            lsn: obj.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
672            ts: obj.get("ts").and_then(JsonValue::as_u64).unwrap_or(0),
673            bytes: obj.get("bytes").and_then(JsonValue::as_u64).unwrap_or(0),
674            key: obj
675                .get("key")
676                .and_then(JsonValue::as_str)
677                .ok_or_else(|| BackendError::Internal("snapshot entry missing key".to_string()))?
678                .to_string(),
679            checksum: obj
680                .get("checksum")
681                .and_then(JsonValue::as_str)
682                .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
683        })
684    }
685}
686
687impl UnifiedWalEntry {
688    pub fn to_json_value(&self) -> JsonValue {
689        let mut obj = Map::new();
690        obj.insert(
691            "lsn_start".to_string(),
692            JsonValue::Number(self.lsn_start as f64),
693        );
694        obj.insert(
695            "lsn_end".to_string(),
696            JsonValue::Number(self.lsn_end as f64),
697        );
698        obj.insert("key".to_string(), JsonValue::String(self.key.clone()));
699        obj.insert("bytes".to_string(), JsonValue::Number(self.bytes as f64));
700        if let Some(c) = &self.checksum {
701            obj.insert(
702                "checksum".to_string(),
703                JsonValue::String(format!("sha256:{c}")),
704            );
705        }
706        if let Some(p) = &self.prev_hash {
707            obj.insert(
708                "prev_hash".to_string(),
709                JsonValue::String(format!("sha256:{p}")),
710            );
711        }
712        JsonValue::Object(obj)
713    }
714
715    pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
716        let obj = value.as_object().ok_or_else(|| {
717            BackendError::Internal("wal segment entry must be a JSON object".to_string())
718        })?;
719        Ok(Self {
720            lsn_start: obj
721                .get("lsn_start")
722                .and_then(JsonValue::as_u64)
723                .unwrap_or(0),
724            lsn_end: obj.get("lsn_end").and_then(JsonValue::as_u64).unwrap_or(0),
725            key: obj
726                .get("key")
727                .and_then(JsonValue::as_str)
728                .ok_or_else(|| BackendError::Internal("wal segment entry missing key".to_string()))?
729                .to_string(),
730            bytes: obj.get("bytes").and_then(JsonValue::as_u64).unwrap_or(0),
731            checksum: obj
732                .get("checksum")
733                .and_then(JsonValue::as_str)
734                .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
735            prev_hash: obj
736                .get("prev_hash")
737                .and_then(JsonValue::as_str)
738                .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
739        })
740    }
741}
742
/// Key of the unified manifest for a backup root.
///
/// Trailing slashes on `prefix` are ignored. An empty root yields
/// `MANIFEST.json`; otherwise the result is `<root>/MANIFEST.json`.
pub fn unified_manifest_key(prefix: &str) -> String {
    match prefix.trim_end_matches('/') {
        "" => "MANIFEST.json".to_string(),
        root => format!("{root}/MANIFEST.json"),
    }
}
751
752/// Atomic publish via temp+rename semantics. Per backend behaviour:
753///   * Filesystem backend renames the temp key, so concurrent readers
754///     see either the old manifest or the new one — never a torn one.
755///   * S3-compatible backends without conditional PUT can't fully
756///     guarantee that, but the fresh-temp-then-replace pattern is the
757///     best the trait surface offers today. PLAN.md Phase 2.4 calls
758///     out PUT-if-match as a follow-up once `RemoteBackend` grows
759///     conditional methods.
760pub fn publish_unified_manifest(
761    backend: &dyn RemoteBackend,
762    prefix: &str,
763    manifest: &UnifiedManifest,
764) -> Result<String, BackendError> {
765    let key = unified_manifest_key(prefix);
766    write_json_object(backend, &key, &manifest.to_json_value())?;
767    Ok(key)
768}
769
770pub fn load_unified_manifest(
771    backend: &dyn RemoteBackend,
772    prefix: &str,
773) -> Result<Option<UnifiedManifest>, BackendError> {
774    let key = unified_manifest_key(prefix);
775    let Some(value) = read_json_object(backend, &key)? else {
776        return Ok(None);
777    };
778    Ok(Some(UnifiedManifest::from_json_value(&value)?))
779}
780
781/// Build the unified manifest by listing the configured backup root
782/// and reading per-artifact sidecars in parallel-safe sequence. The
783/// resulting `MANIFEST.json` is published atomically (temp + rename
784/// on FS, fresh-temp-then-replace on S3-compatible).
785///
786/// `snapshot_prefix` here is the *backup root* prefix (the parent of
787/// `snapshots/` and `wal/`); the unified manifest is always written
788/// at `<root>/MANIFEST.json`. When the runtime hands us a more
789/// specific prefix (e.g. `snapshots/clusters/dev/`), we walk back to
790/// the parent before publishing.
791pub fn publish_unified_manifest_for_prefix(
792    backend: &dyn RemoteBackend,
793    snapshot_prefix: &str,
794) -> Result<String, BackendError> {
795    let root = derive_backup_root(snapshot_prefix);
796    let snapshots = collect_unified_snapshots(backend, snapshot_prefix)?;
797    let wal_root = format!("{}wal/", root);
798    let wal_segments = collect_unified_wal_segments(backend, &wal_root)?;
799    let manifest = UnifiedManifest::new(snapshots, wal_segments);
800    publish_unified_manifest(backend, &root, &manifest)
801}
802
/// Derive the backup root (the parent of `snapshots/` and `wal/`) from
/// a snapshot prefix.
///
/// Mapping, after trailing slashes are removed:
///   * `<root>/snapshots...` → `<root>/` (split at the last
///     `/snapshots`);
///   * `snapshots` or `snapshots/...` → `""` (bucket root);
///   * empty prefix → `""`;
///   * anything else is treated as already being the root and is
///     returned with a single trailing `/`.
///
/// The result is either empty or ends in `/`, so callers can append
/// `wal/` directly.
fn derive_backup_root(snapshot_prefix: &str) -> String {
    let trimmed = snapshot_prefix.trim_end_matches('/');
    if let Some(idx) = trimmed.rfind("/snapshots") {
        let head = &trimmed[..idx];
        if head.is_empty() {
            String::new()
        } else {
            format!("{head}/")
        }
    } else if trimmed.is_empty() || trimmed == "snapshots" || trimmed.starts_with("snapshots/") {
        // A prefix that *starts* with `snapshots/` has no leading '/'
        // for the search above to find, yet it still lives directly
        // under the bucket root (e.g. `snapshots/clusters/dev`).
        String::new()
    } else {
        // Already a root prefix (no /snapshots suffix).
        format!("{trimmed}/")
    }
}
822
823fn collect_unified_snapshots(
824    backend: &dyn RemoteBackend,
825    snapshot_prefix: &str,
826) -> Result<Vec<UnifiedSnapshotEntry>, BackendError> {
827    let keys = backend.list(snapshot_prefix)?;
828    let mut out = Vec::new();
829    for key in keys {
830        // Skip sidecars themselves — we only want the snapshot
831        // payload keys, then we read each one's sidecar for metadata.
832        if key.ends_with(".manifest.json") {
833            continue;
834        }
835        let Some(manifest) = load_snapshot_manifest(backend, &key)? else {
836            continue;
837        };
838        out.push(UnifiedSnapshotEntry {
839            id: manifest.snapshot_id,
840            lsn: manifest.base_lsn,
841            ts: manifest.snapshot_time,
842            bytes: 0,
843            key: manifest.snapshot_key.clone(),
844            checksum: manifest.snapshot_sha256.clone(),
845        });
846    }
847    out.sort_by_key(|s| std::cmp::Reverse(s.ts));
848    Ok(out)
849}
850
851fn collect_unified_wal_segments(
852    backend: &dyn RemoteBackend,
853    wal_prefix: &str,
854) -> Result<Vec<UnifiedWalEntry>, BackendError> {
855    let keys = backend.list(wal_prefix)?;
856    let mut out = Vec::new();
857    for key in keys {
858        if key.ends_with(".manifest.json") {
859            continue;
860        }
861        if !key.ends_with(".wal") {
862            continue;
863        }
864        let Some(manifest) = load_wal_segment_manifest(backend, &key)? else {
865            continue;
866        };
867        out.push(UnifiedWalEntry {
868            lsn_start: manifest.lsn_start,
869            lsn_end: manifest.lsn_end,
870            key: manifest.key.clone(),
871            bytes: manifest.size_bytes,
872            checksum: manifest.sha256.clone(),
873            prev_hash: manifest.prev_hash.clone(),
874        });
875    }
876    out.sort_by_key(|w| w.lsn_start);
877    Ok(out)
878}
879
880pub fn publish_wal_segment_manifest(
881    backend: &dyn RemoteBackend,
882    manifest: &WalSegmentManifest,
883) -> Result<String, BackendError> {
884    let key = wal_segment_manifest_key(&manifest.key);
885    write_json_object(backend, &key, &manifest.to_json_value())?;
886    Ok(key)
887}
888
889pub fn load_wal_segment_manifest(
890    backend: &dyn RemoteBackend,
891    segment_key: &str,
892) -> Result<Option<WalSegmentManifest>, BackendError> {
893    let key = wal_segment_manifest_key(segment_key);
894    let Some(value) = read_json_object(backend, &key)? else {
895        return Ok(None);
896    };
897    Ok(Some(WalSegmentManifest::from_json_value(&value)?))
898}
899
900pub fn publish_backup_head(
901    backend: &dyn RemoteBackend,
902    head_key: &str,
903    head: &BackupHead,
904) -> Result<(), BackendError> {
905    write_json_object(backend, head_key, &head.to_json_value())
906}
907
908pub fn load_backup_head(
909    backend: &dyn RemoteBackend,
910    head_key: &str,
911) -> Result<Option<BackupHead>, BackendError> {
912    let Some(value) = read_json_object(backend, head_key)? else {
913        return Ok(None);
914    };
915    Ok(Some(BackupHead::from_json_value(&value)?))
916}
917
918pub fn publish_snapshot_manifest(
919    backend: &dyn RemoteBackend,
920    manifest: &SnapshotManifest,
921) -> Result<String, BackendError> {
922    let key = snapshot_manifest_key(&manifest.snapshot_key);
923    write_json_object(backend, &key, &manifest.to_json_value())?;
924    Ok(key)
925}
926
927pub fn load_snapshot_manifest(
928    backend: &dyn RemoteBackend,
929    snapshot_key: &str,
930) -> Result<Option<SnapshotManifest>, BackendError> {
931    let key = snapshot_manifest_key(snapshot_key);
932    let Some(value) = read_json_object(backend, &key)? else {
933        return Ok(None);
934    };
935    Ok(Some(SnapshotManifest::from_json_value(&value)?))
936}
937
938pub fn archive_change_records(
939    backend: &dyn RemoteBackend,
940    prefix: &str,
941    records: &[(u64, Vec<u8>)],
942    prev_hash: Option<String>,
943) -> Result<Option<WalSegmentMeta>, BackendError> {
944    let Some((lsn_start, _)) = records.first() else {
945        return Ok(None);
946    };
947    let Some((lsn_end, _)) = records.last() else {
948        return Ok(None);
949    };
950
951    let payload = JsonValue::Array(
952        records
953            .iter()
954            .map(|(lsn, bytes)| {
955                let mut object = Map::new();
956                object.insert("lsn".to_string(), JsonValue::Number(*lsn as f64));
957                object.insert("data".to_string(), JsonValue::String(hex::encode(bytes)));
958                JsonValue::Object(object)
959            })
960            .collect(),
961    );
962    let body = crate::json::to_vec(&payload).map_err(|err| {
963        BackendError::Internal(format!("encode archived logical wal failed: {err}"))
964    })?;
965    // Hash the encoded payload before persisting so the digest
966    // matches what gets uploaded byte-for-byte.
967    let sha = sha256_bytes_hex(&body);
968
969    let temp = temp_json_path(
970        "reddb-archived-change-records",
971        Some(*lsn_start),
972        Some(*lsn_end),
973    );
974    std::fs::write(&temp, &body)
975        .map_err(|err| BackendError::Transport(format!("write temp logical wal failed: {err}")))?;
976
977    let key = format!("{}{:012}-{:012}.wal", prefix, lsn_start, lsn_end);
978    backend.upload(&temp, &key)?;
979    let size_bytes = std::fs::metadata(&temp).map(|meta| meta.len()).unwrap_or(0);
980    let _ = std::fs::remove_file(&temp);
981
982    let meta = WalSegmentMeta {
983        key,
984        lsn_start: *lsn_start,
985        lsn_end: *lsn_end,
986        created_at: SystemTime::now()
987            .duration_since(UNIX_EPOCH)
988            .unwrap_or_default()
989            .as_millis() as u64,
990        size_bytes,
991        sha256: Some(sha),
992    };
993
994    // Per-segment sidecar manifest. Best-effort — a failure to publish
995    // the manifest leaves the segment without a checksum, which restore
996    // tolerates with a warning, so this is non-fatal. We still log so
997    // operators can flag that backup integrity coverage is degraded.
998    if let Err(err) =
999        publish_wal_segment_manifest(backend, &WalSegmentManifest::from_meta(&meta, prev_hash))
1000    {
1001        tracing::warn!(
1002            target: "reddb::backup",
1003            error = %err,
1004            segment_key = %meta.key,
1005            "wal segment manifest publish failed; segment archived without checksum sidecar"
1006        );
1007    }
1008
1009    Ok(Some(meta))
1010}
1011
1012pub fn load_archived_change_records(
1013    backend: &dyn RemoteBackend,
1014    segment_key: &str,
1015) -> Result<Vec<ChangeRecord>, BackendError> {
1016    let (records, _digest) = load_archived_change_records_with_sha256(backend, segment_key)?;
1017    Ok(records)
1018}
1019
1020/// Same as `load_archived_change_records` but also returns the
1021/// SHA-256 of the downloaded payload bytes so the caller can verify
1022/// it against the segment's manifest digest. Restore flows pair this
1023/// with `load_wal_segment_manifest` to fail closed on tampering.
1024pub fn load_archived_change_records_with_sha256(
1025    backend: &dyn RemoteBackend,
1026    segment_key: &str,
1027) -> Result<(Vec<ChangeRecord>, Option<String>), BackendError> {
1028    let temp = temp_json_path("reddb-archived-change-records-read", None, None);
1029    let found = backend.download(segment_key, &temp)?;
1030    if !found {
1031        let _ = std::fs::remove_file(&temp);
1032        return Ok((Vec::new(), None));
1033    }
1034    let bytes = std::fs::read(&temp)
1035        .map_err(|err| BackendError::Transport(format!("read temp logical wal failed: {err}")))?;
1036    let _ = std::fs::remove_file(&temp);
1037    let digest = sha256_bytes_hex(&bytes);
1038
1039    let value: JsonValue = crate::json::from_slice(&bytes).map_err(|err| {
1040        BackendError::Internal(format!("decode archived logical wal failed: {err}"))
1041    })?;
1042    let Some(entries) = value.as_array() else {
1043        return Err(BackendError::Internal(
1044            "archived logical wal must be a JSON array".to_string(),
1045        ));
1046    };
1047    let mut out = Vec::new();
1048    for entry in entries {
1049        let Some(data_hex) = entry.get("data").and_then(JsonValue::as_str) else {
1050            continue;
1051        };
1052        let data = hex::decode(data_hex).map_err(|err| {
1053            BackendError::Internal(format!("decode wal record hex failed: {err}"))
1054        })?;
1055        let record = ChangeRecord::decode(&data)
1056            .map_err(|err| BackendError::Internal(format!("decode wal record failed: {err}")))?;
1057        out.push(record);
1058    }
1059    Ok((out, Some(digest)))
1060}
1061
1062fn write_json_object(
1063    backend: &dyn RemoteBackend,
1064    key: &str,
1065    value: &JsonValue,
1066) -> Result<(), BackendError> {
1067    let temp = temp_json_path("reddb-json-object", None, None);
1068    std::fs::write(
1069        &temp,
1070        crate::json::to_vec(value)
1071            .map_err(|err| BackendError::Internal(format!("encode json object failed: {err}")))?,
1072    )
1073    .map_err(|err| BackendError::Transport(format!("write temp json object failed: {err}")))?;
1074    let upload_result = backend.upload(&temp, key);
1075    let _ = std::fs::remove_file(&temp);
1076    upload_result
1077}
1078
1079fn read_json_object(
1080    backend: &dyn RemoteBackend,
1081    key: &str,
1082) -> Result<Option<JsonValue>, BackendError> {
1083    let temp = temp_json_path("reddb-json-object-read", None, None);
1084    let found = backend.download(key, &temp)?;
1085    if !found {
1086        return Ok(None);
1087    }
1088    let bytes = std::fs::read(&temp)
1089        .map_err(|err| BackendError::Transport(format!("read temp json object failed: {err}")))?;
1090    let _ = std::fs::remove_file(&temp);
1091    let value = crate::json::from_slice::<JsonValue>(&bytes)
1092        .map_err(|err| BackendError::Internal(format!("decode json object failed: {err}")))?;
1093    Ok(Some(value))
1094}
1095
/// Builds a scratch-file path inside the system temp directory.
///
/// The file name has the shape
/// `<prefix>-<pid>[-<start>-<end>]-<nanos>-<seq>.json`, where the
/// `-<start>-<end>` part is present only when both bounds are given.
/// `<seq>` is a process-wide counter, so two calls in the same process
/// never return the same path even when they observe the same clock
/// reading (concurrent callers, coarse clock resolution).
///
/// The file itself is not created.
fn temp_json_path(prefix: &str, start: Option<u64>, end: Option<u64>) -> PathBuf {
    // Only uniqueness matters here, not ordering relative to other
    // memory operations, so a relaxed increment is sufficient.
    static NEXT_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
    let seq = NEXT_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);

    let suffix = match (start, end) {
        (Some(start), Some(end)) => format!("-{start}-{end}"),
        _ => String::new(),
    };
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();

    std::env::temp_dir().join(format!(
        "{prefix}-{}{suffix}-{nanos}-{seq}.json",
        std::process::id()
    ))
}
1111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::local::LocalBackend;
    use std::io::Write;

    /// Per-test scratch directory under the system temp dir.
    ///
    /// The name embeds the test tag and the process id so parallel test
    /// processes do not share state. The directory is wiped on creation
    /// and removed again on drop, which also covers tests that fail an
    /// assertion part-way through.
    struct ScratchDir(std::path::PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let path = std::env::temp_dir().join(format!("reddb_{tag}_{}", std::process::id()));
            let _ = std::fs::remove_dir_all(&path);
            std::fs::create_dir_all(&path).unwrap();
            Self(path)
        }

        fn path(&self) -> &std::path::Path {
            &self.0
        }

        /// Key prefix for WAL segments stored inside this directory.
        fn wal_prefix(&self) -> String {
            format!("{}/wal/", self.0.to_string_lossy())
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    #[test]
    fn test_archive_and_download() {
        let scratch = ScratchDir::new("archiver_test");
        // Keep the archived segment inside the scratch dir; a bare
        // relative prefix such as "wal/" would be resolved against the
        // working directory and never cleaned up.
        let prefix = scratch.wal_prefix();

        let backend = Arc::new(LocalBackend);
        let archiver = WalArchiver::new(backend, prefix.as_str());

        // Create a fake WAL file
        let wal_path = scratch.path().join("test.wal");
        {
            let mut f = std::fs::File::create(&wal_path).unwrap();
            f.write_all(b"fake wal data").unwrap();
        }

        // Archive it
        let meta = archiver.archive_segment(&wal_path, 8, 500, None).unwrap();
        assert_eq!(meta.lsn_start, 8);
        assert_eq!(meta.lsn_end, 500);
        assert!(meta.key.starts_with(prefix.as_str()));
        assert!(meta.key.ends_with(".wal"));

        // Download it
        let dest = scratch.path().join("downloaded.wal");
        let found = archiver.download_segment(&meta.key, &dest).unwrap();
        assert!(found);
        assert!(dest.exists());
    }

    #[test]
    fn test_backup_head_roundtrip() {
        let scratch = ScratchDir::new("backup_head_test");
        let backend = LocalBackend;
        let head_key = scratch.path().join("manifests").join("head.json");

        let head = BackupHead {
            timeline_id: "main".to_string(),
            snapshot_key: "snapshots/000001-123.snapshot".to_string(),
            snapshot_id: 1,
            snapshot_time: 123,
            current_lsn: 456,
            last_archived_lsn: 456,
            wal_prefix: "wal/".to_string(),
        };

        publish_backup_head(&backend, &head_key.to_string_lossy(), &head).unwrap();
        let loaded = load_backup_head(&backend, &head_key.to_string_lossy())
            .unwrap()
            .expect("head");
        assert_eq!(loaded, head);
    }

    #[test]
    fn test_snapshot_manifest_roundtrip() {
        let scratch = ScratchDir::new("snapshot_manifest_test");
        let backend = LocalBackend;
        let manifest = SnapshotManifest {
            timeline_id: "main".to_string(),
            snapshot_key: scratch
                .path()
                .join("snapshots")
                .join("000001-123.snapshot")
                .to_string_lossy()
                .to_string(),
            snapshot_id: 1,
            snapshot_time: 123,
            base_lsn: 456,
            schema_version: crate::api::REDDB_FORMAT_VERSION,
            format_version: crate::api::REDDB_FORMAT_VERSION,
            snapshot_sha256: None,
        };

        publish_snapshot_manifest(&backend, &manifest).unwrap();
        let loaded = load_snapshot_manifest(&backend, &manifest.snapshot_key)
            .unwrap()
            .expect("manifest");
        assert_eq!(loaded, manifest);
    }

    #[test]
    fn test_archived_change_records_roundtrip() {
        let scratch = ScratchDir::new("archived_change_records_test");
        let backend = LocalBackend;
        let prefix = scratch.wal_prefix();
        let record = ChangeRecord {
            lsn: 7,
            timestamp: 1234,
            operation: crate::replication::cdc::ChangeOperation::Insert,
            collection: "users".to_string(),
            entity_id: 42,
            entity_kind: "row".to_string(),
            entity_bytes: Some(vec![1, 2, 3]),
            metadata: None,
        };

        let meta =
            archive_change_records(&backend, &prefix, &[(record.lsn, record.encode())], None)
                .unwrap()
                .expect("meta");
        let loaded = load_archived_change_records(&backend, &meta.key).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].lsn, 7);
    }

    #[test]
    fn archive_change_records_writes_sidecar_with_sha256() {
        let scratch = ScratchDir::new("archiver_sidecar");
        let backend = LocalBackend;
        let prefix = scratch.wal_prefix();
        let record = ChangeRecord {
            lsn: 11,
            timestamp: 99,
            operation: crate::replication::cdc::ChangeOperation::Insert,
            collection: "users".to_string(),
            entity_id: 1,
            entity_kind: "row".to_string(),
            entity_bytes: Some(b"x".to_vec()),
            metadata: None,
        };
        let meta =
            archive_change_records(&backend, &prefix, &[(record.lsn, record.encode())], None)
                .unwrap()
                .expect("meta");
        assert!(meta.sha256.is_some(), "WalSegmentMeta should carry sha256");

        let sidecar = load_wal_segment_manifest(&backend, &meta.key)
            .unwrap()
            .expect("sidecar");
        assert_eq!(sidecar.key, meta.key);
        assert_eq!(sidecar.lsn_start, meta.lsn_start);
        assert_eq!(sidecar.lsn_end, meta.lsn_end);
        assert_eq!(sidecar.sha256, meta.sha256);

        let (_records, computed) =
            load_archived_change_records_with_sha256(&backend, &meta.key).unwrap();
        assert_eq!(computed, meta.sha256, "computed sha must match sidecar");
    }

    #[test]
    fn unified_manifest_json_roundtrip() {
        let manifest = UnifiedManifest::new(
            vec![UnifiedSnapshotEntry {
                id: 7,
                lsn: 100,
                ts: 1730000000000,
                bytes: 4096,
                key: "snapshots/000007-1730000000000.snapshot".to_string(),
                checksum: Some("9f8b".to_string()),
            }],
            vec![UnifiedWalEntry {
                lsn_start: 100,
                lsn_end: 250,
                key: "wal/000000000100-000000000250.wal".to_string(),
                bytes: 1024,
                checksum: Some("c1d2".to_string()),
                prev_hash: Some("9f8b".to_string()),
            }],
        );

        let json = manifest.to_json_value();
        let parsed = UnifiedManifest::from_json_value(&json).unwrap();
        assert_eq!(parsed, manifest);
        assert_eq!(parsed.latest_lsn, 250);

        // prev_hash must round-trip with `sha256:` prefix on the wire
        // (PLAN.md Phase 11.3) so external verifiers can validate
        // the chain end-to-end without parsing the per-segment sidecar.
        assert_eq!(parsed.wal_segments[0].prev_hash.as_deref(), Some("9f8b"));
        let wal_wire = parsed.wal_segments[0].to_json_value().to_string_compact();
        assert!(
            wal_wire.contains("\"prev_hash\":\"sha256:9f8b\""),
            "wire form must include sha256: prefix on prev_hash; got: {wal_wire}"
        );

        // Checksum should round-trip with the `sha256:` prefix in the
        // wire form but parse back to the bare hex.
        let body = json.to_string_compact();
        assert!(
            body.contains("\"sha256:9f8b\""),
            "wire form must include sha256: prefix"
        );
        assert_eq!(parsed.snapshots[0].checksum.as_deref(), Some("9f8b"));
    }

    #[test]
    fn unified_manifest_publish_load_roundtrip() {
        let scratch = ScratchDir::new("unified_manifest");
        let prefix = scratch.path().to_string_lossy().to_string();

        let backend = LocalBackend;
        let manifest = UnifiedManifest::new(vec![], vec![]);
        publish_unified_manifest(&backend, &prefix, &manifest).unwrap();
        let loaded = load_unified_manifest(&backend, &prefix)
            .unwrap()
            .expect("manifest");
        assert_eq!(loaded.version, "1.0");
        assert_eq!(loaded.latest_lsn, 0);
    }

    #[test]
    fn archive_change_records_chains_prev_hash() {
        let scratch = ScratchDir::new("archive_chain");
        let backend = LocalBackend;
        let prefix = scratch.wal_prefix();

        let mk = |lsn: u64| ChangeRecord {
            lsn,
            timestamp: lsn * 1000,
            operation: crate::replication::cdc::ChangeOperation::Insert,
            collection: "users".to_string(),
            entity_id: lsn,
            entity_kind: "row".to_string(),
            entity_bytes: Some(format!("payload-{lsn}").into_bytes()),
            metadata: None,
        };

        let r1 = mk(10);
        let m1 = archive_change_records(&backend, &prefix, &[(r1.lsn, r1.encode())], None)
            .unwrap()
            .expect("seg 1");
        let r2 = mk(11);
        let m2 = archive_change_records(
            &backend,
            &prefix,
            &[(r2.lsn, r2.encode())],
            m1.sha256.clone(),
        )
        .unwrap()
        .expect("seg 2");
        let r3 = mk(12);
        let m3 = archive_change_records(
            &backend,
            &prefix,
            &[(r3.lsn, r3.encode())],
            m2.sha256.clone(),
        )
        .unwrap()
        .expect("seg 3");

        let s1 = load_wal_segment_manifest(&backend, &m1.key)
            .unwrap()
            .unwrap();
        let s2 = load_wal_segment_manifest(&backend, &m2.key)
            .unwrap()
            .unwrap();
        let s3 = load_wal_segment_manifest(&backend, &m3.key)
            .unwrap()
            .unwrap();
        assert!(s1.prev_hash.is_none(), "first segment has no prev_hash");
        assert_eq!(s2.prev_hash, m1.sha256, "seg 2 links to seg 1 sha256");
        assert_eq!(s3.prev_hash, m2.sha256, "seg 3 links to seg 2 sha256");
    }

    #[test]
    fn wal_segment_manifest_carries_prev_hash_through_json() {
        let m = WalSegmentManifest {
            key: "wal/000000000010-000000000010.wal".to_string(),
            lsn_start: 10,
            lsn_end: 10,
            size_bytes: 128,
            created_at: 1730000000000,
            sha256: Some("abc".to_string()),
            prev_hash: Some("def".to_string()),
        };
        let parsed = WalSegmentManifest::from_json_value(&m.to_json_value()).unwrap();
        assert_eq!(parsed, m);
    }

    #[test]
    fn derive_backup_root_handles_typical_prefixes() {
        assert_eq!(derive_backup_root(""), "");
        assert_eq!(derive_backup_root("snapshots/"), "");
        assert_eq!(derive_backup_root("snapshots"), "");
        assert_eq!(
            derive_backup_root("clusters/dev/snapshots/"),
            "clusters/dev/"
        );
        assert_eq!(
            derive_backup_root("clusters/dev/snapshots"),
            "clusters/dev/"
        );
        assert_eq!(derive_backup_root("clusters/dev/"), "clusters/dev/");
    }
}