Skip to main content

innate_core/backup/
mod.rs

//! Cloudflare R2 backup for the Innate knowledge database.
//!
//! Uses the S3-compatible API with AWS Signature V4.
//! R2 endpoint: https://{account_id}.r2.cloudflarestorage.com
//! Region for signing: "auto"
6
7use std::path::Path;
8
9use hmac::{Hmac, Mac};
10use sha2::{Digest, Sha256};
11
12use crate::settings::{BackupConfig, R2Config};
13
14mod command;
15
16pub(crate) use command::run_command;
17pub use command::BackupCommands;
18
19type HmacSha256 = Hmac<Sha256>;
20
21// ── Backup state (local cache of last backup time) ───────────────────────────
22
23fn backup_state_path() -> std::path::PathBuf {
24    crate::paths::backup_state_path()
25}
26
27#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
28pub struct BackupState {
29    pub last_backup_at: Option<String>,
30    pub last_backup_key: Option<String>,
31}
32
33fn load_state() -> BackupState {
34    let Ok(text) = std::fs::read_to_string(backup_state_path()) else {
35        return BackupState::default();
36    };
37    serde_json::from_str(&text).unwrap_or_default()
38}
39
40fn save_state(state: &BackupState) -> anyhow::Result<()> {
41    let path = backup_state_path();
42    if let Some(parent) = path.parent() {
43        std::fs::create_dir_all(parent)?;
44    }
45    std::fs::write(path, serde_json::to_string_pretty(state)?)?;
46    Ok(())
47}
48
49// ── Public result types ───────────────────────────────────────────────────────
50
51#[derive(Debug, Clone, serde::Serialize)]
52pub struct BackupInfo {
53    pub key: String,
54    pub last_modified: String,
55    pub size_bytes: u64,
56}
57
58#[derive(Debug, serde::Serialize)]
59pub struct PruneResult {
60    pub deleted: Vec<String>,
61    pub kept: usize,
62    /// Backups older than retention_days that were NOT deleted to honour min_backups.
63    pub protected_by_min: usize,
64}
65
66#[derive(Debug, serde::Serialize)]
67pub struct BackupResult {
68    pub key: String,
69    pub size_bytes: u64,
70    pub prune: PruneResult,
71}
72
73// ── R2 backup service ─────────────────────────────────────────────────────────
74
/// Client that uploads, lists and prunes database backups in a single R2 bucket.
///
/// Requests are path-style (`/{bucket}/{key}`) against the account endpoint and
/// are signed with AWS SigV4 using the credentials stored here.
///
/// NOTE: this type intentionally has no `Debug` derive; keep it that way so the
/// secret key cannot be printed by accident.
pub struct R2BackupService {
    /// Cloudflare account id; forms the `{account_id}.r2.cloudflarestorage.com` host.
    account_id: String,
    /// Bucket name; first segment of every request path.
    bucket: String,
    /// SigV4 access key id (appears in the `Credential=` part of the auth header).
    access_key_id: String,
    /// SigV4 secret, only used to derive signing keys.
    secret_access_key: String,
    /// Prepended verbatim to backup keys; no `/` is inserted automatically.
    prefix: String,
}
82
83impl R2BackupService {
84    pub fn from_config(cfg: &R2Config) -> anyhow::Result<Self> {
85        let access_key_id = cfg.resolved_access_key_id().ok_or_else(|| {
86            anyhow::anyhow!(
87                "R2 access_key_id not set; configure backup.r2.access_key_id \
88                 or INNATE_R2_ACCESS_KEY_ID env var"
89            )
90        })?;
91        let secret_access_key = cfg.resolved_secret_access_key().ok_or_else(|| {
92            anyhow::anyhow!(
93                "R2 secret_access_key not set; configure backup.r2.secret_access_key \
94                 or INNATE_R2_SECRET_ACCESS_KEY env var"
95            )
96        })?;
97        Ok(Self {
98            account_id: cfg.account_id.clone(),
99            bucket: cfg.bucket.clone(),
100            access_key_id,
101            secret_access_key,
102            prefix: cfg.prefix.clone(),
103        })
104    }
105
106    fn endpoint_base(&self) -> String {
107        format!("https://{}.r2.cloudflarestorage.com", self.account_id)
108    }
109
110    fn host(&self) -> String {
111        format!("{}.r2.cloudflarestorage.com", self.account_id)
112    }
113
114    // ── AWS Sig V4 ────────────────────────────────────────────────────────────
115
116    fn sha256_hex(data: &[u8]) -> String {
117        let mut h = Sha256::new();
118        h.update(data);
119        hex_bytes(&h.finalize())
120    }
121
122    fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
123        let mut mac = HmacSha256::new_from_slice(key).expect("HMAC accepts any key size");
124        mac.update(data);
125        mac.finalize().into_bytes().to_vec()
126    }
127
128    /// Returns `(Authorization header value, x-amz-content-sha256 value)`.
129    fn sign(
130        &self,
131        method: &str,
132        path: &str,  // URI path, e.g. "/bucket/key" (already URI-encoded)
133        query: &str, // canonical query string (keys=values sorted, values URI-encoded)
134        body: &[u8],
135        datetime: &str, // "20240101T120000Z"
136    ) -> (String, String) {
137        let date = &datetime[..8];
138        let payload_hash = Self::sha256_hex(body);
139        let host = self.host();
140
141        // Canonical headers — must be sorted by lowercase name.
142        let mut hdrs: Vec<(String, String)> = vec![
143            ("host".into(), host.clone()),
144            ("x-amz-content-sha256".into(), payload_hash.clone()),
145            ("x-amz-date".into(), datetime.into()),
146        ];
147        hdrs.sort_by(|a, b| a.0.cmp(&b.0));
148
149        let canonical_headers: String =
150            hdrs.iter().map(|(k, v)| format!("{}:{}\n", k, v)).collect();
151        let signed_headers: String = hdrs
152            .iter()
153            .map(|(k, _)| k.as_str())
154            .collect::<Vec<_>>()
155            .join(";");
156
157        let canonical_request = format!(
158            "{}\n{}\n{}\n{}\n{}\n{}",
159            method, path, query, canonical_headers, signed_headers, payload_hash
160        );
161
162        let credential_scope = format!("{}/auto/s3/aws4_request", date);
163        let string_to_sign = format!(
164            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
165            datetime,
166            credential_scope,
167            Self::sha256_hex(canonical_request.as_bytes())
168        );
169
170        let k_date = Self::hmac_sha256(
171            format!("AWS4{}", self.secret_access_key).as_bytes(),
172            date.as_bytes(),
173        );
174        let k_region = Self::hmac_sha256(&k_date, b"auto");
175        let k_service = Self::hmac_sha256(&k_region, b"s3");
176        let k_signing = Self::hmac_sha256(&k_service, b"aws4_request");
177        let signature = hex_bytes(&Self::hmac_sha256(&k_signing, string_to_sign.as_bytes()));
178
179        let auth = format!(
180            "AWS4-HMAC-SHA256 Credential={}/{},SignedHeaders={},Signature={}",
181            self.access_key_id, credential_scope, signed_headers, signature
182        );
183        (auth, payload_hash)
184    }
185
186    fn now_datetime() -> String {
187        chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string()
188    }
189
190    // ── HTTP operations ───────────────────────────────────────────────────────
191
192    fn put_object(&self, key: &str, body: &[u8]) -> anyhow::Result<()> {
193        let path = format!("/{}/{}", self.bucket, uri_encode_path(key));
194        let datetime = Self::now_datetime();
195        let (auth, payload_hash) = self.sign("PUT", &path, "", body, &datetime);
196        let url = format!("{}{}", self.endpoint_base(), path);
197        ureq::put(&url)
198            .set("x-amz-date", &datetime)
199            .set("x-amz-content-sha256", &payload_hash)
200            .set("Authorization", &auth)
201            .send_bytes(body)
202            .map_err(|e| anyhow::anyhow!("R2 PUT {key}: {e}"))?;
203        Ok(())
204    }
205
206    fn delete_object(&self, key: &str) -> anyhow::Result<()> {
207        let path = format!("/{}/{}", self.bucket, uri_encode_path(key));
208        let datetime = Self::now_datetime();
209        let (auth, payload_hash) = self.sign("DELETE", &path, "", &[], &datetime);
210        let url = format!("{}{}", self.endpoint_base(), path);
211        match ureq::delete(&url)
212            .set("x-amz-date", &datetime)
213            .set("x-amz-content-sha256", &payload_hash)
214            .set("Authorization", &auth)
215            .call()
216        {
217            Ok(_) => Ok(()),
218            Err(ureq::Error::Status(404, _)) => Ok(()), // already gone
219            Err(e) => Err(anyhow::anyhow!("R2 DELETE {key}: {e}")),
220        }
221    }
222
223    fn list_objects(&self) -> anyhow::Result<Vec<BackupInfo>> {
224        // Build canonical query: list-type=2 (and prefix if set).
225        // Values must be URI-encoded in the canonical query string.
226        let mut query_params: Vec<(&str, String)> = vec![("list-type", "2".into())];
227        if !self.prefix.is_empty() {
228            query_params.push(("prefix", uri_encode_value(&self.prefix)));
229        }
230        query_params.sort_by_key(|(k, _)| *k);
231        let canonical_query: String = query_params
232            .iter()
233            .map(|(k, v)| format!("{}={}", k, v))
234            .collect::<Vec<_>>()
235            .join("&");
236
237        let path = format!("/{}", self.bucket);
238        let datetime = Self::now_datetime();
239        let (auth, payload_hash) = self.sign("GET", &path, &canonical_query, &[], &datetime);
240        let url = format!("{}{}?{}", self.endpoint_base(), path, canonical_query);
241
242        let body = ureq::get(&url)
243            .set("x-amz-date", &datetime)
244            .set("x-amz-content-sha256", &payload_hash)
245            .set("Authorization", &auth)
246            .call()
247            .map_err(|e| anyhow::anyhow!("R2 LIST: {e}"))?
248            .into_string()
249            .map_err(|e| anyhow::anyhow!("R2 LIST read: {e}"))?;
250
251        Ok(parse_list_xml(&body))
252    }
253
254    // ── Public API ────────────────────────────────────────────────────────────
255
256    /// Create a consistent SQLite copy via VACUUM INTO, upload to R2, then prune old backups.
257    pub fn backup_now(
258        &self,
259        db_path: &Path,
260        retention_days: u64,
261        min_backups: usize,
262    ) -> anyhow::Result<BackupResult> {
263        // Temporary file for the VACUUM copy.
264        let tmp_dir = crate::paths::tmp_dir();
265        std::fs::create_dir_all(&tmp_dir)?;
266        let ts_str = chrono::Utc::now().format("%Y%m%d-%H%M%S").to_string();
267        let tmp_path = tmp_dir.join(format!("innate-backup-{ts_str}.db"));
268
269        let vacuum_result = (|| -> anyhow::Result<Vec<u8>> {
270            let conn = rusqlite::Connection::open(db_path)
271                .map_err(|e| anyhow::anyhow!("Cannot open DB for backup: {e}"))?;
272            conn.execute(
273                "VACUUM INTO ?1",
274                rusqlite::params![tmp_path.to_string_lossy().as_ref()],
275            )
276            .map_err(|e| anyhow::anyhow!("VACUUM INTO failed: {e}"))?;
277            let bytes = std::fs::read(&tmp_path)
278                .map_err(|e| anyhow::anyhow!("Cannot read backup temp file: {e}"))?;
279            Ok(bytes)
280        })();
281        let _ = std::fs::remove_file(&tmp_path); // clean up regardless
282        let body = vacuum_result?;
283
284        let size_bytes = body.len() as u64;
285        let key = format!("{}innate-backup-{ts_str}.db", self.prefix);
286        self.put_object(&key, &body)?;
287
288        let now = crate::utils::utc_now_iso();
289        let _ = save_state(&BackupState {
290            last_backup_at: Some(now),
291            last_backup_key: Some(key.clone()),
292        });
293
294        let prune = self.prune_old_backups(retention_days, min_backups)?;
295
296        Ok(BackupResult {
297            key,
298            size_bytes,
299            prune,
300        })
301    }
302
303    /// List all backup objects in R2.
304    pub fn list_backups(&self) -> anyhow::Result<Vec<BackupInfo>> {
305        let mut backups = self.list_objects()?;
306        backups.sort_by(|a, b| a.last_modified.cmp(&b.last_modified));
307        Ok(backups)
308    }
309
310    /// Delete backups older than `retention_days`, but always keep at least `min_backups`.
311    pub fn prune_old_backups(
312        &self,
313        retention_days: u64,
314        min_backups: usize,
315    ) -> anyhow::Result<PruneResult> {
316        let mut backups = self.list_objects()?;
317        // Oldest first.
318        backups.sort_by(|a, b| a.last_modified.cmp(&b.last_modified));
319
320        let cutoff = (chrono::Utc::now() - chrono::Duration::days(retention_days as i64))
321            .format("%Y-%m-%dT%H:%M:%S")
322            .to_string();
323
324        let total = backups.len();
325        let old_indices: Vec<usize> = backups
326            .iter()
327            .enumerate()
328            .filter(|(_, b)| b.last_modified < cutoff)
329            .map(|(i, _)| i)
330            .collect();
331
332        // We must keep at least min_backups total.
333        let max_deletable = total.saturating_sub(min_backups);
334        let deletable = old_indices.len().min(max_deletable);
335        let protected = old_indices.len().saturating_sub(deletable);
336
337        let to_delete: Vec<String> = old_indices
338            .into_iter()
339            .take(deletable)
340            .map(|i| backups[i].key.clone())
341            .collect();
342
343        for key in &to_delete {
344            self.delete_object(key)?;
345        }
346
347        Ok(PruneResult {
348            kept: total - to_delete.len(),
349            deleted: to_delete,
350            protected_by_min: protected,
351        })
352    }
353
354    // ── State helpers (no network) ────────────────────────────────────────────
355
356    /// Returns true if a backup is due based on local state (no network call).
357    pub fn needs_backup(interval_hours: u64) -> bool {
358        let state = load_state();
359        let Some(last_at) = state.last_backup_at else {
360            return true;
361        };
362        // utc_now_iso() format: "2024-01-01T12:00:00.000Z"
363        let Ok(last_dt) = chrono::DateTime::parse_from_rfc3339(&last_at) else {
364            return true;
365        };
366        let elapsed = chrono::Utc::now().signed_duration_since(last_dt.with_timezone(&chrono::Utc));
367        elapsed.num_hours() >= interval_hours as i64
368    }
369
370    pub fn last_backup_state() -> BackupState {
371        load_state()
372    }
373}
374
375// ── Auto-backup entry point (called by daemon and MCP/CLI) ───────────────────
376
377/// Run a backup if the configured interval has elapsed since the last backup.
378/// Returns true if a backup was performed.
379pub fn maybe_auto_backup(db_path: &Path, cfg: &BackupConfig) -> anyhow::Result<bool> {
380    if !cfg.enable {
381        return Ok(false);
382    }
383    let r2_cfg = match &cfg.r2 {
384        Some(c) => c,
385        None => return Ok(false),
386    };
387    if !R2BackupService::needs_backup(cfg.auto_backup_interval_hours) {
388        return Ok(false);
389    }
390    let svc = R2BackupService::from_config(r2_cfg)?;
391    svc.backup_now(db_path, cfg.retention_days, cfg.min_backups)?;
392    Ok(true)
393}
394
395// ── Helpers ───────────────────────────────────────────────────────────────────
396
/// Lowercase hexadecimal encoding of `bytes`: two digits per byte, no separators.
/// SigV4 hashes and signatures use this form.
///
/// Preallocates the output and uses a lookup table instead of one `format!`
/// allocation per byte.
fn hex_bytes(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
400
401/// URI-encode a key path segment, preserving forward slashes.
/// URI-encode an object key for use in a request path.
///
/// RFC 3986 unreserved characters (`A-Z a-z 0-9 - _ . ~`) and `/` pass through
/// unchanged. Every other byte of the UTF-8 encoding becomes `%XX` with uppercase hex.
fn uri_encode_path(s: &str) -> String {
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let passthrough =
            byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
        if passthrough {
            encoded.push(char::from(byte));
        } else {
            encoded.push_str(&format!("%{byte:02X}"));
        }
    }
    encoded
}
414
415/// URI-encode a query parameter value (encodes slashes too).
/// URI-encode a query-parameter value.
///
/// Only RFC 3986 unreserved characters pass through. Unlike `uri_encode_path`,
/// `/` is encoded too, as `%2F`, as SigV4 requires for query values.
fn uri_encode_value(s: &str) -> String {
    s.bytes()
        .map(|byte| match byte {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                char::from(byte).to_string()
            }
            _ => format!("%{byte:02X}"),
        })
        .collect()
}
430
431/// Parse S3 XML list-objects-v2 response into BackupInfo list.
432fn parse_list_xml(body: &str) -> Vec<BackupInfo> {
433    let mut results = Vec::new();
434    for block in body.split("<Contents>").skip(1) {
435        let key = xml_text(block, "Key");
436        let lm = xml_text(block, "LastModified");
437        let size = xml_text(block, "Size")
438            .and_then(|s| s.parse().ok())
439            .unwrap_or(0);
440        if let (Some(key), Some(last_modified)) = (key, lm) {
441            results.push(BackupInfo {
442                key,
443                last_modified,
444                size_bytes: size,
445            });
446        }
447    }
448    results
449}
450
/// Return the decoded text between the first `<tag>` and the following `</tag>`.
///
/// Returns `None` if either tag is missing. S3 XML-escapes text content (e.g. `&`
/// in a key arrives as `&amp;`), so entities are decoded. Otherwise a key read here
/// would not match the real object name when it is later used in a DELETE.
fn xml_text(haystack: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let close = format!("</{tag}>");
    let start = haystack.find(&open)? + open.len();
    let end = haystack[start..].find(&close)? + start;
    Some(xml_unescape(&haystack[start..end]))
}

/// Decode the five predefined XML entities plus decimal/hex character references.
/// Anything that is not a recognised entity is kept literally.
fn xml_unescape(raw: &str) -> String {
    if !raw.contains('&') {
        return raw.to_owned();
    }
    let mut out = String::with_capacity(raw.len());
    let mut rest = raw;
    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        let tail = &rest[amp..];
        let decoded = tail.find(';').and_then(|semi| {
            let entity = &tail[1..semi];
            let ch = match entity {
                "amp" => Some('&'),
                "lt" => Some('<'),
                "gt" => Some('>'),
                "quot" => Some('"'),
                "apos" => Some('\''),
                _ => {
                    let code = if let Some(hex) = entity
                        .strip_prefix("#x")
                        .or_else(|| entity.strip_prefix("#X"))
                    {
                        u32::from_str_radix(hex, 16).ok()
                    } else if let Some(dec) = entity.strip_prefix('#') {
                        dec.parse::<u32>().ok()
                    } else {
                        None
                    };
                    code.and_then(char::from_u32)
                }
            };
            ch.map(|c| (c, semi))
        });
        match decoded {
            Some((c, semi)) => {
                out.push(c);
                rest = &tail[semi + 1..];
            }
            None => {
                // Not an entity: keep the '&' and continue scanning after it.
                out.push('&');
                rest = &tail[1..];
            }
        }
    }
    out.push_str(rest);
    out
}