Skip to main content

reddb_server/
backup_bootstrap.rs

1//! Env-driven `BackupBootstrap` (issue #517).
2//!
3//! Parses the canonical `REDDB_BACKUP_*` env contract into a
4//! [`BackupConfig`]. Pure function — env access is injected as a
5//! closure so unit tests need no real process env. The `red` binary
6//! calls [`from_env`] at boot; the returned `Option<BackupConfig>`
7//! drives `Options::with_remote_backend` + `with_atomic_remote_backend`
8//! wiring and the archiver / checkpointer task intervals.
9//!
10//! Contract:
11//!   * `REDDB_BACKUP_S3_ENDPOINT`   (required)
12//!   * `REDDB_BACKUP_S3_BUCKET`     (required)
13//!   * `REDDB_BACKUP_S3_PREFIX`     (required)
14//!   * `REDDB_BACKUP_S3_ACCESS_KEY_ID`     (required)
15//!   * `REDDB_BACKUP_S3_SECRET_ACCESS_KEY` (required)
16//!   * `REDDB_BACKUP_S3_REGION`     (default `auto`)
17//!   * `REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS` (default 3600, must be > 0)
18//!   * `REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS`  (default 30,   must be > 0)
19//!   * `REDDB_BACKUP_PAUSE_ON_LAG_SECS`        (default 0 = disabled; > 0 enables
20//!     graceful read-only mode when WAL archive lag exceeds the threshold —
21//!     issue #519)
22//!
23//! Resolution:
24//!   * All required vars absent → `Ok(None)` (standalone; identical to
25//!     today's behaviour).
26//!   * All required vars present → `Ok(Some(BackupConfig))`.
27//!   * Partial config (at least one required present, at least one
28//!     missing) → `Err` naming the missing var.
//!   * Non-numeric, zero, or negative interval → `Err`; non-numeric or
//!     negative `REDDB_BACKUP_PAUSE_ON_LAG_SECS` → `Err`.
30
/// Parsed configuration produced by [`from_env`]. Carries everything
/// the `red` binary needs to construct an `S3Backend` and the two
/// background tasks (archiver + checkpointer).
///
/// `Debug` is implemented by hand rather than derived so that
/// `secret_access_key` is never rendered: a stray `{:?}` in a log line or
/// panic message must not leak the S3 secret. All other fields are shown
/// as usual.
#[derive(Clone, PartialEq, Eq)]
pub struct BackupConfig {
    /// Value of `REDDB_BACKUP_S3_ENDPOINT`.
    pub endpoint: String,
    /// Value of `REDDB_BACKUP_S3_BUCKET`.
    pub bucket: String,
    /// Value of `REDDB_BACKUP_S3_REGION`, or `auto` when unset.
    pub region: String,
    /// Value of `REDDB_BACKUP_S3_ACCESS_KEY_ID`.
    pub access_key_id: String,
    /// Value of `REDDB_BACKUP_S3_SECRET_ACCESS_KEY`. Redacted in `Debug` output.
    pub secret_access_key: String,
    /// Value of `REDDB_BACKUP_S3_PREFIX`.
    pub prefix: String,
    /// Value of `REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS`; always > 0 when
    /// produced by [`from_env`].
    pub checkpoint_interval_secs: u64,
    /// Value of `REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS`; always > 0 when
    /// produced by [`from_env`].
    pub wal_flush_interval_secs: u64,
    /// Issue #519 — when > 0, the engine monitors archive lag (`now -
    /// last_successful_archive_at`) and transitions to a graceful
    /// read-only mode when the lag exceeds this threshold. `0` keeps
    /// the legacy behaviour (writes always accepted while local volume
    /// has room, regardless of remote backend health).
    pub pause_on_lag_secs: u64,
}

impl std::fmt::Debug for BackupConfig {
    /// Same layout as the derived `Debug`, except that the secret access
    /// key is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BackupConfig")
            .field("endpoint", &self.endpoint)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &"<redacted>")
            .field("prefix", &self.prefix)
            .field("checkpoint_interval_secs", &self.checkpoint_interval_secs)
            .field("wal_flush_interval_secs", &self.wal_flush_interval_secs)
            .field("pause_on_lag_secs", &self.pause_on_lag_secs)
            .finish()
    }
}
51
/// Env vars that must all be set (non-blank) for a `BackupConfig` to be
/// produced.
///
/// The order is load-bearing: `from_env` consumes the resolved values
/// positionally as endpoint, bucket, prefix, access key id, secret access
/// key. Reordering or inserting entries here requires updating `from_env`.
const REQUIRED_VARS: &[&str] = &[
    "REDDB_BACKUP_S3_ENDPOINT",
    "REDDB_BACKUP_S3_BUCKET",
    "REDDB_BACKUP_S3_PREFIX",
    "REDDB_BACKUP_S3_ACCESS_KEY_ID",
    "REDDB_BACKUP_S3_SECRET_ACCESS_KEY",
];
59
// Names of the optional env vars. Each falls back to the matching
// `DEFAULT_*` constant below when unset or blank.
const REGION_VAR: &str = "REDDB_BACKUP_S3_REGION";
const CHECKPOINT_VAR: &str = "REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS";
const WAL_FLUSH_VAR: &str = "REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS";
const PAUSE_ON_LAG_VAR: &str = "REDDB_BACKUP_PAUSE_ON_LAG_SECS";

// Fallback values for the optional env vars above.
const DEFAULT_REGION: &str = "auto";
const DEFAULT_CHECKPOINT_SECS: u64 = 3600;
const DEFAULT_WAL_FLUSH_SECS: u64 = 30;
// 0 means the pause-on-lag feature is disabled.
const DEFAULT_PAUSE_ON_LAG_SECS: u64 = 0;
69
70/// Parse the `REDDB_BACKUP_*` env contract using the supplied
71/// env-var lookup. See module docs for the contract.
72pub fn from_env<F>(env: F) -> Result<Option<BackupConfig>, String>
73where
74    F: Fn(&str) -> Option<String>,
75{
76    let presence: Vec<(&str, Option<String>)> = REQUIRED_VARS
77        .iter()
78        .map(|name| (*name, env(name).filter(|v| !v.trim().is_empty())))
79        .collect();
80
81    let present_count = presence.iter().filter(|(_, v)| v.is_some()).count();
82
83    if present_count == 0 {
84        return Ok(None);
85    }
86
87    if present_count < REQUIRED_VARS.len() {
88        let missing: Vec<&str> = presence
89            .iter()
90            .filter_map(|(n, v)| v.is_none().then_some(*n))
91            .collect();
92        return Err(format!(
93            "partial REDDB_BACKUP_S3_* config; missing: {}",
94            missing.join(", ")
95        ));
96    }
97
98    let mut required = presence.into_iter().map(|(_, v)| v.unwrap());
99    let endpoint = required.next().unwrap();
100    let bucket = required.next().unwrap();
101    let prefix = required.next().unwrap();
102    let access_key_id = required.next().unwrap();
103    let secret_access_key = required.next().unwrap();
104
105    let region = env(REGION_VAR)
106        .filter(|v| !v.trim().is_empty())
107        .unwrap_or_else(|| DEFAULT_REGION.to_string());
108
109    let checkpoint_interval_secs = parse_interval(&env, CHECKPOINT_VAR, DEFAULT_CHECKPOINT_SECS)?;
110    let wal_flush_interval_secs = parse_interval(&env, WAL_FLUSH_VAR, DEFAULT_WAL_FLUSH_SECS)?;
111    let pause_on_lag_secs = parse_pause_on_lag(&env, DEFAULT_PAUSE_ON_LAG_SECS)?;
112
113    Ok(Some(BackupConfig {
114        endpoint,
115        bucket,
116        region,
117        access_key_id,
118        secret_access_key,
119        prefix,
120        checkpoint_interval_secs,
121        wal_flush_interval_secs,
122        pause_on_lag_secs,
123    }))
124}
125
126fn parse_pause_on_lag<F>(env: &F, default: u64) -> Result<u64, String>
127where
128    F: Fn(&str) -> Option<String>,
129{
130    let Some(raw) = env(PAUSE_ON_LAG_VAR).filter(|v| !v.trim().is_empty()) else {
131        return Ok(default);
132    };
133    let trimmed = raw.trim();
134    let parsed: i128 = trimmed
135        .parse()
136        .map_err(|_| format!("{PAUSE_ON_LAG_VAR} must be a non-negative integer; got {raw:?}"))?;
137    if parsed < 0 {
138        return Err(format!(
139            "{PAUSE_ON_LAG_VAR} must be >= 0; got {parsed} (negative not allowed)"
140        ));
141    }
142    let as_u64 = u64::try_from(parsed)
143        .map_err(|_| format!("{PAUSE_ON_LAG_VAR} exceeds u64 range; got {parsed}"))?;
144    Ok(as_u64)
145}
146
/// Resolve a strictly positive interval from the env var `name`.
///
/// Returns `default` when the lookup yields nothing or only whitespace.
/// Otherwise the trimmed value must be an integer that is `> 0` and fits
/// in `u64`; anything else is an `Err` whose message starts with `name`.
fn parse_interval<F>(env: &F, name: &str, default: u64) -> Result<u64, String>
where
    F: Fn(&str) -> Option<String>,
{
    let raw = match env(name) {
        Some(value) if !value.trim().is_empty() => value,
        _ => return Ok(default),
    };

    // Parse into a wide signed type first so that zero/negative values and
    // values just past u64::MAX each get a specific message.
    match raw.trim().parse::<i128>() {
        Err(_) => Err(format!("{name} must be a positive integer; got {raw:?}")),
        Ok(parsed) if parsed <= 0 => Err(format!(
            "{name} must be > 0; got {parsed} (zero/negative not allowed)"
        )),
        Ok(parsed) => {
            u64::try_from(parsed).map_err(|_| format!("{name} exceeds u64 range; got {parsed}"))
        }
    }
}
167
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Stand-in for the process environment: variable name → value.
    type FakeEnv = HashMap<&'static str, &'static str>;

    /// A complete set of required vars with short placeholder values.
    const REQUIRED_BASELINE: [(&str, &str); 5] = [
        ("REDDB_BACKUP_S3_ENDPOINT", "https://x"),
        ("REDDB_BACKUP_S3_BUCKET", "b"),
        ("REDDB_BACKUP_S3_PREFIX", "p/"),
        ("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK"),
        ("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK"),
    ];

    /// Baseline required vars plus `extra`; an entry in `extra` replaces a
    /// baseline entry with the same name.
    fn full_env(extra: &[(&'static str, &'static str)]) -> FakeEnv {
        let mut vars: FakeEnv = REQUIRED_BASELINE.iter().copied().collect();
        vars.extend(extra.iter().copied());
        vars
    }

    /// Adapts a `FakeEnv` to the lookup closure `from_env` expects.
    fn lookup(vars: &FakeEnv) -> impl Fn(&str) -> Option<String> + '_ {
        move |key| vars.get(key).map(|value| value.to_string())
    }

    /// Runs `from_env` and unwraps down to the config.
    fn config_from(vars: &FakeEnv) -> BackupConfig {
        from_env(lookup(vars)).unwrap().expect("Some")
    }

    /// Runs `from_env` and unwraps the error message.
    fn error_from(vars: &FakeEnv) -> String {
        from_env(lookup(vars)).unwrap_err()
    }

    #[test]
    fn none_present_yields_none() {
        let vars = FakeEnv::new();
        assert!(from_env(lookup(&vars)).unwrap().is_none());
    }

    #[test]
    fn all_required_present_yields_config_with_defaults() {
        let vars = full_env(&[
            ("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com"),
            ("REDDB_BACKUP_S3_BUCKET", "buck"),
            ("REDDB_BACKUP_S3_PREFIX", "clusters/dev/"),
        ]);
        let cfg = config_from(&vars);
        assert_eq!(cfg.endpoint, "https://s3.example.com");
        assert_eq!(cfg.bucket, "buck");
        assert_eq!(cfg.prefix, "clusters/dev/");
        assert_eq!(cfg.access_key_id, "AK");
        assert_eq!(cfg.secret_access_key, "SK");
        assert_eq!(cfg.region, DEFAULT_REGION);
        assert_eq!(cfg.checkpoint_interval_secs, DEFAULT_CHECKPOINT_SECS);
        assert_eq!(cfg.wal_flush_interval_secs, DEFAULT_WAL_FLUSH_SECS);
        assert_eq!(cfg.pause_on_lag_secs, DEFAULT_PAUSE_ON_LAG_SECS);
    }

    #[test]
    fn pause_on_lag_is_parsed_when_present() {
        let vars = full_env(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "300")]);
        assert_eq!(config_from(&vars).pause_on_lag_secs, 300);
    }

    #[test]
    fn pause_on_lag_zero_is_disabled() {
        let vars = full_env(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "0")]);
        assert_eq!(config_from(&vars).pause_on_lag_secs, 0);
    }

    #[test]
    fn pause_on_lag_negative_is_error() {
        let vars = full_env(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "-1")]);
        let err = error_from(&vars);
        assert!(err.contains("REDDB_BACKUP_PAUSE_ON_LAG_SECS"), "{err}");
    }

    #[test]
    fn pause_on_lag_non_numeric_is_error() {
        let vars = full_env(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "soon")]);
        let err = error_from(&vars);
        assert!(err.contains("REDDB_BACKUP_PAUSE_ON_LAG_SECS"), "{err}");
        assert!(err.contains("non-negative"), "{err}");
    }

    #[test]
    fn all_required_present_with_explicit_overrides() {
        let vars = full_env(&[
            ("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com"),
            ("REDDB_BACKUP_S3_REGION", "us-east-1"),
            ("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "60"),
            ("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS", "5"),
        ]);
        let cfg = config_from(&vars);
        assert_eq!(cfg.region, "us-east-1");
        assert_eq!(cfg.checkpoint_interval_secs, 60);
        assert_eq!(cfg.wal_flush_interval_secs, 5);
    }

    #[test]
    fn partial_config_names_missing_var() {
        // Only two of the five required vars are set.
        let mut vars = FakeEnv::new();
        vars.insert("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com");
        vars.insert("REDDB_BACKUP_S3_BUCKET", "b");

        let err = error_from(&vars);
        for absent in [
            "REDDB_BACKUP_S3_PREFIX",
            "REDDB_BACKUP_S3_ACCESS_KEY_ID",
            "REDDB_BACKUP_S3_SECRET_ACCESS_KEY",
        ] {
            assert!(err.contains(absent), "{err}");
        }
    }

    #[test]
    fn whitespace_only_required_treated_as_missing() {
        let vars = full_env(&[("REDDB_BACKUP_S3_ENDPOINT", "   ")]);
        let err = error_from(&vars);
        assert!(err.contains("REDDB_BACKUP_S3_ENDPOINT"), "{err}");
    }

    #[test]
    fn non_numeric_interval_is_error() {
        let vars = full_env(&[("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "abc")]);
        let err = error_from(&vars);
        assert!(
            err.contains("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS"),
            "{err}"
        );
        assert!(err.contains("positive integer"), "{err}");
    }

    #[test]
    fn zero_interval_is_error() {
        let vars = full_env(&[("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS", "0")]);
        let err = error_from(&vars);
        assert!(
            err.contains("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS"),
            "{err}"
        );
        assert!(err.contains("> 0"), "{err}");
    }

    #[test]
    fn negative_interval_is_error() {
        let vars = full_env(&[("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "-10")]);
        let err = error_from(&vars);
        assert!(
            err.contains("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS"),
            "{err}"
        );
    }
}
381}