Skip to main content

reddb_server/runtime/
impl_backup.rs

1use super::*;
2
3impl RedDBRuntime {
4    /// Get backup scheduler status.
5    pub fn backup_status(&self) -> crate::replication::scheduler::BackupStatus {
6        self.inner.backup_scheduler.status()
7    }
8
9    /// Borrow the runtime's result Blob Cache.
10    ///
11    /// Wired for the `/admin/blob_cache/sweep` and
12    /// `/admin/blob_cache/flush_namespace` HTTP handlers (issue #148
13    /// follow-up): both delegate to
14    /// `crate::storage::cache::sweeper::BlobCacheSweeper`, which takes a
15    /// `&BlobCache`. Also used by `trigger_backup` when
16    /// `red.config.backup.include_blob_cache=true` to locate the L2
17    /// directory for archival.
18    pub fn result_blob_cache(&self) -> &crate::storage::cache::BlobCache {
19        &self.inner.result_blob_cache
20    }
21
22    /// Current local LSN paired with the LSN of the most recently
23    /// archived WAL segment. The difference is the replication /
24    /// archive lag operators alert on (PLAN.md Phase 5.1). Returns
25    /// `(0, 0)` when neither replication nor archiving is configured.
26    pub fn wal_archive_progress(&self) -> (u64, u64) {
27        let current_lsn = self
28            .inner
29            .db
30            .replication
31            .as_ref()
32            .map(|repl| {
33                repl.logical_wal_spool
34                    .as_ref()
35                    .map(|spool| spool.current_lsn())
36                    .unwrap_or_else(|| repl.wal_buffer.current_lsn())
37            })
38            .unwrap_or_else(|| self.inner.cdc.current_lsn());
39        let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
40        (current_lsn, last_archived_lsn)
41    }
42
43    /// Trigger an immediate backup.
44    pub fn trigger_backup(&self) -> RedDBResult<crate::replication::scheduler::BackupResult> {
45        let result = (|| {
46            self.check_write(crate::runtime::write_gate::WriteKind::Backup)?;
47            // Defense in depth — check_write above already rejects when
48            // the lease is NotHeld, but log + audit the lease angle here
49            // explicitly so dashboards distinguish "lease lost" from a
50            // generic read-only refusal.
51            self.assert_remote_write_allowed("admin/backup")?;
52            let started = std::time::Instant::now();
53            let snapshot = self.create_snapshot()?;
54            let mut uploaded = false;
55
56            if let (Some(backend), Some(path)) =
57                (&self.inner.db.remote_backend, self.inner.db.path())
58            {
59                let default_snapshot_prefix = self.inner.db.options().default_snapshot_prefix();
60                let default_wal_prefix = self.inner.db.options().default_wal_archive_prefix();
61                let default_head_key = self.inner.db.options().default_backup_head_key();
62                let snapshot_prefix = self.config_string(
63                    "red.config.backup.snapshot_prefix",
64                    &default_snapshot_prefix,
65                );
66                let wal_prefix =
67                    self.config_string("red.config.wal.archive.prefix", &default_wal_prefix);
68                let head_key = self.config_string("red.config.backup.head_key", &default_head_key);
69                let timeline_id = self.config_string("red.config.timeline.id", "main");
70                let snapshot_key = crate::storage::wal::archive_snapshot(
71                    backend.as_ref(),
72                    path,
73                    snapshot.snapshot_id,
74                    &snapshot_prefix,
75                )
76                .map_err(|err| RedDBError::Internal(err.to_string()))?;
77                let current_lsn = self
78                    .inner
79                    .db
80                    .replication
81                    .as_ref()
82                    .map(|repl| {
83                        repl.logical_wal_spool
84                            .as_ref()
85                            .map(|spool| spool.current_lsn())
86                            .unwrap_or_else(|| repl.wal_buffer.current_lsn())
87                    })
88                    .unwrap_or_else(|| self.inner.cdc.current_lsn());
89                let last_archived_lsn = self.config_u64("red.config.timeline.last_archived_lsn", 0);
90                // Hash the local snapshot bytes so the manifest can carry
91                // the digest for restore-side verification (PLAN.md
92                // Phase 4). Failure to hash is non-fatal — we still
93                // publish the manifest, just without a checksum, so a
94                // future fix can backfill rather than losing the backup.
95                let snapshot_sha256 = reddb_file::SnapshotManifest::compute_snapshot_sha256(path)
96                    .map_err(|err| {
97                        tracing::warn!(
98                            target: "reddb::backup",
99                            error = %err,
100                            snapshot_id = snapshot.snapshot_id,
101                            "snapshot hash failed; manifest will lack checksum"
102                        );
103                    })
104                    .ok();
105                let manifest = reddb_file::SnapshotManifest {
106                    timeline_id: timeline_id.clone(),
107                    snapshot_key: snapshot_key.clone(),
108                    snapshot_id: snapshot.snapshot_id,
109                    snapshot_time: snapshot.created_at_unix_ms as u64,
110                    base_lsn: current_lsn,
111                    schema_version: crate::api::REDDB_FORMAT_VERSION,
112                    format_version: crate::api::REDDB_FORMAT_VERSION,
113                    snapshot_sha256,
114                };
115                crate::storage::wal::publish_snapshot_manifest(backend.as_ref(), &manifest)
116                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
117
118                // PLAN.md Phase 11.3 — read the head of the WAL hash chain
119                // so the new segment can link back. `None` means we're
120                // starting a fresh timeline (after a clean restore or on
121                // first archive ever); the segment's `prev_hash` will be
122                // `None` and restore-side validation accepts that only for
123                // the first segment in `plan.wal_segments`.
124                let prev_segment_hash =
125                    self.config_string("red.config.timeline.last_segment_hash", "");
126                let prev_hash_arg = if prev_segment_hash.is_empty() {
127                    None
128                } else {
129                    Some(prev_segment_hash)
130                };
131
132                let archived_lsn = if let Some(primary) = &self.inner.db.replication {
133                    let oldest = primary
134                        .logical_wal_spool
135                        .as_ref()
136                        .and_then(|spool| spool.oldest_lsn().ok().flatten())
137                        .or_else(|| primary.wal_buffer.oldest_lsn())
138                        .unwrap_or(last_archived_lsn);
139                    if last_archived_lsn > 0 && last_archived_lsn < oldest.saturating_sub(1) {
140                        return Err(RedDBError::Internal(format!(
141                            "logical WAL gap detected: last_archived_lsn={last_archived_lsn}, oldest_available_lsn={oldest}"
142                        )));
143                    }
144                    let records = if let Some(spool) = &primary.logical_wal_spool {
145                        spool
146                            .read_since(last_archived_lsn, usize::MAX)
147                            .map_err(|err| RedDBError::Internal(err.to_string()))?
148                    } else {
149                        primary.wal_buffer.read_since(last_archived_lsn, usize::MAX)
150                    };
151                    if let Some(meta) = crate::storage::wal::archive_change_records(
152                        backend.as_ref(),
153                        &wal_prefix,
154                        &records,
155                        prev_hash_arg,
156                    )
157                    .map_err(|err| RedDBError::Internal(err.to_string()))?
158                    {
159                        let _ = primary.prune_retained_wal_through(meta.lsn_end);
160                        if let Err(err) = self.prune_primary_replica_wal_segments() {
161                            tracing::warn!(
162                                error = %err,
163                                "failed to prune primary-replica WAL segments"
164                            );
165                        }
166                        // Advance the chain head so the next archive call
167                        // links to this segment's hash. If the segment has
168                        // no sha256 (legacy / hashing failed) we leave the
169                        // head as-is — the next segment then carries the
170                        // prior chain head, preserving continuity.
171                        if let Some(sha) = &meta.sha256 {
172                            self.inner.db.store().set_config_tree(
173                                "red.config.timeline",
174                                &crate::json!({ "last_segment_hash": sha }),
175                            );
176                        }
177                        meta.lsn_end
178                    } else {
179                        last_archived_lsn
180                    }
181                } else {
182                    last_archived_lsn
183                };
184
185                let head = reddb_file::BackupHead {
186                    timeline_id,
187                    snapshot_key,
188                    snapshot_id: snapshot.snapshot_id,
189                    snapshot_time: snapshot.created_at_unix_ms as u64,
190                    current_lsn,
191                    last_archived_lsn: archived_lsn,
192                    wal_prefix,
193                };
194                crate::storage::wal::publish_backup_head(backend.as_ref(), &head_key, &head)
195                    .map_err(|err| RedDBError::Internal(err.to_string()))?;
196                self.inner.db.store().set_config_tree(
197                    "red.config.timeline",
198                    &crate::json!({
199                        "last_archived_lsn": archived_lsn,
200                        "id": head.timeline_id
201                    }),
202                );
203
204                // PLAN.md Phase 2.4 — refresh the unified `MANIFEST.json`
205                // at the prefix root so external tooling sees a single
206                // catalog of every snapshot + WAL segment with their
207                // checksums. Best-effort: a manifest publish failure
208                // doesn't fail the backup (the per-artifact sidecars
209                // already give restore-side integrity), but it does log
210                // so dashboards can flag stale catalogs.
211                if let Err(err) = crate::storage::wal::publish_unified_manifest_for_prefix(
212                    backend.as_ref(),
213                    &snapshot_prefix,
214                ) {
215                    tracing::warn!(
216                        target: "reddb::backup",
217                        error = %err,
218                        snapshot_prefix = %snapshot_prefix,
219                        "unified MANIFEST.json refresh failed; per-artifact sidecars unaffected"
220                    );
221                }
222
223                // PLAN.md Phase 11.4 — when the operator picked a
224                // commit policy that demands replica durability, block
225                // until the configured count of replicas has acked the
226                // archived LSN (or the timeout fires). For backup the
227                // policy decides the *DR posture* — `local` returns
228                // immediately, `ack_n` ensures at least N replicas saw
229                // the new tail before we report success to the
230                // operator. A `TimedOut` is logged but does NOT fail
231                // the backup: the local WAL + remote upload are durable
232                // regardless; the missing acks are reported via
233                // /metrics and /admin/status so the operator can decide.
234                match self.commit_policy() {
235                    crate::replication::CommitPolicy::AckN(n) if n > 0 => {
236                        let timeout = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
237                            .ok()
238                            .and_then(|v| v.parse::<u64>().ok())
239                            .unwrap_or(5_000);
240                        let outcome = self.await_replica_acks(
241                            archived_lsn,
242                            n,
243                            std::time::Duration::from_millis(timeout),
244                        );
245                        match outcome {
246                            crate::replication::AwaitOutcome::Reached(count) => {
247                                tracing::debug!(
248                                    target: "reddb::backup",
249                                    archived_lsn,
250                                    n,
251                                    count,
252                                    "ack_n: replicas synced before backup return"
253                                );
254                            }
255                            crate::replication::AwaitOutcome::TimedOut { observed, required } => {
256                                tracing::warn!(
257                                    target: "reddb::backup",
258                                    archived_lsn,
259                                    observed,
260                                    required,
261                                    timeout_ms = timeout,
262                                    "ack_n: timed out waiting for replicas; backup uploaded but DR posture degraded"
263                                );
264                            }
265                            crate::replication::AwaitOutcome::NotRequired => {}
266                        }
267                    }
268                    _ => {} // Local / RemoteWal / Quorum: no blocking yet
269                }
270
271                // Issue #148 follow-up — opt-in archive of the L2 Blob Cache
272                // directory tree. Default off so a standard backup stays
273                // small; flip via `red.config.backup.include_blob_cache=true`
274                // when warm-cache restore is required (per
275                // docs/operations/blob-cache-backup-restore.md §1).
276                //
277                // The L2 tree is *derived* state (ADR 0006) — its absence
278                // never causes data loss; it only affects post-restore
279                // p99 latency until the cache re-warms. We therefore log
280                // (not fail) on per-file upload errors so a partial L2
281                // upload never aborts a healthy snapshot+WAL backup.
282                if self.config_bool("red.config.backup.include_blob_cache", false) {
283                    let blob_cache_prefix = self.config_string(
284                        "red.config.backup.blob_cache_prefix",
285                        &format!("{snapshot_prefix}blob_cache/"),
286                    );
287                    if let Some(l2_path) = self.inner.result_blob_cache.l2_path() {
288                        match crate::storage::cache::archive_blob_cache_l2(
289                            backend.as_ref(),
290                            l2_path,
291                            &blob_cache_prefix,
292                        ) {
293                            Ok(count) => {
294                                tracing::info!(
295                                    target: "reddb::backup",
296                                    files_uploaded = count,
297                                    blob_cache_prefix = %blob_cache_prefix,
298                                    "include_blob_cache: archived L2 directory"
299                                );
300                            }
301                            Err(err) => {
302                                tracing::warn!(
303                                    target: "reddb::backup",
304                                    error = %err,
305                                    blob_cache_prefix = %blob_cache_prefix,
306                                    "include_blob_cache: L2 archive failed; backup proceeding (cache is derived state)"
307                                );
308                            }
309                        }
310                    } else {
311                        tracing::debug!(
312                            target: "reddb::backup",
313                            "include_blob_cache=true but no L2 path configured; nothing to archive"
314                        );
315                    }
316                }
317
318                uploaded = true;
319            }
320
321            Ok(crate::replication::scheduler::BackupResult {
322                snapshot_id: snapshot.snapshot_id,
323                uploaded,
324                duration_ms: started.elapsed().as_millis() as u64,
325                timestamp: snapshot.created_at_unix_ms as u64,
326            })
327        })();
328
329        use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
330        let (current_lsn, last_archived_lsn) = self.wal_archive_progress();
331        let mut fields = vec![
332            (
333                "current_lsn".to_string(),
334                Sensitivity::raw(current_lsn.to_string()),
335            ),
336            (
337                "last_archived_lsn".to_string(),
338                Sensitivity::raw(last_archived_lsn.to_string()),
339            ),
340        ];
341        if let Ok(backup) = &result {
342            fields.push((
343                "snapshot_id".to_string(),
344                Sensitivity::raw(backup.snapshot_id.to_string()),
345            ));
346            fields.push((
347                "uploaded".to_string(),
348                Sensitivity::raw(backup.uploaded.to_string()),
349            ));
350            fields.push((
351                "duration_ms".to_string(),
352                Sensitivity::raw(backup.duration_ms.to_string()),
353            ));
354            fields.push((
355                "snapshot_time".to_string(),
356                Sensitivity::raw(backup.timestamp.to_string()),
357            ));
358        }
359        let outcome = match &result {
360            Ok(_) => Outcome::Allowed,
361            Err(err) => crate::runtime::impl_core::control_event_outcome_for_error(err),
362        };
363        let reason = result.as_ref().err().map(|err| err.to_string());
364        self.emit_control_event(
365            EventKind::BackupRun,
366            outcome,
367            "backup_trigger",
368            Some("backup:trigger".to_string()),
369            reason,
370            fields,
371        )?;
372        result
373    }
374}