/// Resolved settings for shipping backups to an S3-compatible bucket.
///
/// Built by `from_env`. `Debug` is implemented by hand rather than derived so
/// that `secret_access_key` never ends up in logs or panic messages.
#[derive(Clone, PartialEq, Eq)]
pub struct BackupConfig {
    /// Value of `REDDB_BACKUP_S3_ENDPOINT`.
    pub endpoint: String,
    /// Value of `REDDB_BACKUP_S3_BUCKET`.
    pub bucket: String,
    /// Value of `REDDB_BACKUP_S3_REGION`, or the default region when unset.
    pub region: String,
    /// Value of `REDDB_BACKUP_S3_ACCESS_KEY_ID`.
    pub access_key_id: String,
    /// Value of `REDDB_BACKUP_S3_SECRET_ACCESS_KEY`. Sensitive: redacted in `Debug`.
    pub secret_access_key: String,
    /// Value of `REDDB_BACKUP_S3_PREFIX`.
    pub prefix: String,
    /// Checkpoint interval in seconds; `from_env` only produces values > 0.
    pub checkpoint_interval_secs: u64,
    /// WAL flush interval in seconds; `from_env` only produces values > 0.
    pub wal_flush_interval_secs: u64,
    /// Pause-on-lag threshold in seconds; `from_env` accepts 0 here.
    pub pause_on_lag_secs: u64,
}

impl std::fmt::Debug for BackupConfig {
    /// Formats every field like the derived impl would, except that the
    /// secret access key is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BackupConfig")
            .field("endpoint", &self.endpoint)
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("access_key_id", &self.access_key_id)
            .field("secret_access_key", &"<redacted>")
            .field("prefix", &self.prefix)
            .field("checkpoint_interval_secs", &self.checkpoint_interval_secs)
            .field("wal_flush_interval_secs", &self.wal_flush_interval_secs)
            .field("pause_on_lag_secs", &self.pause_on_lag_secs)
            .finish()
    }
}
51
/// Environment variables that must all be set to a non-blank value for
/// `from_env` to build a config.
///
/// Order matters: `from_env` consumes the looked-up values positionally as
/// endpoint, bucket, prefix, access key id, secret access key. Reordering
/// this list without updating `from_env` would silently swap fields.
const REQUIRED_VARS: &[&str] = &[
    "REDDB_BACKUP_S3_ENDPOINT",
    "REDDB_BACKUP_S3_BUCKET",
    "REDDB_BACKUP_S3_PREFIX",
    "REDDB_BACKUP_S3_ACCESS_KEY_ID",
    "REDDB_BACKUP_S3_SECRET_ACCESS_KEY",
];
59
// Optional variables: each falls back to a default when unset or blank.

/// Name of the variable that overrides the S3 region.
const REGION_VAR: &str = "REDDB_BACKUP_S3_REGION";
/// Name of the variable that overrides the checkpoint interval (must parse to > 0).
const CHECKPOINT_VAR: &str = "REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS";
/// Name of the variable that overrides the WAL flush interval (must parse to > 0).
const WAL_FLUSH_VAR: &str = "REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS";
/// Name of the variable that overrides the pause-on-lag threshold (must parse to >= 0).
const PAUSE_ON_LAG_VAR: &str = "REDDB_BACKUP_PAUSE_ON_LAG_SECS";
64
/// Region used when `REDDB_BACKUP_S3_REGION` is unset or blank.
const DEFAULT_REGION: &str = "auto";
/// Checkpoint interval used when its variable is unset or blank.
const DEFAULT_CHECKPOINT_SECS: u64 = 3600;
/// WAL flush interval used when its variable is unset or blank.
const DEFAULT_WAL_FLUSH_SECS: u64 = 30;
/// Pause-on-lag threshold used when its variable is unset or blank.
/// NOTE(review): 0 presumably means "never pause" (a test name calls it
/// "disabled") — confirm against the code that consumes this value.
const DEFAULT_PAUSE_ON_LAG_SECS: u64 = 0;
69
70pub fn from_env<F>(env: F) -> Result<Option<BackupConfig>, String>
73where
74 F: Fn(&str) -> Option<String>,
75{
76 let presence: Vec<(&str, Option<String>)> = REQUIRED_VARS
77 .iter()
78 .map(|name| (*name, env(name).filter(|v| !v.trim().is_empty())))
79 .collect();
80
81 let present_count = presence.iter().filter(|(_, v)| v.is_some()).count();
82
83 if present_count == 0 {
84 return Ok(None);
85 }
86
87 if present_count < REQUIRED_VARS.len() {
88 let missing: Vec<&str> = presence
89 .iter()
90 .filter_map(|(n, v)| v.is_none().then_some(*n))
91 .collect();
92 return Err(format!(
93 "partial REDDB_BACKUP_S3_* config; missing: {}",
94 missing.join(", ")
95 ));
96 }
97
98 let mut required = presence.into_iter().map(|(_, v)| v.unwrap());
99 let endpoint = required.next().unwrap();
100 let bucket = required.next().unwrap();
101 let prefix = required.next().unwrap();
102 let access_key_id = required.next().unwrap();
103 let secret_access_key = required.next().unwrap();
104
105 let region = env(REGION_VAR)
106 .filter(|v| !v.trim().is_empty())
107 .unwrap_or_else(|| DEFAULT_REGION.to_string());
108
109 let checkpoint_interval_secs = parse_interval(&env, CHECKPOINT_VAR, DEFAULT_CHECKPOINT_SECS)?;
110 let wal_flush_interval_secs = parse_interval(&env, WAL_FLUSH_VAR, DEFAULT_WAL_FLUSH_SECS)?;
111 let pause_on_lag_secs = parse_pause_on_lag(&env, DEFAULT_PAUSE_ON_LAG_SECS)?;
112
113 Ok(Some(BackupConfig {
114 endpoint,
115 bucket,
116 region,
117 access_key_id,
118 secret_access_key,
119 prefix,
120 checkpoint_interval_secs,
121 wal_flush_interval_secs,
122 pause_on_lag_secs,
123 }))
124}
125
126fn parse_pause_on_lag<F>(env: &F, default: u64) -> Result<u64, String>
127where
128 F: Fn(&str) -> Option<String>,
129{
130 let Some(raw) = env(PAUSE_ON_LAG_VAR).filter(|v| !v.trim().is_empty()) else {
131 return Ok(default);
132 };
133 let trimmed = raw.trim();
134 let parsed: i128 = trimmed
135 .parse()
136 .map_err(|_| format!("{PAUSE_ON_LAG_VAR} must be a non-negative integer; got {raw:?}"))?;
137 if parsed < 0 {
138 return Err(format!(
139 "{PAUSE_ON_LAG_VAR} must be >= 0; got {parsed} (negative not allowed)"
140 ));
141 }
142 let as_u64 = u64::try_from(parsed)
143 .map_err(|_| format!("{PAUSE_ON_LAG_VAR} exceeds u64 range; got {parsed}"))?;
144 Ok(as_u64)
145}
146
/// Reads a strictly positive interval from the variable called `name`.
///
/// Returns `default` when the variable is unset or blank. Otherwise the
/// trimmed value must be an integer in `1..=u64::MAX`.
///
/// # Errors
///
/// Returns a message naming the variable when the value is not an integer,
/// is zero or negative, or does not fit in a `u64`.
fn parse_interval<F>(env: &F, name: &str, default: u64) -> Result<u64, String>
where
    F: Fn(&str) -> Option<String>,
{
    let raw = match env(name) {
        Some(value) if !value.trim().is_empty() => value,
        _ => return Ok(default),
    };

    // Parse as i128 so zero/negative and too-large inputs get their own
    // messages instead of a generic parse failure.
    let parsed = match raw.trim().parse::<i128>() {
        Ok(number) => number,
        Err(_) => return Err(format!("{name} must be a positive integer; got {raw:?}")),
    };

    if parsed < 1 {
        return Err(format!(
            "{name} must be > 0; got {parsed} (zero/negative not allowed)"
        ));
    }

    u64::try_from(parsed).map_err(|_| format!("{name} exceeds u64 range; got {parsed}"))
}
167
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Fake environment used by the tests: variable name -> value.
    type EnvMap = HashMap<&'static str, &'static str>;

    /// Adapts a map into the lookup closure that `from_env` expects.
    fn lookup<'a>(
        map: &'a HashMap<&'static str, &'static str>,
    ) -> impl Fn(&str) -> Option<String> + 'a {
        move |key| map.get(key).map(|value| (*value).to_string())
    }

    /// Builds a fake environment from literal `(name, value)` pairs.
    fn env_of(pairs: &[(&'static str, &'static str)]) -> EnvMap {
        pairs.iter().copied().collect()
    }

    /// A complete minimal set of required variables plus `extra` entries.
    fn required_plus(extra: &[(&'static str, &'static str)]) -> EnvMap {
        let base = [
            ("REDDB_BACKUP_S3_ENDPOINT", "https://x"),
            ("REDDB_BACKUP_S3_BUCKET", "b"),
            ("REDDB_BACKUP_S3_PREFIX", "p/"),
            ("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK"),
            ("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK"),
        ];
        base.iter().chain(extra).copied().collect()
    }

    /// Runs `from_env` against a fake environment.
    fn load(env: &EnvMap) -> Result<Option<BackupConfig>, String> {
        from_env(lookup(env))
    }

    #[test]
    fn none_present_yields_none() {
        let env = EnvMap::new();
        assert!(load(&env).unwrap().is_none());
    }

    #[test]
    fn all_required_present_yields_config_with_defaults() {
        let env = env_of(&[
            ("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com"),
            ("REDDB_BACKUP_S3_BUCKET", "buck"),
            ("REDDB_BACKUP_S3_PREFIX", "clusters/dev/"),
            ("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK"),
            ("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK"),
        ]);
        let cfg = load(&env).unwrap().expect("Some");
        assert_eq!(cfg.endpoint, "https://s3.example.com");
        assert_eq!(cfg.bucket, "buck");
        assert_eq!(cfg.prefix, "clusters/dev/");
        assert_eq!(cfg.access_key_id, "AK");
        assert_eq!(cfg.secret_access_key, "SK");
        assert_eq!(cfg.region, DEFAULT_REGION);
        assert_eq!(cfg.checkpoint_interval_secs, DEFAULT_CHECKPOINT_SECS);
        assert_eq!(cfg.wal_flush_interval_secs, DEFAULT_WAL_FLUSH_SECS);
        assert_eq!(cfg.pause_on_lag_secs, DEFAULT_PAUSE_ON_LAG_SECS);
    }

    #[test]
    fn pause_on_lag_is_parsed_when_present() {
        let env = required_plus(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "300")]);
        let cfg = load(&env).unwrap().expect("Some");
        assert_eq!(cfg.pause_on_lag_secs, 300);
    }

    #[test]
    fn pause_on_lag_zero_is_disabled() {
        let env = required_plus(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "0")]);
        let cfg = load(&env).unwrap().expect("Some");
        assert_eq!(cfg.pause_on_lag_secs, 0);
    }

    #[test]
    fn pause_on_lag_negative_is_error() {
        let env = required_plus(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "-1")]);
        let err = load(&env).unwrap_err();
        assert!(err.contains("REDDB_BACKUP_PAUSE_ON_LAG_SECS"), "{err}");
    }

    #[test]
    fn pause_on_lag_non_numeric_is_error() {
        let env = required_plus(&[("REDDB_BACKUP_PAUSE_ON_LAG_SECS", "soon")]);
        let err = load(&env).unwrap_err();
        assert!(err.contains("REDDB_BACKUP_PAUSE_ON_LAG_SECS"), "{err}");
        assert!(err.contains("non-negative"), "{err}");
    }

    #[test]
    fn all_required_present_with_explicit_overrides() {
        let env = env_of(&[
            ("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com"),
            ("REDDB_BACKUP_S3_BUCKET", "b"),
            ("REDDB_BACKUP_S3_PREFIX", "p/"),
            ("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK"),
            ("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK"),
            ("REDDB_BACKUP_S3_REGION", "us-east-1"),
            ("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "60"),
            ("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS", "5"),
        ]);
        let cfg = load(&env).unwrap().expect("Some");
        assert_eq!(cfg.region, "us-east-1");
        assert_eq!(cfg.checkpoint_interval_secs, 60);
        assert_eq!(cfg.wal_flush_interval_secs, 5);
    }

    #[test]
    fn partial_config_names_missing_var() {
        let env = env_of(&[
            ("REDDB_BACKUP_S3_ENDPOINT", "https://s3.example.com"),
            ("REDDB_BACKUP_S3_BUCKET", "b"),
        ]);
        let err = load(&env).unwrap_err();
        assert!(err.contains("REDDB_BACKUP_S3_PREFIX"), "{err}");
        assert!(err.contains("REDDB_BACKUP_S3_ACCESS_KEY_ID"), "{err}");
        assert!(err.contains("REDDB_BACKUP_S3_SECRET_ACCESS_KEY"), "{err}");
    }

    #[test]
    fn whitespace_only_required_treated_as_missing() {
        let env = env_of(&[
            ("REDDB_BACKUP_S3_ENDPOINT", " "),
            ("REDDB_BACKUP_S3_BUCKET", "b"),
            ("REDDB_BACKUP_S3_PREFIX", "p/"),
            ("REDDB_BACKUP_S3_ACCESS_KEY_ID", "AK"),
            ("REDDB_BACKUP_S3_SECRET_ACCESS_KEY", "SK"),
        ]);
        let err = load(&env).unwrap_err();
        assert!(err.contains("REDDB_BACKUP_S3_ENDPOINT"), "{err}");
    }

    #[test]
    fn non_numeric_interval_is_error() {
        let env = required_plus(&[("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "abc")]);
        let err = load(&env).unwrap_err();
        assert!(
            err.contains("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS"),
            "{err}"
        );
        assert!(err.contains("positive integer"), "{err}");
    }

    #[test]
    fn zero_interval_is_error() {
        let env = required_plus(&[("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS", "0")]);
        let err = load(&env).unwrap_err();
        assert!(
            err.contains("REDDB_BACKUP_WAL_FLUSH_INTERVAL_SECS"),
            "{err}"
        );
        assert!(err.contains("> 0"), "{err}");
    }

    #[test]
    fn negative_interval_is_error() {
        let env = required_plus(&[("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS", "-10")]);
        let err = load(&env).unwrap_err();
        assert!(
            err.contains("REDDB_BACKUP_CHECKPOINT_INTERVAL_SECS"),
            "{err}"
        );
    }
}