1use std::path::{Path, PathBuf};
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10use crate::json::{Map, Value as JsonValue};
11use crate::replication::cdc::ChangeRecord;
12use crate::storage::backend::{BackendError, RemoteBackend};
13
/// Metadata for one WAL segment that has been uploaded to a remote backend.
///
/// Returned by [`WalArchiver::archive_segment`] and [`archive_change_records`];
/// a [`WalSegmentManifest`] sidecar is derived from it.
#[derive(Debug, Clone)]
pub struct WalSegmentMeta {
    /// Remote object key, formatted as `{prefix}{lsn_start:012}-{lsn_end:012}.wal`.
    pub key: String,
    /// First LSN covered by the segment.
    pub lsn_start: u64,
    /// Last LSN covered by the segment.
    pub lsn_end: u64,
    /// Time the segment was archived, in milliseconds since the UNIX epoch.
    pub created_at: u64,
    /// Size of the uploaded payload in bytes (0 when it could not be determined).
    pub size_bytes: u64,
    /// Hex-encoded SHA-256 of the uploaded payload, when it could be computed.
    pub sha256: Option<String>,
}
35
/// Describes a backup's base snapshot together with the WAL positions recorded
/// alongside it.
///
/// Persisted as a JSON object via [`publish_backup_head`] and read back with
/// [`load_backup_head`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BackupHead {
    /// Timeline identifier; decoded as `"main"` when absent from the JSON.
    pub timeline_id: String,
    /// Remote key of the base snapshot object.
    pub snapshot_key: String,
    /// Identifier of the base snapshot.
    pub snapshot_id: u64,
    /// Snapshot creation time; units are set by the producer
    /// (NOTE(review): presumably epoch milliseconds — confirm).
    pub snapshot_time: u64,
    /// LSN recorded as current when the head was written; decoded as `0` when absent.
    pub current_lsn: u64,
    /// Highest LSN recorded as archived; decoded as `0` when absent.
    pub last_archived_lsn: u64,
    /// Key prefix of the WAL segments; decoded as `"wal/"` when absent.
    pub wal_prefix: String,
}
46
/// JSON sidecar describing one archived snapshot.
///
/// Stored at [`snapshot_manifest_key`] next to the snapshot object.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SnapshotManifest {
    /// Timeline identifier; decoded as `"main"` when absent from the JSON.
    pub timeline_id: String,
    /// Remote key of the snapshot object this manifest describes.
    pub snapshot_key: String,
    /// Identifier of the snapshot.
    pub snapshot_id: u64,
    /// Snapshot creation time; units are set by the producer
    /// (NOTE(review): presumably epoch milliseconds — confirm).
    pub snapshot_time: u64,
    /// LSN the snapshot is based on; decoded as `0` when absent.
    pub base_lsn: u64,
    /// Schema version; decoded as `crate::api::REDDB_FORMAT_VERSION` when absent.
    pub schema_version: u32,
    /// On-disk format version; decoded as `crate::api::REDDB_FORMAT_VERSION` when absent.
    pub format_version: u32,
    /// Hex-encoded SHA-256 of the snapshot file, when recorded.
    pub snapshot_sha256: Option<String>,
}
64
65impl BackupHead {
66 pub fn to_json_value(&self) -> JsonValue {
67 let mut object = Map::new();
68 object.insert(
69 "timeline_id".to_string(),
70 JsonValue::String(self.timeline_id.clone()),
71 );
72 object.insert(
73 "snapshot_key".to_string(),
74 JsonValue::String(self.snapshot_key.clone()),
75 );
76 object.insert(
77 "snapshot_id".to_string(),
78 JsonValue::Number(self.snapshot_id as f64),
79 );
80 object.insert(
81 "snapshot_time".to_string(),
82 JsonValue::Number(self.snapshot_time as f64),
83 );
84 object.insert(
85 "current_lsn".to_string(),
86 JsonValue::Number(self.current_lsn as f64),
87 );
88 object.insert(
89 "last_archived_lsn".to_string(),
90 JsonValue::Number(self.last_archived_lsn as f64),
91 );
92 object.insert(
93 "wal_prefix".to_string(),
94 JsonValue::String(self.wal_prefix.clone()),
95 );
96 JsonValue::Object(object)
97 }
98
99 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
100 Ok(Self {
101 timeline_id: value
102 .get("timeline_id")
103 .and_then(JsonValue::as_str)
104 .unwrap_or("main")
105 .to_string(),
106 snapshot_key: value
107 .get("snapshot_key")
108 .and_then(JsonValue::as_str)
109 .ok_or_else(|| {
110 BackendError::Internal("backup head missing snapshot_key".to_string())
111 })?
112 .to_string(),
113 snapshot_id: value
114 .get("snapshot_id")
115 .and_then(JsonValue::as_u64)
116 .ok_or_else(|| {
117 BackendError::Internal("backup head missing snapshot_id".to_string())
118 })?,
119 snapshot_time: value
120 .get("snapshot_time")
121 .and_then(JsonValue::as_u64)
122 .ok_or_else(|| {
123 BackendError::Internal("backup head missing snapshot_time".to_string())
124 })?,
125 current_lsn: value
126 .get("current_lsn")
127 .and_then(JsonValue::as_u64)
128 .unwrap_or(0),
129 last_archived_lsn: value
130 .get("last_archived_lsn")
131 .and_then(JsonValue::as_u64)
132 .unwrap_or(0),
133 wal_prefix: value
134 .get("wal_prefix")
135 .and_then(JsonValue::as_str)
136 .unwrap_or("wal/")
137 .to_string(),
138 })
139 }
140}
141
142impl SnapshotManifest {
143 pub fn to_json_value(&self) -> JsonValue {
144 let mut object = Map::new();
145 object.insert(
146 "timeline_id".to_string(),
147 JsonValue::String(self.timeline_id.clone()),
148 );
149 object.insert(
150 "snapshot_key".to_string(),
151 JsonValue::String(self.snapshot_key.clone()),
152 );
153 object.insert(
154 "snapshot_id".to_string(),
155 JsonValue::Number(self.snapshot_id as f64),
156 );
157 object.insert(
158 "snapshot_time".to_string(),
159 JsonValue::Number(self.snapshot_time as f64),
160 );
161 object.insert(
162 "base_lsn".to_string(),
163 JsonValue::Number(self.base_lsn as f64),
164 );
165 object.insert(
166 "schema_version".to_string(),
167 JsonValue::Number(self.schema_version as f64),
168 );
169 object.insert(
170 "format_version".to_string(),
171 JsonValue::Number(self.format_version as f64),
172 );
173 if let Some(ref sha) = self.snapshot_sha256 {
174 object.insert(
175 "snapshot_sha256".to_string(),
176 JsonValue::String(sha.clone()),
177 );
178 }
179 JsonValue::Object(object)
180 }
181
182 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
183 Ok(Self {
184 timeline_id: value
185 .get("timeline_id")
186 .and_then(JsonValue::as_str)
187 .unwrap_or("main")
188 .to_string(),
189 snapshot_key: value
190 .get("snapshot_key")
191 .and_then(JsonValue::as_str)
192 .ok_or_else(|| {
193 BackendError::Internal("snapshot manifest missing snapshot_key".to_string())
194 })?
195 .to_string(),
196 snapshot_id: value
197 .get("snapshot_id")
198 .and_then(JsonValue::as_u64)
199 .ok_or_else(|| {
200 BackendError::Internal("snapshot manifest missing snapshot_id".to_string())
201 })?,
202 snapshot_time: value
203 .get("snapshot_time")
204 .and_then(JsonValue::as_u64)
205 .ok_or_else(|| {
206 BackendError::Internal("snapshot manifest missing snapshot_time".to_string())
207 })?,
208 base_lsn: value
209 .get("base_lsn")
210 .and_then(JsonValue::as_u64)
211 .unwrap_or(0),
212 schema_version: value
213 .get("schema_version")
214 .and_then(JsonValue::as_u64)
215 .unwrap_or(crate::api::REDDB_FORMAT_VERSION as u64)
216 as u32,
217 format_version: value
218 .get("format_version")
219 .and_then(JsonValue::as_u64)
220 .unwrap_or(crate::api::REDDB_FORMAT_VERSION as u64)
221 as u32,
222 snapshot_sha256: value
223 .get("snapshot_sha256")
224 .and_then(JsonValue::as_str)
225 .map(|s| s.to_string()),
226 })
227 }
228
229 pub fn compute_snapshot_sha256(snapshot_path: &Path) -> Result<String, BackendError> {
234 sha256_file_hex(snapshot_path)
235 }
236}
237
238pub fn sha256_file_hex(path: &Path) -> Result<String, BackendError> {
242 use std::fs::File;
243 use std::io::Read;
244 let mut hasher = crate::crypto::sha256::Sha256::new();
245 let mut file = File::open(path)
246 .map_err(|err| BackendError::Internal(format!("open file for hash {path:?}: {err}")))?;
247 let mut buf = vec![0u8; 8 * 1024];
248 loop {
249 let n = file
250 .read(&mut buf)
251 .map_err(|err| BackendError::Internal(format!("read file for hash {path:?}: {err}")))?;
252 if n == 0 {
253 break;
254 }
255 hasher.update(&buf[..n]);
256 }
257 Ok(crate::utils::to_hex(&hasher.finalize()))
258}
259
260pub fn sha256_bytes_hex(bytes: &[u8]) -> String {
264 crate::utils::to_hex(&crate::crypto::sha256::sha256(bytes))
265}
266
/// Uploads WAL segments to a [`RemoteBackend`] under a fixed key prefix.
///
/// It also downloads, checks and deletes previously archived segments.
pub struct WalArchiver {
    /// Destination for archived segments and their manifest sidecars.
    backend: Arc<dyn RemoteBackend>,
    /// Prepended verbatim to every segment key (e.g. `"wal/"`).
    prefix: String,
}
272
273impl WalArchiver {
274 pub fn new(backend: Arc<dyn RemoteBackend>, prefix: impl Into<String>) -> Self {
276 Self {
277 backend,
278 prefix: prefix.into(),
279 }
280 }
281
282 pub fn archive_segment(
289 &self,
290 wal_path: &Path,
291 lsn_start: u64,
292 lsn_end: u64,
293 prev_hash: Option<String>,
294 ) -> Result<WalSegmentMeta, BackendError> {
295 let size_bytes = std::fs::metadata(wal_path).map(|m| m.len()).unwrap_or(0);
296
297 let sha = sha256_file_hex(wal_path).ok();
301
302 let key = format!("{}{:012}-{:012}.wal", self.prefix, lsn_start, lsn_end);
303
304 self.backend.upload(wal_path, &key)?;
305
306 let created_at = SystemTime::now()
307 .duration_since(UNIX_EPOCH)
308 .unwrap_or_default()
309 .as_millis() as u64;
310
311 let meta = WalSegmentMeta {
312 key,
313 lsn_start,
314 lsn_end,
315 created_at,
316 size_bytes,
317 sha256: sha,
318 };
319 if let Err(err) = publish_wal_segment_manifest(
320 self.backend.as_ref(),
321 &WalSegmentManifest::from_meta(&meta, prev_hash),
322 ) {
323 tracing::warn!(
324 target: "reddb::backup",
325 error = %err,
326 segment_key = %meta.key,
327 "wal segment manifest publish failed; segment archived without checksum sidecar"
328 );
329 }
330 Ok(meta)
331 }
332
333 pub fn download_segment(&self, segment_key: &str, dest: &Path) -> Result<bool, BackendError> {
335 self.backend.download(segment_key, dest)
336 }
337
338 pub fn cleanup_before(&self, lsn: u64) -> Result<usize, BackendError> {
341 let keys = self.backend.list(&self.prefix)?;
342 let mut deleted = 0usize;
343 for key in keys {
344 let path = PathBuf::from(&key);
345 let Some(file_name) = path.file_name().and_then(|s| s.to_str()) else {
346 continue;
347 };
348 let Some((start, _end)) = file_name
349 .strip_suffix(".wal")
350 .and_then(|base| base.split_once('-'))
351 else {
352 continue;
353 };
354 let Ok(lsn_start) = start.parse::<u64>() else {
355 continue;
356 };
357 if lsn_start < lsn {
358 self.backend.delete(&key)?;
359 deleted += 1;
360 }
361 }
362 Ok(deleted)
363 }
364
365 pub fn segment_exists(&self, segment_key: &str) -> Result<bool, BackendError> {
367 self.backend.exists(segment_key)
368 }
369
370 pub fn backend_name(&self) -> &str {
372 self.backend.name()
373 }
374}
375
376pub fn archive_snapshot(
378 backend: &dyn RemoteBackend,
379 snapshot_path: &Path,
380 snapshot_id: u64,
381 prefix: &str,
382) -> Result<String, BackendError> {
383 let timestamp = SystemTime::now()
384 .duration_since(UNIX_EPOCH)
385 .unwrap_or_default()
386 .as_millis() as u64;
387
388 let key = format!("{}{:012}-{}.snapshot", prefix, snapshot_id, timestamp);
389
390 backend.upload(snapshot_path, &key)?;
391 Ok(key)
392}
393
/// Returns the key of the JSON manifest sidecar stored next to `snapshot_key`.
pub fn snapshot_manifest_key(snapshot_key: &str) -> String {
    [snapshot_key, ".manifest.json"].concat()
}
397
/// JSON sidecar published next to an archived WAL segment.
///
/// The sidecar is stored at [`wal_segment_manifest_key`] and built from a
/// [`WalSegmentMeta`] via [`WalSegmentManifest::from_meta`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalSegmentManifest {
    /// Remote key of the segment this sidecar describes.
    pub key: String,
    /// First LSN covered by the segment; decoded as `0` when absent.
    pub lsn_start: u64,
    /// Last LSN covered by the segment; decoded as `0` when absent.
    pub lsn_end: u64,
    /// Segment size in bytes; decoded as `0` when absent.
    pub size_bytes: u64,
    /// Archive time copied from [`WalSegmentMeta::created_at`]; decoded as `0` when absent.
    pub created_at: u64,
    /// Hex-encoded SHA-256 of the segment payload, when known.
    pub sha256: Option<String>,
    /// Hash supplied by the archiver's caller.
    /// NOTE(review): presumably the previous segment's sha256, forming a hash chain — confirm.
    pub prev_hash: Option<String>,
}
421
422impl WalSegmentManifest {
423 pub fn from_meta(meta: &WalSegmentMeta, prev_hash: Option<String>) -> Self {
424 Self {
425 key: meta.key.clone(),
426 lsn_start: meta.lsn_start,
427 lsn_end: meta.lsn_end,
428 size_bytes: meta.size_bytes,
429 created_at: meta.created_at,
430 sha256: meta.sha256.clone(),
431 prev_hash,
432 }
433 }
434
435 pub fn to_json_value(&self) -> JsonValue {
436 let mut object = Map::new();
437 object.insert("key".to_string(), JsonValue::String(self.key.clone()));
438 object.insert(
439 "lsn_start".to_string(),
440 JsonValue::Number(self.lsn_start as f64),
441 );
442 object.insert(
443 "lsn_end".to_string(),
444 JsonValue::Number(self.lsn_end as f64),
445 );
446 object.insert(
447 "size_bytes".to_string(),
448 JsonValue::Number(self.size_bytes as f64),
449 );
450 object.insert(
451 "created_at".to_string(),
452 JsonValue::Number(self.created_at as f64),
453 );
454 if let Some(sha) = &self.sha256 {
455 object.insert("sha256".to_string(), JsonValue::String(sha.clone()));
456 }
457 if let Some(prev) = &self.prev_hash {
458 object.insert("prev_hash".to_string(), JsonValue::String(prev.clone()));
459 }
460 JsonValue::Object(object)
461 }
462
463 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
464 Ok(Self {
465 key: value
466 .get("key")
467 .and_then(JsonValue::as_str)
468 .ok_or_else(|| {
469 BackendError::Internal("wal segment manifest missing key".to_string())
470 })?
471 .to_string(),
472 lsn_start: value
473 .get("lsn_start")
474 .and_then(JsonValue::as_u64)
475 .unwrap_or(0),
476 lsn_end: value
477 .get("lsn_end")
478 .and_then(JsonValue::as_u64)
479 .unwrap_or(0),
480 size_bytes: value
481 .get("size_bytes")
482 .and_then(JsonValue::as_u64)
483 .unwrap_or(0),
484 created_at: value
485 .get("created_at")
486 .and_then(JsonValue::as_u64)
487 .unwrap_or(0),
488 sha256: value
489 .get("sha256")
490 .and_then(JsonValue::as_str)
491 .map(|s| s.to_string()),
492 prev_hash: value
493 .get("prev_hash")
494 .and_then(JsonValue::as_str)
495 .map(|s| s.to_string()),
496 })
497 }
498}
499
/// Returns the key of the JSON manifest sidecar stored next to `segment_key`.
pub fn wal_segment_manifest_key(segment_key: &str) -> String {
    const SUFFIX: &str = ".manifest.json";
    let mut sidecar = String::with_capacity(segment_key.len() + SUFFIX.len());
    sidecar.push_str(segment_key);
    sidecar.push_str(SUFFIX);
    sidecar
}
503
/// Aggregated catalogue of the snapshots and WAL segments under one backup root.
///
/// Published as `MANIFEST.json` (see [`unified_manifest_key`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedManifest {
    /// Manifest schema version; [`UnifiedManifest::new`] sets it to [`UnifiedManifest::VERSION`].
    pub version: String,
    /// Crate version (`CARGO_PKG_VERSION`) that wrote the manifest; `"unknown"` when absent.
    pub engine_version: String,
    /// Highest LSN seen: the max of every snapshot's `lsn` and every WAL segment's
    /// `lsn_end`, as computed by [`UnifiedManifest::new`].
    pub latest_lsn: u64,
    /// Snapshot entries. [`publish_unified_manifest_for_prefix`] lists them
    /// newest first by `ts`.
    pub snapshots: Vec<UnifiedSnapshotEntry>,
    /// WAL segment entries. [`publish_unified_manifest_for_prefix`] lists them
    /// in ascending `lsn_start` order.
    pub wal_segments: Vec<UnifiedWalEntry>,
}
528
/// One snapshot listed in a [`UnifiedManifest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedSnapshotEntry {
    /// Snapshot identifier.
    pub id: u64,
    /// Base LSN of the snapshot.
    pub lsn: u64,
    /// Snapshot timestamp (copied from `SnapshotManifest::snapshot_time`).
    pub ts: u64,
    /// Size in bytes.
    /// NOTE(review): `collect_unified_snapshots` currently always writes 0.
    pub bytes: u64,
    /// Remote key of the snapshot object.
    pub key: String,
    /// Hex SHA-256 without prefix; serialised in JSON as `"sha256:<hex>"`.
    pub checksum: Option<String>,
}
538
/// One WAL segment listed in a [`UnifiedManifest`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnifiedWalEntry {
    /// First LSN covered by the segment.
    pub lsn_start: u64,
    /// Last LSN covered by the segment.
    pub lsn_end: u64,
    /// Remote key of the segment object.
    pub key: String,
    /// Segment size in bytes, as recorded in its sidecar.
    pub bytes: u64,
    /// Hex SHA-256 without prefix; serialised in JSON as `"sha256:<hex>"`.
    pub checksum: Option<String>,
    /// Hash taken from the segment sidecar's `prev_hash`; serialised with a `"sha256:"` prefix.
    pub prev_hash: Option<String>,
}
552
553impl UnifiedManifest {
554 pub const VERSION: &'static str = "1.0";
555
556 pub fn new(snapshots: Vec<UnifiedSnapshotEntry>, wal_segments: Vec<UnifiedWalEntry>) -> Self {
557 let latest_lsn = wal_segments
558 .iter()
559 .map(|w| w.lsn_end)
560 .chain(snapshots.iter().map(|s| s.lsn))
561 .max()
562 .unwrap_or(0);
563 Self {
564 version: Self::VERSION.to_string(),
565 engine_version: env!("CARGO_PKG_VERSION").to_string(),
566 latest_lsn,
567 snapshots,
568 wal_segments,
569 }
570 }
571
572 pub fn to_json_value(&self) -> JsonValue {
573 let mut obj = Map::new();
574 obj.insert(
575 "version".to_string(),
576 JsonValue::String(self.version.clone()),
577 );
578 obj.insert(
579 "engine_version".to_string(),
580 JsonValue::String(self.engine_version.clone()),
581 );
582 obj.insert(
583 "latest_lsn".to_string(),
584 JsonValue::Number(self.latest_lsn as f64),
585 );
586 obj.insert(
587 "snapshots".to_string(),
588 JsonValue::Array(
589 self.snapshots
590 .iter()
591 .map(UnifiedSnapshotEntry::to_json_value)
592 .collect(),
593 ),
594 );
595 obj.insert(
596 "wal_segments".to_string(),
597 JsonValue::Array(
598 self.wal_segments
599 .iter()
600 .map(UnifiedWalEntry::to_json_value)
601 .collect(),
602 ),
603 );
604 JsonValue::Object(obj)
605 }
606
607 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
608 let obj = value.as_object().ok_or_else(|| {
609 BackendError::Internal("unified manifest must be a JSON object".to_string())
610 })?;
611 Ok(Self {
612 version: obj
613 .get("version")
614 .and_then(JsonValue::as_str)
615 .unwrap_or("1.0")
616 .to_string(),
617 engine_version: obj
618 .get("engine_version")
619 .and_then(JsonValue::as_str)
620 .unwrap_or("unknown")
621 .to_string(),
622 latest_lsn: obj
623 .get("latest_lsn")
624 .and_then(JsonValue::as_u64)
625 .unwrap_or(0),
626 snapshots: obj
627 .get("snapshots")
628 .and_then(JsonValue::as_array)
629 .map(|arr| {
630 arr.iter()
631 .filter_map(|v| UnifiedSnapshotEntry::from_json_value(v).ok())
632 .collect()
633 })
634 .unwrap_or_default(),
635 wal_segments: obj
636 .get("wal_segments")
637 .and_then(JsonValue::as_array)
638 .map(|arr| {
639 arr.iter()
640 .filter_map(|v| UnifiedWalEntry::from_json_value(v).ok())
641 .collect()
642 })
643 .unwrap_or_default(),
644 })
645 }
646}
647
648impl UnifiedSnapshotEntry {
649 pub fn to_json_value(&self) -> JsonValue {
650 let mut obj = Map::new();
651 obj.insert("id".to_string(), JsonValue::Number(self.id as f64));
652 obj.insert("lsn".to_string(), JsonValue::Number(self.lsn as f64));
653 obj.insert("ts".to_string(), JsonValue::Number(self.ts as f64));
654 obj.insert("bytes".to_string(), JsonValue::Number(self.bytes as f64));
655 obj.insert("key".to_string(), JsonValue::String(self.key.clone()));
656 if let Some(c) = &self.checksum {
657 obj.insert(
658 "checksum".to_string(),
659 JsonValue::String(format!("sha256:{c}")),
660 );
661 }
662 JsonValue::Object(obj)
663 }
664
665 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
666 let obj = value.as_object().ok_or_else(|| {
667 BackendError::Internal("snapshot entry must be a JSON object".to_string())
668 })?;
669 Ok(Self {
670 id: obj.get("id").and_then(JsonValue::as_u64).unwrap_or(0),
671 lsn: obj.get("lsn").and_then(JsonValue::as_u64).unwrap_or(0),
672 ts: obj.get("ts").and_then(JsonValue::as_u64).unwrap_or(0),
673 bytes: obj.get("bytes").and_then(JsonValue::as_u64).unwrap_or(0),
674 key: obj
675 .get("key")
676 .and_then(JsonValue::as_str)
677 .ok_or_else(|| BackendError::Internal("snapshot entry missing key".to_string()))?
678 .to_string(),
679 checksum: obj
680 .get("checksum")
681 .and_then(JsonValue::as_str)
682 .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
683 })
684 }
685}
686
687impl UnifiedWalEntry {
688 pub fn to_json_value(&self) -> JsonValue {
689 let mut obj = Map::new();
690 obj.insert(
691 "lsn_start".to_string(),
692 JsonValue::Number(self.lsn_start as f64),
693 );
694 obj.insert(
695 "lsn_end".to_string(),
696 JsonValue::Number(self.lsn_end as f64),
697 );
698 obj.insert("key".to_string(), JsonValue::String(self.key.clone()));
699 obj.insert("bytes".to_string(), JsonValue::Number(self.bytes as f64));
700 if let Some(c) = &self.checksum {
701 obj.insert(
702 "checksum".to_string(),
703 JsonValue::String(format!("sha256:{c}")),
704 );
705 }
706 if let Some(p) = &self.prev_hash {
707 obj.insert(
708 "prev_hash".to_string(),
709 JsonValue::String(format!("sha256:{p}")),
710 );
711 }
712 JsonValue::Object(obj)
713 }
714
715 pub fn from_json_value(value: &JsonValue) -> Result<Self, BackendError> {
716 let obj = value.as_object().ok_or_else(|| {
717 BackendError::Internal("wal segment entry must be a JSON object".to_string())
718 })?;
719 Ok(Self {
720 lsn_start: obj
721 .get("lsn_start")
722 .and_then(JsonValue::as_u64)
723 .unwrap_or(0),
724 lsn_end: obj.get("lsn_end").and_then(JsonValue::as_u64).unwrap_or(0),
725 key: obj
726 .get("key")
727 .and_then(JsonValue::as_str)
728 .ok_or_else(|| BackendError::Internal("wal segment entry missing key".to_string()))?
729 .to_string(),
730 bytes: obj.get("bytes").and_then(JsonValue::as_u64).unwrap_or(0),
731 checksum: obj
732 .get("checksum")
733 .and_then(JsonValue::as_str)
734 .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
735 prev_hash: obj
736 .get("prev_hash")
737 .and_then(JsonValue::as_str)
738 .map(|s| s.strip_prefix("sha256:").unwrap_or(s).to_string()),
739 })
740 }
741}
742
/// Returns the key of the unified `MANIFEST.json` for a backup root `prefix`.
///
/// Trailing slashes on `prefix` are ignored; an empty root yields a bare
/// `MANIFEST.json`.
pub fn unified_manifest_key(prefix: &str) -> String {
    match prefix.trim_end_matches('/') {
        "" => String::from("MANIFEST.json"),
        root => format!("{root}/MANIFEST.json"),
    }
}
751
752pub fn publish_unified_manifest(
761 backend: &dyn RemoteBackend,
762 prefix: &str,
763 manifest: &UnifiedManifest,
764) -> Result<String, BackendError> {
765 let key = unified_manifest_key(prefix);
766 write_json_object(backend, &key, &manifest.to_json_value())?;
767 Ok(key)
768}
769
770pub fn load_unified_manifest(
771 backend: &dyn RemoteBackend,
772 prefix: &str,
773) -> Result<Option<UnifiedManifest>, BackendError> {
774 let key = unified_manifest_key(prefix);
775 let Some(value) = read_json_object(backend, &key)? else {
776 return Ok(None);
777 };
778 Ok(Some(UnifiedManifest::from_json_value(&value)?))
779}
780
781pub fn publish_unified_manifest_for_prefix(
792 backend: &dyn RemoteBackend,
793 snapshot_prefix: &str,
794) -> Result<String, BackendError> {
795 let root = derive_backup_root(snapshot_prefix);
796 let snapshots = collect_unified_snapshots(backend, snapshot_prefix)?;
797 let wal_root = format!("{}wal/", root);
798 let wal_segments = collect_unified_wal_segments(backend, &wal_root)?;
799 let manifest = UnifiedManifest::new(snapshots, wal_segments);
800 publish_unified_manifest(backend, &root, &manifest)
801}
802
/// Maps a snapshot prefix to its backup root, with a trailing `/` unless empty.
///
/// Everything before the last `"/snapshots"` occurrence is the root. A bare
/// `"snapshots"` or an empty prefix maps to the empty root. Any other prefix
/// is itself treated as the root.
fn derive_backup_root(snapshot_prefix: &str) -> String {
    let trimmed = snapshot_prefix.trim_end_matches('/');
    let head = match trimmed.rfind("/snapshots") {
        Some(idx) => &trimmed[..idx],
        None if trimmed == "snapshots" => "",
        None => trimmed,
    };
    if head.is_empty() {
        String::new()
    } else {
        format!("{head}/")
    }
}
822
823fn collect_unified_snapshots(
824 backend: &dyn RemoteBackend,
825 snapshot_prefix: &str,
826) -> Result<Vec<UnifiedSnapshotEntry>, BackendError> {
827 let keys = backend.list(snapshot_prefix)?;
828 let mut out = Vec::new();
829 for key in keys {
830 if key.ends_with(".manifest.json") {
833 continue;
834 }
835 let Some(manifest) = load_snapshot_manifest(backend, &key)? else {
836 continue;
837 };
838 out.push(UnifiedSnapshotEntry {
839 id: manifest.snapshot_id,
840 lsn: manifest.base_lsn,
841 ts: manifest.snapshot_time,
842 bytes: 0,
843 key: manifest.snapshot_key.clone(),
844 checksum: manifest.snapshot_sha256.clone(),
845 });
846 }
847 out.sort_by_key(|s| std::cmp::Reverse(s.ts));
848 Ok(out)
849}
850
851fn collect_unified_wal_segments(
852 backend: &dyn RemoteBackend,
853 wal_prefix: &str,
854) -> Result<Vec<UnifiedWalEntry>, BackendError> {
855 let keys = backend.list(wal_prefix)?;
856 let mut out = Vec::new();
857 for key in keys {
858 if key.ends_with(".manifest.json") {
859 continue;
860 }
861 if !key.ends_with(".wal") {
862 continue;
863 }
864 let Some(manifest) = load_wal_segment_manifest(backend, &key)? else {
865 continue;
866 };
867 out.push(UnifiedWalEntry {
868 lsn_start: manifest.lsn_start,
869 lsn_end: manifest.lsn_end,
870 key: manifest.key.clone(),
871 bytes: manifest.size_bytes,
872 checksum: manifest.sha256.clone(),
873 prev_hash: manifest.prev_hash.clone(),
874 });
875 }
876 out.sort_by_key(|w| w.lsn_start);
877 Ok(out)
878}
879
880pub fn publish_wal_segment_manifest(
881 backend: &dyn RemoteBackend,
882 manifest: &WalSegmentManifest,
883) -> Result<String, BackendError> {
884 let key = wal_segment_manifest_key(&manifest.key);
885 write_json_object(backend, &key, &manifest.to_json_value())?;
886 Ok(key)
887}
888
889pub fn load_wal_segment_manifest(
890 backend: &dyn RemoteBackend,
891 segment_key: &str,
892) -> Result<Option<WalSegmentManifest>, BackendError> {
893 let key = wal_segment_manifest_key(segment_key);
894 let Some(value) = read_json_object(backend, &key)? else {
895 return Ok(None);
896 };
897 Ok(Some(WalSegmentManifest::from_json_value(&value)?))
898}
899
900pub fn publish_backup_head(
901 backend: &dyn RemoteBackend,
902 head_key: &str,
903 head: &BackupHead,
904) -> Result<(), BackendError> {
905 write_json_object(backend, head_key, &head.to_json_value())
906}
907
908pub fn load_backup_head(
909 backend: &dyn RemoteBackend,
910 head_key: &str,
911) -> Result<Option<BackupHead>, BackendError> {
912 let Some(value) = read_json_object(backend, head_key)? else {
913 return Ok(None);
914 };
915 Ok(Some(BackupHead::from_json_value(&value)?))
916}
917
918pub fn publish_snapshot_manifest(
919 backend: &dyn RemoteBackend,
920 manifest: &SnapshotManifest,
921) -> Result<String, BackendError> {
922 let key = snapshot_manifest_key(&manifest.snapshot_key);
923 write_json_object(backend, &key, &manifest.to_json_value())?;
924 Ok(key)
925}
926
927pub fn load_snapshot_manifest(
928 backend: &dyn RemoteBackend,
929 snapshot_key: &str,
930) -> Result<Option<SnapshotManifest>, BackendError> {
931 let key = snapshot_manifest_key(snapshot_key);
932 let Some(value) = read_json_object(backend, &key)? else {
933 return Ok(None);
934 };
935 Ok(Some(SnapshotManifest::from_json_value(&value)?))
936}
937
938pub fn archive_change_records(
939 backend: &dyn RemoteBackend,
940 prefix: &str,
941 records: &[(u64, Vec<u8>)],
942 prev_hash: Option<String>,
943) -> Result<Option<WalSegmentMeta>, BackendError> {
944 let Some((lsn_start, _)) = records.first() else {
945 return Ok(None);
946 };
947 let Some((lsn_end, _)) = records.last() else {
948 return Ok(None);
949 };
950
951 let payload = JsonValue::Array(
952 records
953 .iter()
954 .map(|(lsn, bytes)| {
955 let mut object = Map::new();
956 object.insert("lsn".to_string(), JsonValue::Number(*lsn as f64));
957 object.insert("data".to_string(), JsonValue::String(hex::encode(bytes)));
958 JsonValue::Object(object)
959 })
960 .collect(),
961 );
962 let body = crate::json::to_vec(&payload).map_err(|err| {
963 BackendError::Internal(format!("encode archived logical wal failed: {err}"))
964 })?;
965 let sha = sha256_bytes_hex(&body);
968
969 let temp = temp_json_path(
970 "reddb-archived-change-records",
971 Some(*lsn_start),
972 Some(*lsn_end),
973 );
974 std::fs::write(&temp, &body)
975 .map_err(|err| BackendError::Transport(format!("write temp logical wal failed: {err}")))?;
976
977 let key = format!("{}{:012}-{:012}.wal", prefix, lsn_start, lsn_end);
978 backend.upload(&temp, &key)?;
979 let size_bytes = std::fs::metadata(&temp).map(|meta| meta.len()).unwrap_or(0);
980 let _ = std::fs::remove_file(&temp);
981
982 let meta = WalSegmentMeta {
983 key,
984 lsn_start: *lsn_start,
985 lsn_end: *lsn_end,
986 created_at: SystemTime::now()
987 .duration_since(UNIX_EPOCH)
988 .unwrap_or_default()
989 .as_millis() as u64,
990 size_bytes,
991 sha256: Some(sha),
992 };
993
994 if let Err(err) =
999 publish_wal_segment_manifest(backend, &WalSegmentManifest::from_meta(&meta, prev_hash))
1000 {
1001 tracing::warn!(
1002 target: "reddb::backup",
1003 error = %err,
1004 segment_key = %meta.key,
1005 "wal segment manifest publish failed; segment archived without checksum sidecar"
1006 );
1007 }
1008
1009 Ok(Some(meta))
1010}
1011
1012pub fn load_archived_change_records(
1013 backend: &dyn RemoteBackend,
1014 segment_key: &str,
1015) -> Result<Vec<ChangeRecord>, BackendError> {
1016 let (records, _digest) = load_archived_change_records_with_sha256(backend, segment_key)?;
1017 Ok(records)
1018}
1019
1020pub fn load_archived_change_records_with_sha256(
1025 backend: &dyn RemoteBackend,
1026 segment_key: &str,
1027) -> Result<(Vec<ChangeRecord>, Option<String>), BackendError> {
1028 let temp = temp_json_path("reddb-archived-change-records-read", None, None);
1029 let found = backend.download(segment_key, &temp)?;
1030 if !found {
1031 let _ = std::fs::remove_file(&temp);
1032 return Ok((Vec::new(), None));
1033 }
1034 let bytes = std::fs::read(&temp)
1035 .map_err(|err| BackendError::Transport(format!("read temp logical wal failed: {err}")))?;
1036 let _ = std::fs::remove_file(&temp);
1037 let digest = sha256_bytes_hex(&bytes);
1038
1039 let value: JsonValue = crate::json::from_slice(&bytes).map_err(|err| {
1040 BackendError::Internal(format!("decode archived logical wal failed: {err}"))
1041 })?;
1042 let Some(entries) = value.as_array() else {
1043 return Err(BackendError::Internal(
1044 "archived logical wal must be a JSON array".to_string(),
1045 ));
1046 };
1047 let mut out = Vec::new();
1048 for entry in entries {
1049 let Some(data_hex) = entry.get("data").and_then(JsonValue::as_str) else {
1050 continue;
1051 };
1052 let data = hex::decode(data_hex).map_err(|err| {
1053 BackendError::Internal(format!("decode wal record hex failed: {err}"))
1054 })?;
1055 let record = ChangeRecord::decode(&data)
1056 .map_err(|err| BackendError::Internal(format!("decode wal record failed: {err}")))?;
1057 out.push(record);
1058 }
1059 Ok((out, Some(digest)))
1060}
1061
1062fn write_json_object(
1063 backend: &dyn RemoteBackend,
1064 key: &str,
1065 value: &JsonValue,
1066) -> Result<(), BackendError> {
1067 let temp = temp_json_path("reddb-json-object", None, None);
1068 std::fs::write(
1069 &temp,
1070 crate::json::to_vec(value)
1071 .map_err(|err| BackendError::Internal(format!("encode json object failed: {err}")))?,
1072 )
1073 .map_err(|err| BackendError::Transport(format!("write temp json object failed: {err}")))?;
1074 let upload_result = backend.upload(&temp, key);
1075 let _ = std::fs::remove_file(&temp);
1076 upload_result
1077}
1078
1079fn read_json_object(
1080 backend: &dyn RemoteBackend,
1081 key: &str,
1082) -> Result<Option<JsonValue>, BackendError> {
1083 let temp = temp_json_path("reddb-json-object-read", None, None);
1084 let found = backend.download(key, &temp)?;
1085 if !found {
1086 return Ok(None);
1087 }
1088 let bytes = std::fs::read(&temp)
1089 .map_err(|err| BackendError::Transport(format!("read temp json object failed: {err}")))?;
1090 let _ = std::fs::remove_file(&temp);
1091 let value = crate::json::from_slice::<JsonValue>(&bytes)
1092 .map_err(|err| BackendError::Internal(format!("decode json object failed: {err}")))?;
1093 Ok(Some(value))
1094}
1095
/// Returns a unique staging path in the system temp directory.
///
/// The file name is `{prefix}-{pid}[-{start}-{end}]-{nanos}-{seq}.json`. The
/// `-{start}-{end}` part appears only when both bounds are given.
///
/// `seq` is a process-wide counter. pid plus wall-clock nanoseconds alone can
/// repeat when two threads request a path within the clock's resolution, which
/// would make concurrent uploads/downloads clobber each other's temp files.
fn temp_json_path(prefix: &str, start: Option<u64>, end: Option<u64>) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    static SEQUENCE: AtomicU64 = AtomicU64::new(0);
    // Only uniqueness matters here, so Relaxed ordering is sufficient.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    let suffix = match (start, end) {
        (Some(start), Some(end)) => format!("-{start}-{end}"),
        _ => String::new(),
    };
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    std::env::temp_dir().join(format!(
        "{prefix}-{}{}-{}-{}.json",
        std::process::id(),
        suffix,
        nanos,
        seq
    ))
}
1111
1112#[cfg(test)]
1113mod tests {
1114 use super::*;
1115 use crate::storage::backend::local::LocalBackend;
1116 use std::io::Write;
1117
1118 #[test]
1119 fn test_archive_and_download() {
1120 let temp_dir = std::env::temp_dir().join("reddb_archiver_test");
1121 let _ = std::fs::create_dir_all(&temp_dir);
1122 let backend_dir = temp_dir.join("backend");
1123 let _ = std::fs::create_dir_all(&backend_dir);
1124
1125 let backend = Arc::new(LocalBackend);
1126 let archiver = WalArchiver::new(backend, "wal/");
1127
1128 let wal_path = temp_dir.join("test.wal");
1130 {
1131 let mut f = std::fs::File::create(&wal_path).unwrap();
1132 f.write_all(b"fake wal data").unwrap();
1133 }
1134
1135 let meta = archiver.archive_segment(&wal_path, 8, 500, None).unwrap();
1137 assert_eq!(meta.lsn_start, 8);
1138 assert_eq!(meta.lsn_end, 500);
1139 assert!(meta.key.starts_with("wal/"));
1140 assert!(meta.key.ends_with(".wal"));
1141
1142 let dest = temp_dir.join("downloaded.wal");
1144 let found = archiver.download_segment(&meta.key, &dest).unwrap();
1145 assert!(found);
1146 assert!(dest.exists());
1147
1148 let _ = std::fs::remove_dir_all(&temp_dir);
1150 }
1151
1152 #[test]
1153 fn test_backup_head_roundtrip() {
1154 let temp_dir = std::env::temp_dir().join("reddb_backup_head_test");
1155 let _ = std::fs::create_dir_all(&temp_dir);
1156 let backend = LocalBackend;
1157 let head_key = temp_dir.join("manifests").join("head.json");
1158
1159 let head = BackupHead {
1160 timeline_id: "main".to_string(),
1161 snapshot_key: "snapshots/000001-123.snapshot".to_string(),
1162 snapshot_id: 1,
1163 snapshot_time: 123,
1164 current_lsn: 456,
1165 last_archived_lsn: 456,
1166 wal_prefix: "wal/".to_string(),
1167 };
1168
1169 publish_backup_head(&backend, &head_key.to_string_lossy(), &head).unwrap();
1170 let loaded = load_backup_head(&backend, &head_key.to_string_lossy())
1171 .unwrap()
1172 .expect("head");
1173 assert_eq!(loaded, head);
1174
1175 let _ = std::fs::remove_dir_all(&temp_dir);
1176 }
1177
1178 #[test]
1179 fn test_snapshot_manifest_roundtrip() {
1180 let temp_dir = std::env::temp_dir().join("reddb_snapshot_manifest_test");
1181 let _ = std::fs::create_dir_all(&temp_dir);
1182 let backend = LocalBackend;
1183 let manifest = SnapshotManifest {
1184 timeline_id: "main".to_string(),
1185 snapshot_key: temp_dir
1186 .join("snapshots")
1187 .join("000001-123.snapshot")
1188 .to_string_lossy()
1189 .to_string(),
1190 snapshot_id: 1,
1191 snapshot_time: 123,
1192 base_lsn: 456,
1193 schema_version: crate::api::REDDB_FORMAT_VERSION,
1194 format_version: crate::api::REDDB_FORMAT_VERSION,
1195 snapshot_sha256: None,
1196 };
1197
1198 publish_snapshot_manifest(&backend, &manifest).unwrap();
1199 let loaded = load_snapshot_manifest(&backend, &manifest.snapshot_key)
1200 .unwrap()
1201 .expect("manifest");
1202 assert_eq!(loaded, manifest);
1203
1204 let _ = std::fs::remove_dir_all(&temp_dir);
1205 }
1206
1207 #[test]
1208 fn test_archived_change_records_roundtrip() {
1209 let temp_dir = std::env::temp_dir().join("reddb_archived_change_records_test");
1210 let _ = std::fs::remove_dir_all(&temp_dir);
1211 std::fs::create_dir_all(&temp_dir).unwrap();
1212 let backend = LocalBackend;
1213 let prefix = format!("{}/wal/", temp_dir.to_string_lossy());
1214 let record = ChangeRecord {
1215 lsn: 7,
1216 timestamp: 1234,
1217 operation: crate::replication::cdc::ChangeOperation::Insert,
1218 collection: "users".to_string(),
1219 entity_id: 42,
1220 entity_kind: "row".to_string(),
1221 entity_bytes: Some(vec![1, 2, 3]),
1222 metadata: None,
1223 };
1224
1225 let meta =
1226 archive_change_records(&backend, &prefix, &[(record.lsn, record.encode())], None)
1227 .unwrap()
1228 .expect("meta");
1229 let loaded = load_archived_change_records(&backend, &meta.key).unwrap();
1230 assert_eq!(loaded.len(), 1);
1231 assert_eq!(loaded[0].lsn, 7);
1232
1233 let _ = std::fs::remove_dir_all(&temp_dir);
1234 }
1235
1236 #[test]
1237 fn archive_change_records_writes_sidecar_with_sha256() {
1238 let temp_dir =
1239 std::env::temp_dir().join(format!("reddb_archiver_sidecar_{}", std::process::id()));
1240 let _ = std::fs::remove_dir_all(&temp_dir);
1241 std::fs::create_dir_all(&temp_dir).unwrap();
1242 let backend = LocalBackend;
1243 let prefix = format!("{}/wal/", temp_dir.to_string_lossy());
1244 let record = ChangeRecord {
1245 lsn: 11,
1246 timestamp: 99,
1247 operation: crate::replication::cdc::ChangeOperation::Insert,
1248 collection: "users".to_string(),
1249 entity_id: 1,
1250 entity_kind: "row".to_string(),
1251 entity_bytes: Some(b"x".to_vec()),
1252 metadata: None,
1253 };
1254 let meta =
1255 archive_change_records(&backend, &prefix, &[(record.lsn, record.encode())], None)
1256 .unwrap()
1257 .expect("meta");
1258 assert!(meta.sha256.is_some(), "WalSegmentMeta should carry sha256");
1259
1260 let sidecar = load_wal_segment_manifest(&backend, &meta.key)
1261 .unwrap()
1262 .expect("sidecar");
1263 assert_eq!(sidecar.key, meta.key);
1264 assert_eq!(sidecar.lsn_start, meta.lsn_start);
1265 assert_eq!(sidecar.lsn_end, meta.lsn_end);
1266 assert_eq!(sidecar.sha256, meta.sha256);
1267
1268 let (_records, computed) =
1269 load_archived_change_records_with_sha256(&backend, &meta.key).unwrap();
1270 assert_eq!(computed, meta.sha256, "computed sha must match sidecar");
1271
1272 let _ = std::fs::remove_dir_all(&temp_dir);
1273 }
1274
1275 #[test]
1276 fn unified_manifest_json_roundtrip() {
1277 let manifest = UnifiedManifest::new(
1278 vec![UnifiedSnapshotEntry {
1279 id: 7,
1280 lsn: 100,
1281 ts: 1730000000000,
1282 bytes: 4096,
1283 key: "snapshots/000007-1730000000000.snapshot".to_string(),
1284 checksum: Some("9f8b".to_string()),
1285 }],
1286 vec![UnifiedWalEntry {
1287 lsn_start: 100,
1288 lsn_end: 250,
1289 key: "wal/000000000100-000000000250.wal".to_string(),
1290 bytes: 1024,
1291 checksum: Some("c1d2".to_string()),
1292 prev_hash: Some("9f8b".to_string()),
1293 }],
1294 );
1295
1296 let json = manifest.to_json_value();
1297 let parsed = UnifiedManifest::from_json_value(&json).unwrap();
1298 assert_eq!(parsed, manifest);
1299 assert_eq!(parsed.latest_lsn, 250);
1300
1301 assert_eq!(parsed.wal_segments[0].prev_hash.as_deref(), Some("9f8b"));
1305 let wal_wire = parsed.wal_segments[0].to_json_value().to_string_compact();
1306 assert!(
1307 wal_wire.contains("\"prev_hash\":\"sha256:9f8b\""),
1308 "wire form must include sha256: prefix on prev_hash; got: {wal_wire}"
1309 );
1310
1311 let body = json.to_string_compact();
1314 assert!(
1315 body.contains("\"sha256:9f8b\""),
1316 "wire form must include sha256: prefix"
1317 );
1318 assert_eq!(parsed.snapshots[0].checksum.as_deref(), Some("9f8b"));
1319 }
1320
1321 #[test]
1322 fn unified_manifest_publish_load_roundtrip() {
1323 let temp_dir =
1324 std::env::temp_dir().join(format!("reddb_unified_manifest_{}", std::process::id()));
1325 let _ = std::fs::remove_dir_all(&temp_dir);
1326 std::fs::create_dir_all(&temp_dir).unwrap();
1327 let prefix = temp_dir.to_string_lossy().to_string();
1328
1329 let backend = LocalBackend;
1330 let manifest = UnifiedManifest::new(vec![], vec![]);
1331 publish_unified_manifest(&backend, &prefix, &manifest).unwrap();
1332 let loaded = load_unified_manifest(&backend, &prefix)
1333 .unwrap()
1334 .expect("manifest");
1335 assert_eq!(loaded.version, "1.0");
1336 assert_eq!(loaded.latest_lsn, 0);
1337
1338 let _ = std::fs::remove_dir_all(&temp_dir);
1339 }
1340
1341 #[test]
1342 fn archive_change_records_chains_prev_hash() {
1343 let temp_dir =
1344 std::env::temp_dir().join(format!("reddb_archive_chain_{}", std::process::id()));
1345 let _ = std::fs::remove_dir_all(&temp_dir);
1346 std::fs::create_dir_all(&temp_dir).unwrap();
1347 let backend = LocalBackend;
1348 let prefix = format!("{}/wal/", temp_dir.to_string_lossy());
1349
1350 let mk = |lsn: u64| ChangeRecord {
1351 lsn,
1352 timestamp: lsn * 1000,
1353 operation: crate::replication::cdc::ChangeOperation::Insert,
1354 collection: "users".to_string(),
1355 entity_id: lsn,
1356 entity_kind: "row".to_string(),
1357 entity_bytes: Some(format!("payload-{lsn}").into_bytes()),
1358 metadata: None,
1359 };
1360
1361 let r1 = mk(10);
1362 let m1 = archive_change_records(&backend, &prefix, &[(r1.lsn, r1.encode())], None)
1363 .unwrap()
1364 .expect("seg 1");
1365 let r2 = mk(11);
1366 let m2 = archive_change_records(
1367 &backend,
1368 &prefix,
1369 &[(r2.lsn, r2.encode())],
1370 m1.sha256.clone(),
1371 )
1372 .unwrap()
1373 .expect("seg 2");
1374 let r3 = mk(12);
1375 let m3 = archive_change_records(
1376 &backend,
1377 &prefix,
1378 &[(r3.lsn, r3.encode())],
1379 m2.sha256.clone(),
1380 )
1381 .unwrap()
1382 .expect("seg 3");
1383
1384 let s1 = load_wal_segment_manifest(&backend, &m1.key)
1385 .unwrap()
1386 .unwrap();
1387 let s2 = load_wal_segment_manifest(&backend, &m2.key)
1388 .unwrap()
1389 .unwrap();
1390 let s3 = load_wal_segment_manifest(&backend, &m3.key)
1391 .unwrap()
1392 .unwrap();
1393 assert!(s1.prev_hash.is_none(), "first segment has no prev_hash");
1394 assert_eq!(s2.prev_hash, m1.sha256, "seg 2 links to seg 1 sha256");
1395 assert_eq!(s3.prev_hash, m2.sha256, "seg 3 links to seg 2 sha256");
1396
1397 let _ = std::fs::remove_dir_all(&temp_dir);
1398 }
1399
1400 #[test]
1401 fn wal_segment_manifest_carries_prev_hash_through_json() {
1402 let m = WalSegmentManifest {
1403 key: "wal/000000000010-000000000010.wal".to_string(),
1404 lsn_start: 10,
1405 lsn_end: 10,
1406 size_bytes: 128,
1407 created_at: 1730000000000,
1408 sha256: Some("abc".to_string()),
1409 prev_hash: Some("def".to_string()),
1410 };
1411 let parsed = WalSegmentManifest::from_json_value(&m.to_json_value()).unwrap();
1412 assert_eq!(parsed, m);
1413 }
1414
1415 #[test]
1416 fn derive_backup_root_handles_typical_prefixes() {
1417 assert_eq!(derive_backup_root(""), "");
1418 assert_eq!(derive_backup_root("snapshots/"), "");
1419 assert_eq!(derive_backup_root("snapshots"), "");
1420 assert_eq!(
1421 derive_backup_root("clusters/dev/snapshots/"),
1422 "clusters/dev/"
1423 );
1424 assert_eq!(
1425 derive_backup_root("clusters/dev/snapshots"),
1426 "clusters/dev/"
1427 );
1428 assert_eq!(derive_backup_root("clusters/dev/"), "clusters/dev/");
1429 }
1430}