reddb_server/runtime/impl_backup.rs
1use super::*;
2
3impl RedDBRuntime {
4 /// Get backup scheduler status.
5 pub fn backup_status(&self) -> crate::replication::scheduler::BackupStatus {
6 self.inner.backup_scheduler.status()
7 }
8
9 /// Borrow the runtime's result Blob Cache.
10 ///
11 /// Wired for the `/admin/blob_cache/sweep` and
12 /// `/admin/blob_cache/flush_namespace` HTTP handlers (issue #148
13 /// follow-up): both delegate to
14 /// `crate::storage::cache::sweeper::BlobCacheSweeper`, which takes a
15 /// `&BlobCache`. Also used by `trigger_backup` when
16 /// `red.config.backup.include_blob_cache=true` to locate the L2
17 /// directory for archival.
18 pub fn result_blob_cache(&self) -> &crate::storage::cache::BlobCache {
19 &self.inner.result_blob_cache
20 }
21
22 /// Current local LSN paired with the LSN of the most recently
23 /// archived WAL segment. The difference is the replication /
24 /// archive lag operators alert on (PLAN.md Phase 5.1). Returns
25 /// `(0, 0)` when neither replication nor archiving is configured.
26 pub fn wal_archive_progress(&self) -> (u64, u64) {
27 let current_lsn = self
28 .inner
29 .db
30 .replication
31 .as_ref()
32 .map(|repl| {
33 repl.logical_wal_spool
34 .as_ref()
35 .map(|spool| spool.current_lsn())
36 .unwrap_or_else(|| repl.wal_buffer.current_lsn())
37 })
38 .unwrap_or_else(|| self.inner.cdc.current_lsn());
39 let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
40 (current_lsn, last_archived_lsn)
41 }
42
43 /// Trigger an immediate backup.
44 pub fn trigger_backup(&self) -> RedDBResult<crate::replication::scheduler::BackupResult> {
45 let result = (|| {
46 self.check_write(crate::runtime::write_gate::WriteKind::Backup)?;
47 // Defense in depth — check_write above already rejects when
48 // the lease is NotHeld, but log + audit the lease angle here
49 // explicitly so dashboards distinguish "lease lost" from a
50 // generic read-only refusal.
51 self.assert_remote_write_allowed("admin/backup")?;
52 let started = std::time::Instant::now();
53 let snapshot = self.create_snapshot()?;
54 let mut uploaded = false;
55
56 if let (Some(backend), Some(path)) =
57 (&self.inner.db.remote_backend, self.inner.db.path())
58 {
59 let default_snapshot_prefix = self.inner.db.options().default_snapshot_prefix();
60 let default_wal_prefix = self.inner.db.options().default_wal_archive_prefix();
61 let default_head_key = self.inner.db.options().default_backup_head_key();
62 let snapshot_prefix = self.config_string(
63 "red.config.backup.snapshot_prefix",
64 &default_snapshot_prefix,
65 );
66 let wal_prefix =
67 self.config_string("red.config.wal.archive.prefix", &default_wal_prefix);
68 let head_key = self.config_string("red.config.backup.head_key", &default_head_key);
69 let timeline_id = self.config_string("red.config.timeline.id", "main");
70 let snapshot_key = crate::storage::wal::archive_snapshot(
71 backend.as_ref(),
72 path,
73 snapshot.snapshot_id,
74 &snapshot_prefix,
75 )
76 .map_err(|err| RedDBError::Internal(err.to_string()))?;
77 let current_lsn = self
78 .inner
79 .db
80 .replication
81 .as_ref()
82 .map(|repl| {
83 repl.logical_wal_spool
84 .as_ref()
85 .map(|spool| spool.current_lsn())
86 .unwrap_or_else(|| repl.wal_buffer.current_lsn())
87 })
88 .unwrap_or_else(|| self.inner.cdc.current_lsn());
89 let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
90 // Hash the local snapshot bytes so the manifest can carry
91 // the digest for restore-side verification (PLAN.md
92 // Phase 4). Failure to hash is non-fatal — we still
93 // publish the manifest, just without a checksum, so a
94 // future fix can backfill rather than losing the backup.
95 let snapshot_sha256 = reddb_file::SnapshotManifest::compute_snapshot_sha256(path)
96 .map_err(|err| {
97 tracing::warn!(
98 target: "reddb::backup",
99 error = %err,
100 snapshot_id = snapshot.snapshot_id,
101 "snapshot hash failed; manifest will lack checksum"
102 );
103 })
104 .ok();
105 let manifest = reddb_file::SnapshotManifest {
106 timeline_id: timeline_id.clone(),
107 snapshot_key: snapshot_key.clone(),
108 snapshot_id: snapshot.snapshot_id,
109 snapshot_time: snapshot.created_at_unix_ms as u64,
110 base_lsn: current_lsn,
111 schema_version: crate::api::REDDB_FORMAT_VERSION,
112 format_version: crate::api::REDDB_FORMAT_VERSION,
113 snapshot_sha256,
114 };
115 crate::storage::wal::publish_snapshot_manifest(backend.as_ref(), &manifest)
116 .map_err(|err| RedDBError::Internal(err.to_string()))?;
117
118 // PLAN.md Phase 11.3 — read the head of the WAL hash chain
119 // so the new segment can link back. `None` means we're
120 // starting a fresh timeline (after a clean restore or on
121 // first archive ever); the segment's `prev_hash` will be
122 // `None` and restore-side validation accepts that only for
123 // the first segment in `plan.wal_segments`.
124 let prev_segment_hash =
125 self.config_string("red.config.timeline.last_segment_hash", "");
126 let prev_hash_arg = if prev_segment_hash.is_empty() {
127 None
128 } else {
129 Some(prev_segment_hash)
130 };
131
132 let archived_lsn = if let Some(primary) = &self.inner.db.replication {
133 let oldest = primary
134 .logical_wal_spool
135 .as_ref()
136 .and_then(|spool| spool.oldest_lsn().ok().flatten())
137 .or_else(|| primary.wal_buffer.oldest_lsn())
138 .unwrap_or(last_archived_lsn);
139 if last_archived_lsn > 0 && last_archived_lsn < oldest.saturating_sub(1) {
140 return Err(RedDBError::Internal(format!(
141 "logical WAL gap detected: last_archived_lsn={last_archived_lsn}, oldest_available_lsn={oldest}"
142 )));
143 }
144 let records = if let Some(spool) = &primary.logical_wal_spool {
145 spool
146 .read_since(last_archived_lsn, usize::MAX)
147 .map_err(|err| RedDBError::Internal(err.to_string()))?
148 } else {
149 primary.wal_buffer.read_since(last_archived_lsn, usize::MAX)
150 };
151 if let Some(meta) = crate::storage::wal::archive_change_records(
152 backend.as_ref(),
153 &wal_prefix,
154 &records,
155 prev_hash_arg,
156 )
157 .map_err(|err| RedDBError::Internal(err.to_string()))?
158 {
159 let _ = primary.prune_retained_wal_through(meta.lsn_end);
160 if let Err(err) = self.prune_primary_replica_wal_segments() {
161 tracing::warn!(
162 error = %err,
163 "failed to prune primary-replica WAL segments"
164 );
165 }
166 // Advance the chain head so the next archive call
167 // links to this segment's hash. If the segment has
168 // no sha256 (legacy / hashing failed) we leave the
169 // head as-is — the next segment then carries the
170 // prior chain head, preserving continuity.
171 if let Some(sha) = &meta.sha256 {
172 self.inner.db.store().set_config_tree(
173 "red.config.timeline",
174 &crate::json!({ "last_segment_hash": sha }),
175 );
176 }
177 meta.lsn_end
178 } else {
179 last_archived_lsn
180 }
181 } else {
182 last_archived_lsn
183 };
184
185 let head = reddb_file::BackupHead {
186 timeline_id,
187 snapshot_key,
188 snapshot_id: snapshot.snapshot_id,
189 snapshot_time: snapshot.created_at_unix_ms as u64,
190 current_lsn,
191 last_archived_lsn: archived_lsn,
192 wal_prefix,
193 };
194 crate::storage::wal::publish_backup_head(backend.as_ref(), &head_key, &head)
195 .map_err(|err| RedDBError::Internal(err.to_string()))?;
196 self.inner.db.store().set_config_tree(
197 "red.config.timeline",
198 &crate::json!({
199 "last_archived_lsn": archived_lsn,
200 "id": head.timeline_id
201 }),
202 );
203
204 // PLAN.md Phase 2.4 — refresh the unified `MANIFEST.json`
205 // at the prefix root so external tooling sees a single
206 // catalog of every snapshot + WAL segment with their
207 // checksums. Best-effort: a manifest publish failure
208 // doesn't fail the backup (the per-artifact sidecars
209 // already give restore-side integrity), but it does log
210 // so dashboards can flag stale catalogs.
211 if let Err(err) = crate::storage::wal::publish_unified_manifest_for_prefix(
212 backend.as_ref(),
213 &snapshot_prefix,
214 ) {
215 tracing::warn!(
216 target: "reddb::backup",
217 error = %err,
218 snapshot_prefix = %snapshot_prefix,
219 "unified MANIFEST.json refresh failed; per-artifact sidecars unaffected"
220 );
221 }
222
223 // PLAN.md Phase 11.4 — when the operator picked a
224 // commit policy that demands replica durability, block
225 // until the configured count of replicas has acked the
226 // archived LSN (or the timeout fires). For backup the
227 // policy decides the *DR posture* — `local` returns
228 // immediately, `ack_n` ensures at least N replicas saw
229 // the new tail before we report success to the
230 // operator. A `TimedOut` is logged but does NOT fail
231 // the backup: the local WAL + remote upload are durable
232 // regardless; the missing acks are reported via
233 // /metrics and /admin/status so the operator can decide.
234 match self.commit_policy() {
235 crate::replication::CommitPolicy::AckN(n) if n > 0 => {
236 let timeout = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
237 .ok()
238 .and_then(|v| v.parse::<u64>().ok())
239 .unwrap_or(5_000);
240 let outcome = self.await_replica_acks(
241 archived_lsn,
242 n,
243 std::time::Duration::from_millis(timeout),
244 );
245 match outcome {
246 crate::replication::AwaitOutcome::Reached(count) => {
247 tracing::debug!(
248 target: "reddb::backup",
249 archived_lsn,
250 n,
251 count,
252 "ack_n: replicas synced before backup return"
253 );
254 }
255 crate::replication::AwaitOutcome::TimedOut { observed, required } => {
256 tracing::warn!(
257 target: "reddb::backup",
258 archived_lsn,
259 observed,
260 required,
261 timeout_ms = timeout,
262 "ack_n: timed out waiting for replicas; backup uploaded but DR posture degraded"
263 );
264 }
265 crate::replication::AwaitOutcome::NotRequired => {}
266 }
267 }
268 _ => {} // Local / RemoteWal / Quorum: no blocking yet
269 }
270
271 // Issue #148 follow-up — opt-in archive of the L2 Blob Cache
272 // directory tree. Default off so a standard backup stays
273 // small; flip via `red.config.backup.include_blob_cache=true`
274 // when warm-cache restore is required (per
275 // docs/operations/blob-cache-backup-restore.md §1).
276 //
277 // The L2 tree is *derived* state (ADR 0006) — its absence
278 // never causes data loss; it only affects post-restore
279 // p99 latency until the cache re-warms. We therefore log
280 // (not fail) on per-file upload errors so a partial L2
281 // upload never aborts a healthy snapshot+WAL backup.
282 if self.config_bool("red.config.backup.include_blob_cache", false) {
283 let blob_cache_prefix = self.config_string(
284 "red.config.backup.blob_cache_prefix",
285 &format!("{snapshot_prefix}blob_cache/"),
286 );
287 if let Some(l2_path) = self.inner.result_blob_cache.l2_path() {
288 match crate::storage::cache::archive_blob_cache_l2(
289 backend.as_ref(),
290 l2_path,
291 &blob_cache_prefix,
292 ) {
293 Ok(count) => {
294 tracing::info!(
295 target: "reddb::backup",
296 files_uploaded = count,
297 blob_cache_prefix = %blob_cache_prefix,
298 "include_blob_cache: archived L2 directory"
299 );
300 }
301 Err(err) => {
302 tracing::warn!(
303 target: "reddb::backup",
304 error = %err,
305 blob_cache_prefix = %blob_cache_prefix,
306 "include_blob_cache: L2 archive failed; backup proceeding (cache is derived state)"
307 );
308 }
309 }
310 } else {
311 tracing::debug!(
312 target: "reddb::backup",
313 "include_blob_cache=true but no L2 path configured; nothing to archive"
314 );
315 }
316 }
317
318 uploaded = true;
319 }
320
321 Ok(crate::replication::scheduler::BackupResult {
322 snapshot_id: snapshot.snapshot_id,
323 uploaded,
324 duration_ms: started.elapsed().as_millis() as u64,
325 timestamp: snapshot.created_at_unix_ms as u64,
326 })
327 })();
328
329 use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
330 let (current_lsn, last_archived_lsn) = self.wal_archive_progress();
331 let mut fields = vec![
332 (
333 "current_lsn".to_string(),
334 Sensitivity::raw(current_lsn.to_string()),
335 ),
336 (
337 "last_archived_lsn".to_string(),
338 Sensitivity::raw(last_archived_lsn.to_string()),
339 ),
340 ];
341 if let Ok(backup) = &result {
342 fields.push((
343 "snapshot_id".to_string(),
344 Sensitivity::raw(backup.snapshot_id.to_string()),
345 ));
346 fields.push((
347 "uploaded".to_string(),
348 Sensitivity::raw(backup.uploaded.to_string()),
349 ));
350 fields.push((
351 "duration_ms".to_string(),
352 Sensitivity::raw(backup.duration_ms.to_string()),
353 ));
354 fields.push((
355 "snapshot_time".to_string(),
356 Sensitivity::raw(backup.timestamp.to_string()),
357 ));
358 }
359 let outcome = match &result {
360 Ok(_) => Outcome::Allowed,
361 Err(err) => crate::runtime::impl_core::control_event_outcome_for_error(err),
362 };
363 let reason = result.as_ref().err().map(|err| err.to_string());
364 self.emit_control_event(
365 EventKind::BackupRun,
366 outcome,
367 "backup_trigger",
368 Some("backup:trigger".to_string()),
369 reason,
370 fields,
371 )?;
372 result
373 }
374}