Skip to main content

reddb_server/storage/backend/
http.rs

1//! Generic HTTP `RemoteBackend` (PLAN.md Phase 2.3).
2//!
3//! Speaks plain `PUT` / `GET` / `DELETE` against a configurable
4//! base URL. The intent is maximum portability: any custom service
5//! that exposes object storage over HTTP — IPFS gateways, in-house
6//! storage proxies, ad-hoc backup hosts, anything that takes a body
7//! on PUT and serves it back on GET — can serve as RedDB's backup
8//! target without writing a new backend.
9//!
10//! Wire contract:
11//!   - `PUT  {base}/{prefix}{key}` — body = file bytes
12//!   - `GET  {base}/{prefix}{key}` — 200 returns body, 404 means
13//!     "doesn't exist" (treated as `Ok(false)` by `download`)
14//!   - `DELETE {base}/{prefix}{key}` — 200/204 ok, 404 ignored
15//!   - `GET {base}/{prefix}?list=<sub-prefix>` — newline-delimited
16//!     list of keys, one per line
17//!
18//! Auth: every request adds the `Authorization` header from
19//! `HttpBackendConfig::auth_header`. The factory in service_cli
20//! reads it from `RED_HTTP_AUTH_HEADER_FILE` so the actual token
21//! never appears in env (Kubernetes Secrets / Vault Agent friendly).
22//!
23//! Transport: shells out to `curl(1)`, matching the S3 backend's
24//! choice. No TLS crate baked in, no async runtime requirement,
25//! universally available on every Linux/macOS/BSD distro.
26
27use std::path::Path;
28use std::process::Command;
29
30use super::{
31    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
32    RemoteBackend,
33};
34
35/// Configuration for the generic HTTP backend.
36#[derive(Debug, Clone)]
37pub struct HttpBackendConfig {
38    /// Base URL (e.g. `https://storage.example.com`). No trailing slash.
39    pub base_url: String,
40    /// Prefix prepended to every key (e.g. `databases/prod/`).
41    /// Empty string means "no prefix".
42    pub prefix: String,
43    /// Optional `Authorization: <value>` header. `None` means no auth.
44    pub auth_header: Option<String>,
45    /// Whether the server supports ETag + If-Match / If-None-Match.
46    pub conditional_writes: bool,
47}
48
49impl HttpBackendConfig {
50    pub fn new(base_url: impl Into<String>) -> Self {
51        Self {
52            base_url: base_url.into().trim_end_matches('/').to_string(),
53            prefix: String::new(),
54            auth_header: None,
55            conditional_writes: false,
56        }
57    }
58
59    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
60        let mut p = prefix.into();
61        if !p.is_empty() && !p.ends_with('/') {
62            p.push('/');
63        }
64        self.prefix = p;
65        self
66    }
67
68    pub fn with_auth_header(mut self, value: impl Into<String>) -> Self {
69        self.auth_header = Some(value.into());
70        self
71    }
72
73    pub fn with_conditional_writes(mut self, enabled: bool) -> Self {
74        self.conditional_writes = enabled;
75        self
76    }
77}
78
79pub struct HttpBackend {
80    config: HttpBackendConfig,
81}
82
83impl HttpBackend {
84    pub fn new(config: HttpBackendConfig) -> Self {
85        Self { config }
86    }
87
88    fn url_for(&self, key: &str) -> String {
89        format!(
90            "{}/{}{}",
91            self.config.base_url,
92            self.config.prefix,
93            key.trim_start_matches('/')
94        )
95    }
96
97    /// Run `curl` with the configured auth header and return its
98    /// process output. The caller decides what to do with non-zero
99    /// exit codes — for `download` a 404 is success-with-false, for
100    /// `upload` any non-2xx is an error.
101    fn curl(&self, args: &[&str]) -> Result<std::process::Output, BackendError> {
102        let args = args.iter().map(|arg| (*arg).to_string()).collect();
103        self.curl_owned(args, &[])
104    }
105
106    fn curl_owned(
107        &self,
108        args: Vec<String>,
109        extra_headers: &[(&str, &str)],
110    ) -> Result<std::process::Output, BackendError> {
111        let mut cmd = Command::new("curl");
112        cmd.arg("-sS"); // silent + show errors
113        cmd.arg("-w").arg("HTTPSTATUS:%{http_code}");
114        for a in args {
115            cmd.arg(a);
116        }
117        if let Some(ref auth) = self.config.auth_header {
118            cmd.arg("-H").arg(format!("Authorization: {}", auth));
119        }
120        for (name, value) in extra_headers {
121            cmd.arg("-H").arg(format!("{name}: {value}"));
122        }
123        cmd.output()
124            .map_err(|e| BackendError::Transport(format!("curl not available: {e}")))
125    }
126
127    /// Parse the trailing `HTTPSTATUS:NNN` token curl emits and
128    /// return `(http_code, body_without_status)`.
129    fn split_status(stdout: &[u8]) -> (u16, Vec<u8>) {
130        let s = String::from_utf8_lossy(stdout);
131        if let Some(idx) = s.rfind("HTTPSTATUS:") {
132            let body = stdout[..idx].to_vec();
133            let code: u16 = s[idx + "HTTPSTATUS:".len()..].trim().parse().unwrap_or(0);
134            (code, body)
135        } else {
136            (0, stdout.to_vec())
137        }
138    }
139
140    fn header_value(headers: &[u8], name: &str) -> Option<String> {
141        let needle = format!("{}:", name.to_ascii_lowercase());
142        String::from_utf8_lossy(headers)
143            .lines()
144            .filter_map(|line| {
145                let trimmed = line.trim();
146                let lower = trimmed.to_ascii_lowercase();
147                lower
148                    .starts_with(&needle)
149                    .then(|| trimmed[needle.len()..].trim().to_string())
150            })
151            .next_back()
152            .filter(|value| !value.is_empty())
153    }
154
155    #[inline]
156    fn null_device() -> &'static str {
157        #[cfg(windows)]
158        {
159            "NUL"
160        }
161        #[cfg(not(windows))]
162        {
163            "/dev/null"
164        }
165    }
166}
167
168impl RemoteBackend for HttpBackend {
169    fn name(&self) -> &str {
170        "http"
171    }
172
173    fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
174        let url = self.url_for(remote_key);
175        // Stream body to a temp file via -o; we still want HTTPSTATUS
176        // in stdout for the success/404 distinction.
177        let local_path_str = local_path.to_string_lossy().to_string();
178        let output = self.curl(&["-o", &local_path_str, "-X", "GET", &url])?;
179        if !output.status.success() {
180            // curl exits non-zero on transport errors (DNS, connection
181            // reset). Treat that as a hard failure regardless of HTTP
182            // code, since stdout may be empty.
183            let stderr = String::from_utf8_lossy(&output.stderr);
184            return Err(BackendError::Transport(format!(
185                "http GET {url}: curl failed: {stderr}"
186            )));
187        }
188        let (code, _body) = Self::split_status(&output.stdout);
189        match code {
190            200..=299 => Ok(true),
191            404 => {
192                // Make sure we don't leave a zero-byte file behind
193                // that downstream code mistakes for a real download.
194                let _ = std::fs::remove_file(local_path);
195                Ok(false)
196            }
197            _ => Err(BackendError::Transport(format!(
198                "http GET {url} returned status {code}"
199            ))),
200        }
201    }
202
203    fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
204        let url = self.url_for(remote_key);
205        let local_path_str = local_path.to_string_lossy().to_string();
206        let output = self.curl(&[
207            "-X",
208            "PUT",
209            "--data-binary",
210            &format!("@{}", local_path_str),
211            &url,
212        ])?;
213        if !output.status.success() {
214            return Err(BackendError::Transport(format!(
215                "http PUT {url}: curl failed: {}",
216                String::from_utf8_lossy(&output.stderr)
217            )));
218        }
219        let (code, body) = Self::split_status(&output.stdout);
220        if !(200..=299).contains(&code) {
221            return Err(BackendError::Transport(format!(
222                "http PUT {url} returned status {code}: {}",
223                String::from_utf8_lossy(&body)
224            )));
225        }
226        Ok(())
227    }
228
229    fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
230        let url = self.url_for(remote_key);
231        let output = self.curl(&["-I", "-X", "HEAD", &url])?;
232        if !output.status.success() {
233            return Err(BackendError::Transport(format!(
234                "http HEAD {url}: curl failed: {}",
235                String::from_utf8_lossy(&output.stderr)
236            )));
237        }
238        let (code, _) = Self::split_status(&output.stdout);
239        match code {
240            200..=299 => Ok(true),
241            404 => Ok(false),
242            other => Err(BackendError::Transport(format!(
243                "http HEAD {url} returned status {other}"
244            ))),
245        }
246    }
247
248    fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
249        let url = self.url_for(remote_key);
250        let output = self.curl(&["-X", "DELETE", &url])?;
251        if !output.status.success() {
252            return Err(BackendError::Transport(format!(
253                "http DELETE {url}: curl failed: {}",
254                String::from_utf8_lossy(&output.stderr)
255            )));
256        }
257        let (code, _) = Self::split_status(&output.stdout);
258        match code {
259            200..=299 | 404 => Ok(()),
260            other => Err(BackendError::Transport(format!(
261                "http DELETE {url} returned status {other}"
262            ))),
263        }
264    }
265
266    fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
267        // Convention: GET base/?list=<sub-prefix> returns
268        // newline-delimited keys. Servers that don't implement this
269        // can still serve the rest of the API; list will return an
270        // empty vec and PITR / archiver code will treat it as "no
271        // archived segments".
272        let url = format!(
273            "{}/{}?list={}",
274            self.config.base_url,
275            self.config.prefix.trim_end_matches('/'),
276            urlencode_simple(prefix)
277        );
278        let output = self.curl(&["-X", "GET", &url])?;
279        if !output.status.success() {
280            return Ok(Vec::new());
281        }
282        let (code, body) = Self::split_status(&output.stdout);
283        if !(200..=299).contains(&code) {
284            return Ok(Vec::new());
285        }
286        let text = String::from_utf8_lossy(&body);
287        Ok(text
288            .lines()
289            .map(|line| line.trim().to_string())
290            .filter(|line| !line.is_empty())
291            .collect())
292    }
293}
294
295/// HTTP backend that promises CAS — only constructible when the
296/// operator confirmed the upstream server honors RFC 7232
297/// preconditions (`If-Match` / `If-None-Match`).
298///
299/// Wrapping `HttpBackend` rather than mutating it keeps the snapshot-
300/// transport surface (download/upload/delete) callable on servers that
301/// don't support CAS, while still preventing `LeaseStore` from binding
302/// to a non-CAS HTTP server (the type system rejects it at compile).
303pub struct AtomicHttpBackend {
304    inner: HttpBackend,
305}
306
307impl AtomicHttpBackend {
308    /// Build a CAS-capable HTTP backend. Returns `BackendError::Config`
309    /// when `config.conditional_writes` is false — operators must
310    /// explicitly opt in via `RED_HTTP_CONDITIONAL_WRITES=true` after
311    /// confirming their server supports preconditions.
312    pub fn try_new(config: HttpBackendConfig) -> Result<Self, BackendError> {
313        if !config.conditional_writes {
314            return Err(BackendError::Config(
315                "AtomicHttpBackend requires HttpBackendConfig::conditional_writes=true \
316                 (set RED_HTTP_CONDITIONAL_WRITES=true once your server is verified to \
317                 honor If-Match / If-None-Match)"
318                    .into(),
319            ));
320        }
321        Ok(Self {
322            inner: HttpBackend::new(config),
323        })
324    }
325
326    pub fn inner(&self) -> &HttpBackend {
327        &self.inner
328    }
329}
330
331impl RemoteBackend for AtomicHttpBackend {
332    fn name(&self) -> &str {
333        self.inner.name()
334    }
335    fn download(&self, remote_key: &str, local_path: &Path) -> Result<bool, BackendError> {
336        self.inner.download(remote_key, local_path)
337    }
338    fn upload(&self, local_path: &Path, remote_key: &str) -> Result<(), BackendError> {
339        self.inner.upload(local_path, remote_key)
340    }
341    fn exists(&self, remote_key: &str) -> Result<bool, BackendError> {
342        self.inner.exists(remote_key)
343    }
344    fn delete(&self, remote_key: &str) -> Result<(), BackendError> {
345        self.inner.delete(remote_key)
346    }
347    fn list(&self, prefix: &str) -> Result<Vec<String>, BackendError> {
348        self.inner.list(prefix)
349    }
350}
351
352impl AtomicRemoteBackend for AtomicHttpBackend {
353    fn object_version(
354        &self,
355        remote_key: &str,
356    ) -> Result<Option<BackendObjectVersion>, BackendError> {
357        let url = self.inner.url_for(remote_key);
358        let output = self.inner.curl(&[
359            "-D",
360            "-",
361            "-o",
362            HttpBackend::null_device(),
363            "-X",
364            "HEAD",
365            &url,
366        ])?;
367        if !output.status.success() {
368            return Err(BackendError::Transport(format!(
369                "http HEAD {url}: curl failed: {}",
370                String::from_utf8_lossy(&output.stderr)
371            )));
372        }
373        let (code, body) = HttpBackend::split_status(&output.stdout);
374        match code {
375            200..=299 => HttpBackend::header_value(&body, "etag")
376                .map(BackendObjectVersion::new)
377                .map(Some)
378                .ok_or_else(|| BackendError::Internal(format!("http HEAD {url} missing ETag"))),
379            404 => Ok(None),
380            401 | 403 => Err(BackendError::Auth(format!(
381                "http HEAD {url} returned status {code}"
382            ))),
383            other => Err(BackendError::Transport(format!(
384                "http HEAD {url} returned status {other}"
385            ))),
386        }
387    }
388
389    fn upload_conditional(
390        &self,
391        local_path: &Path,
392        remote_key: &str,
393        condition: ConditionalPut,
394    ) -> Result<BackendObjectVersion, BackendError> {
395        let url = self.inner.url_for(remote_key);
396        let local_path_str = local_path.to_string_lossy().to_string();
397        let condition_header = match &condition {
398            ConditionalPut::IfAbsent => ("If-None-Match", "*"),
399            ConditionalPut::IfVersion(version) => ("If-Match", version.token.as_str()),
400        };
401        let output = self.inner.curl_owned(
402            vec![
403                "-X".into(),
404                "PUT".into(),
405                "--data-binary".into(),
406                format!("@{}", local_path_str),
407                url.clone(),
408            ],
409            &[condition_header],
410        )?;
411        if !output.status.success() {
412            return Err(BackendError::Transport(format!(
413                "http conditional PUT {url}: curl failed: {}",
414                String::from_utf8_lossy(&output.stderr)
415            )));
416        }
417        let (code, body) = HttpBackend::split_status(&output.stdout);
418        match code {
419            200..=299 => self.object_version(remote_key)?.ok_or_else(|| {
420                BackendError::Internal(format!("http object '{}' missing after upload", remote_key))
421            }),
422            404 | 409 | 412 => Err(BackendError::PreconditionFailed(format!(
423                "http conditional PUT {url} returned status {code}: {}",
424                String::from_utf8_lossy(&body)
425            ))),
426            401 | 403 => Err(BackendError::Auth(format!(
427                "http conditional PUT {url} returned status {code}"
428            ))),
429            other => Err(BackendError::Transport(format!(
430                "http conditional PUT {url} returned status {other}: {}",
431                String::from_utf8_lossy(&body)
432            ))),
433        }
434    }
435
436    fn delete_conditional(
437        &self,
438        remote_key: &str,
439        condition: ConditionalDelete,
440    ) -> Result<(), BackendError> {
441        let url = self.inner.url_for(remote_key);
442        let ConditionalDelete::IfVersion(version) = condition;
443        let output = self.inner.curl_owned(
444            vec!["-X".into(), "DELETE".into(), url.clone()],
445            &[("If-Match", version.token.as_str())],
446        )?;
447        if !output.status.success() {
448            return Err(BackendError::Transport(format!(
449                "http conditional DELETE {url}: curl failed: {}",
450                String::from_utf8_lossy(&output.stderr)
451            )));
452        }
453        let (code, _) = HttpBackend::split_status(&output.stdout);
454        match code {
455            200..=299 => Ok(()),
456            404 | 409 | 412 => Err(BackendError::PreconditionFailed(format!(
457                "http conditional DELETE {url} returned status {code}"
458            ))),
459            401 | 403 => Err(BackendError::Auth(format!(
460                "http conditional DELETE {url} returned status {code}"
461            ))),
462            other => Err(BackendError::Transport(format!(
463                "http conditional DELETE {url} returned status {other}"
464            ))),
465        }
466    }
467}
468
469/// Minimal RFC3986 percent-encoder for the query-string `list=` value.
470/// Doesn't pull in `url` or `percent-encoding` to keep the engine's
471/// dependency surface flat.
472fn urlencode_simple(input: &str) -> String {
473    let mut out = String::with_capacity(input.len());
474    for byte in input.bytes() {
475        match byte {
476            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' | b'/' => {
477                out.push(byte as char);
478            }
479            other => {
480                use std::fmt::Write;
481                let _ = write!(out, "%{:02X}", other);
482            }
483        }
484    }
485    out
486}
487
488#[cfg(test)]
489mod tests {
490    use super::*;
491
492    #[test]
493    fn url_for_strips_leading_slash() {
494        let backend = HttpBackend::new(
495            HttpBackendConfig::new("https://store.example/").with_prefix("dbs/prod"),
496        );
497        assert_eq!(
498            backend.url_for("/snapshots/1.snap"),
499            "https://store.example/dbs/prod/snapshots/1.snap"
500        );
501    }
502
503    #[test]
504    fn url_for_with_no_prefix() {
505        let backend = HttpBackend::new(HttpBackendConfig::new("https://store.example"));
506        assert_eq!(backend.url_for("a/b"), "https://store.example/a/b");
507    }
508
509    #[test]
510    fn split_status_parses_curl_output() {
511        let stdout = b"hello world\nHTTPSTATUS:200";
512        let (code, body) = HttpBackend::split_status(stdout);
513        assert_eq!(code, 200);
514        assert_eq!(body, b"hello world\n");
515    }
516
517    #[test]
518    fn split_status_handles_404() {
519        let stdout = b"HTTPSTATUS:404";
520        let (code, body) = HttpBackend::split_status(stdout);
521        assert_eq!(code, 404);
522        assert!(body.is_empty());
523    }
524
525    #[test]
526    fn urlencode_keeps_path_separators() {
527        // We use `/` in list prefixes; encoding it would break
528        // server-side prefix matching.
529        assert_eq!(urlencode_simple("snapshots/2026"), "snapshots/2026");
530    }
531
532    #[test]
533    fn urlencode_escapes_spaces() {
534        assert_eq!(urlencode_simple("hello world"), "hello%20world");
535    }
536}