Skip to main content

innate_core/backup/
mod.rs

1//! Cloudflare R2 backup for the Innate knowledge database.
2//!
3//! Uses S3-compatible API with AWS Signature V4.
4//! R2 endpoint: https://{account_id}.r2.cloudflarestorage.com
5//! Region for signing: "auto"
6
7use std::path::Path;
8
9use hmac::digest::KeyInit; // `new_from_slice` moved to KeyInit in digest 0.11
10use hmac::{Hmac, Mac};
11use sha2::{Digest, Sha256};
12
13use crate::settings::{BackupConfig, R2Config};
14
15mod command;
16
17pub(crate) use command::run_command;
18pub use command::BackupCommands;
19
20type HmacSha256 = Hmac<Sha256>;
21
22// ── Backup state (local cache of last backup time) ───────────────────────────
23
24fn backup_state_path() -> std::path::PathBuf {
25    crate::paths::backup_state_path()
26}
27
28#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
29pub struct BackupState {
30    pub last_backup_at: Option<String>,
31    pub last_backup_key: Option<String>,
32}
33
34fn load_state() -> BackupState {
35    let Ok(text) = std::fs::read_to_string(backup_state_path()) else {
36        return BackupState::default();
37    };
38    serde_json::from_str(&text).unwrap_or_default()
39}
40
41fn save_state(state: &BackupState) -> anyhow::Result<()> {
42    let path = backup_state_path();
43    if let Some(parent) = path.parent() {
44        std::fs::create_dir_all(parent)?;
45    }
46    std::fs::write(path, serde_json::to_string_pretty(state)?)?;
47    Ok(())
48}
49
50// ── Public result types ───────────────────────────────────────────────────────
51
52#[derive(Debug, Clone, serde::Serialize)]
53pub struct BackupInfo {
54    pub key: String,
55    pub last_modified: String,
56    pub size_bytes: u64,
57}
58
59#[derive(Debug, serde::Serialize)]
60pub struct PruneResult {
61    pub deleted: Vec<String>,
62    pub kept: usize,
63    /// Backups older than retention_days that were NOT deleted to honour min_backups.
64    pub protected_by_min: usize,
65}
66
67#[derive(Debug, serde::Serialize)]
68pub struct BackupResult {
69    pub key: String,
70    pub size_bytes: u64,
71    pub prune: PruneResult,
72}
73
74// ── R2 backup service ─────────────────────────────────────────────────────────
75
/// Client for storing, listing, and pruning database backups in a Cloudflare R2 bucket.
pub struct R2BackupService {
    /// Cloudflare account id; forms the `{account_id}.r2.cloudflarestorage.com` host name.
    account_id: String,
    /// Bucket that holds the backups.
    bucket: String,
    /// Access key id placed in the `Credential=` part of the Authorization header.
    access_key_id: String,
    /// Secret from which the request signing key is derived.
    secret_access_key: String,
    /// Prepended to every backup key and used as the listing filter; may be empty.
    prefix: String,
}
83
84impl R2BackupService {
85    pub fn from_config(cfg: &R2Config) -> anyhow::Result<Self> {
86        let access_key_id = cfg.resolved_access_key_id().ok_or_else(|| {
87            anyhow::anyhow!(
88                "R2 access_key_id not set; configure backup.r2.access_key_id \
89                 or INNATE_R2_ACCESS_KEY_ID env var"
90            )
91        })?;
92        let secret_access_key = cfg.resolved_secret_access_key().ok_or_else(|| {
93            anyhow::anyhow!(
94                "R2 secret_access_key not set; configure backup.r2.secret_access_key \
95                 or INNATE_R2_SECRET_ACCESS_KEY env var"
96            )
97        })?;
98        Ok(Self {
99            account_id: cfg.account_id.clone(),
100            bucket: cfg.bucket.clone(),
101            access_key_id,
102            secret_access_key,
103            prefix: cfg.prefix.clone(),
104        })
105    }
106
107    fn endpoint_base(&self) -> String {
108        format!("https://{}.r2.cloudflarestorage.com", self.account_id)
109    }
110
111    fn host(&self) -> String {
112        format!("{}.r2.cloudflarestorage.com", self.account_id)
113    }
114
115    // ── AWS Sig V4 ────────────────────────────────────────────────────────────
116
117    fn sha256_hex(data: &[u8]) -> String {
118        let mut h = Sha256::new();
119        h.update(data);
120        hex_bytes(&h.finalize())
121    }
122
123    fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
124        let mut mac = HmacSha256::new_from_slice(key).expect("HMAC accepts any key size");
125        mac.update(data);
126        mac.finalize().into_bytes().to_vec()
127    }
128
129    /// Returns `(Authorization header value, x-amz-content-sha256 value)`.
130    fn sign(
131        &self,
132        method: &str,
133        path: &str,  // URI path, e.g. "/bucket/key" (already URI-encoded)
134        query: &str, // canonical query string (keys=values sorted, values URI-encoded)
135        body: &[u8],
136        datetime: &str, // "20240101T120000Z"
137    ) -> (String, String) {
138        let date = &datetime[..8];
139        let payload_hash = Self::sha256_hex(body);
140        let host = self.host();
141
142        // Canonical headers — must be sorted by lowercase name.
143        let mut hdrs: Vec<(String, String)> = vec![
144            ("host".into(), host.clone()),
145            ("x-amz-content-sha256".into(), payload_hash.clone()),
146            ("x-amz-date".into(), datetime.into()),
147        ];
148        hdrs.sort_by(|a, b| a.0.cmp(&b.0));
149
150        let canonical_headers: String =
151            hdrs.iter().map(|(k, v)| format!("{}:{}\n", k, v)).collect();
152        let signed_headers: String = hdrs
153            .iter()
154            .map(|(k, _)| k.as_str())
155            .collect::<Vec<_>>()
156            .join(";");
157
158        let canonical_request = format!(
159            "{}\n{}\n{}\n{}\n{}\n{}",
160            method, path, query, canonical_headers, signed_headers, payload_hash
161        );
162
163        let credential_scope = format!("{}/auto/s3/aws4_request", date);
164        let string_to_sign = format!(
165            "AWS4-HMAC-SHA256\n{}\n{}\n{}",
166            datetime,
167            credential_scope,
168            Self::sha256_hex(canonical_request.as_bytes())
169        );
170
171        let k_date = Self::hmac_sha256(
172            format!("AWS4{}", self.secret_access_key).as_bytes(),
173            date.as_bytes(),
174        );
175        let k_region = Self::hmac_sha256(&k_date, b"auto");
176        let k_service = Self::hmac_sha256(&k_region, b"s3");
177        let k_signing = Self::hmac_sha256(&k_service, b"aws4_request");
178        let signature = hex_bytes(&Self::hmac_sha256(&k_signing, string_to_sign.as_bytes()));
179
180        let auth = format!(
181            "AWS4-HMAC-SHA256 Credential={}/{},SignedHeaders={},Signature={}",
182            self.access_key_id, credential_scope, signed_headers, signature
183        );
184        (auth, payload_hash)
185    }
186
187    fn now_datetime() -> String {
188        chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string()
189    }
190
191    // ── HTTP operations ───────────────────────────────────────────────────────
192
193    fn put_object(&self, key: &str, body: &[u8]) -> anyhow::Result<()> {
194        let path = format!("/{}/{}", self.bucket, uri_encode_path(key));
195        let datetime = Self::now_datetime();
196        let (auth, payload_hash) = self.sign("PUT", &path, "", body, &datetime);
197        let url = format!("{}{}", self.endpoint_base(), path);
198        ureq::put(&url)
199            .header("x-amz-date", &datetime)
200            .header("x-amz-content-sha256", &payload_hash)
201            .header("Authorization", &auth)
202            .send(body)
203            .map_err(|e| anyhow::anyhow!("R2 PUT {key}: {e}"))?;
204        Ok(())
205    }
206
207    fn delete_object(&self, key: &str) -> anyhow::Result<()> {
208        let path = format!("/{}/{}", self.bucket, uri_encode_path(key));
209        let datetime = Self::now_datetime();
210        let (auth, payload_hash) = self.sign("DELETE", &path, "", &[], &datetime);
211        let url = format!("{}{}", self.endpoint_base(), path);
212        match ureq::delete(&url)
213            .header("x-amz-date", &datetime)
214            .header("x-amz-content-sha256", &payload_hash)
215            .header("Authorization", &auth)
216            .call()
217        {
218            Ok(_) => Ok(()),
219            Err(ureq::Error::StatusCode(404)) => Ok(()), // already gone
220            Err(e) => Err(anyhow::anyhow!("R2 DELETE {key}: {e}")),
221        }
222    }
223
224    fn list_objects(&self) -> anyhow::Result<Vec<BackupInfo>> {
225        // Build canonical query: list-type=2 (and prefix if set).
226        // Values must be URI-encoded in the canonical query string.
227        let mut query_params: Vec<(&str, String)> = vec![("list-type", "2".into())];
228        if !self.prefix.is_empty() {
229            query_params.push(("prefix", uri_encode_value(&self.prefix)));
230        }
231        query_params.sort_by_key(|(k, _)| *k);
232        let canonical_query: String = query_params
233            .iter()
234            .map(|(k, v)| format!("{}={}", k, v))
235            .collect::<Vec<_>>()
236            .join("&");
237
238        let path = format!("/{}", self.bucket);
239        let datetime = Self::now_datetime();
240        let (auth, payload_hash) = self.sign("GET", &path, &canonical_query, &[], &datetime);
241        let url = format!("{}{}?{}", self.endpoint_base(), path, canonical_query);
242
243        let body = ureq::get(&url)
244            .header("x-amz-date", &datetime)
245            .header("x-amz-content-sha256", &payload_hash)
246            .header("Authorization", &auth)
247            .call()
248            .map_err(|e| anyhow::anyhow!("R2 LIST: {e}"))?
249            .body_mut()
250            .read_to_string()
251            .map_err(|e| anyhow::anyhow!("R2 LIST read: {e}"))?;
252
253        Ok(parse_list_xml(&body))
254    }
255
256    // ── Public API ────────────────────────────────────────────────────────────
257
258    /// Create a consistent SQLite copy via VACUUM INTO, upload to R2, then prune old backups.
259    pub fn backup_now(
260        &self,
261        db_path: &Path,
262        retention_days: u64,
263        min_backups: usize,
264    ) -> anyhow::Result<BackupResult> {
265        // Temporary file for the VACUUM copy.
266        let tmp_dir = crate::paths::tmp_dir();
267        std::fs::create_dir_all(&tmp_dir)?;
268        let ts_str = chrono::Utc::now().format("%Y%m%d-%H%M%S").to_string();
269        let tmp_path = tmp_dir.join(format!("innate-backup-{ts_str}.db"));
270
271        let vacuum_result = (|| -> anyhow::Result<Vec<u8>> {
272            let conn = rusqlite::Connection::open(db_path)
273                .map_err(|e| anyhow::anyhow!("Cannot open DB for backup: {e}"))?;
274            conn.execute(
275                "VACUUM INTO ?1",
276                rusqlite::params![tmp_path.to_string_lossy().as_ref()],
277            )
278            .map_err(|e| anyhow::anyhow!("VACUUM INTO failed: {e}"))?;
279            let bytes = std::fs::read(&tmp_path)
280                .map_err(|e| anyhow::anyhow!("Cannot read backup temp file: {e}"))?;
281            Ok(bytes)
282        })();
283        let _ = std::fs::remove_file(&tmp_path); // clean up regardless
284        let body = vacuum_result?;
285
286        let size_bytes = body.len() as u64;
287        let key = format!("{}innate-backup-{ts_str}.db", self.prefix);
288        self.put_object(&key, &body)?;
289
290        let now = crate::utils::utc_now_iso();
291        let _ = save_state(&BackupState {
292            last_backup_at: Some(now),
293            last_backup_key: Some(key.clone()),
294        });
295
296        let prune = self.prune_old_backups(retention_days, min_backups)?;
297
298        Ok(BackupResult {
299            key,
300            size_bytes,
301            prune,
302        })
303    }
304
305    /// List all backup objects in R2.
306    pub fn list_backups(&self) -> anyhow::Result<Vec<BackupInfo>> {
307        let mut backups = self.list_objects()?;
308        backups.sort_by(|a, b| a.last_modified.cmp(&b.last_modified));
309        Ok(backups)
310    }
311
312    /// Delete backups older than `retention_days`, but always keep at least `min_backups`.
313    pub fn prune_old_backups(
314        &self,
315        retention_days: u64,
316        min_backups: usize,
317    ) -> anyhow::Result<PruneResult> {
318        let mut backups = self.list_objects()?;
319        // Oldest first.
320        backups.sort_by(|a, b| a.last_modified.cmp(&b.last_modified));
321
322        let cutoff = (chrono::Utc::now() - chrono::Duration::days(retention_days as i64))
323            .format("%Y-%m-%dT%H:%M:%S")
324            .to_string();
325
326        let total = backups.len();
327        let old_indices: Vec<usize> = backups
328            .iter()
329            .enumerate()
330            .filter(|(_, b)| b.last_modified < cutoff)
331            .map(|(i, _)| i)
332            .collect();
333
334        // We must keep at least min_backups total.
335        let max_deletable = total.saturating_sub(min_backups);
336        let deletable = old_indices.len().min(max_deletable);
337        let protected = old_indices.len().saturating_sub(deletable);
338
339        let to_delete: Vec<String> = old_indices
340            .into_iter()
341            .take(deletable)
342            .map(|i| backups[i].key.clone())
343            .collect();
344
345        for key in &to_delete {
346            self.delete_object(key)?;
347        }
348
349        Ok(PruneResult {
350            kept: total - to_delete.len(),
351            deleted: to_delete,
352            protected_by_min: protected,
353        })
354    }
355
356    // ── State helpers (no network) ────────────────────────────────────────────
357
358    /// Returns true if a backup is due based on local state (no network call).
359    pub fn needs_backup(interval_hours: u64) -> bool {
360        let state = load_state();
361        let Some(last_at) = state.last_backup_at else {
362            return true;
363        };
364        // utc_now_iso() format: "2024-01-01T12:00:00.000Z"
365        let Ok(last_dt) = chrono::DateTime::parse_from_rfc3339(&last_at) else {
366            return true;
367        };
368        let elapsed = chrono::Utc::now().signed_duration_since(last_dt.with_timezone(&chrono::Utc));
369        elapsed.num_hours() >= interval_hours as i64
370    }
371
372    pub fn last_backup_state() -> BackupState {
373        load_state()
374    }
375}
376
377// ── Auto-backup entry point (called by daemon and MCP/CLI) ───────────────────
378
379/// Run a backup if the configured interval has elapsed since the last backup.
380/// Returns true if a backup was performed.
381pub fn maybe_auto_backup(db_path: &Path, cfg: &BackupConfig) -> anyhow::Result<bool> {
382    if !cfg.enable {
383        return Ok(false);
384    }
385    let r2_cfg = match &cfg.r2 {
386        Some(c) => c,
387        None => return Ok(false),
388    };
389    if !R2BackupService::needs_backup(cfg.auto_backup_interval_hours) {
390        return Ok(false);
391    }
392    let svc = R2BackupService::from_config(r2_cfg)?;
393    svc.backup_now(db_path, cfg.retention_days, cfg.min_backups)?;
394    Ok(true)
395}
396
397// ── Helpers ───────────────────────────────────────────────────────────────────
398
/// Encode `bytes` as lowercase hexadecimal, two digits per byte.
fn hex_bytes(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
402
/// Percent-encode an object key for use in a URI path.
///
/// ASCII letters, digits, `-`, `_`, `.`, `~` and `/` pass through unchanged; every
/// other byte (including each byte of a multi-byte UTF-8 character) becomes `%XX`
/// with uppercase hex digits.
fn uri_encode_path(s: &str) -> String {
    s.bytes()
        .fold(String::with_capacity(s.len()), |mut encoded, byte| {
            let keep = byte.is_ascii_alphanumeric()
                || matches!(byte, b'-' | b'_' | b'.' | b'~' | b'/');
            if keep {
                encoded.push(char::from(byte));
            } else {
                encoded.push_str(&format!("%{byte:02X}"));
            }
            encoded
        })
}
416
/// Percent-encode a query parameter value.
///
/// Only ASCII letters, digits, `-`, `_`, `.` and `~` pass through unchanged; every
/// other byte, `/` included, becomes `%XX` with uppercase hex digits.
fn uri_encode_value(s: &str) -> String {
    let mut encoded = String::with_capacity(s.len());
    for byte in s.bytes() {
        let unreserved = byte.is_ascii_alphanumeric() || b"-_.~".contains(&byte);
        if unreserved {
            encoded.push(char::from(byte));
        } else {
            encoded.push_str(&format!("%{byte:02X}"));
        }
    }
    encoded
}
432
433/// Parse S3 XML list-objects-v2 response into BackupInfo list.
434fn parse_list_xml(body: &str) -> Vec<BackupInfo> {
435    let mut results = Vec::new();
436    for block in body.split("<Contents>").skip(1) {
437        let key = xml_text(block, "Key");
438        let lm = xml_text(block, "LastModified");
439        let size = xml_text(block, "Size")
440            .and_then(|s| s.parse().ok())
441            .unwrap_or(0);
442        if let (Some(key), Some(last_modified)) = (key, lm) {
443            results.push(BackupInfo {
444                key,
445                last_modified,
446                size_bytes: size,
447            });
448        }
449    }
450    results
451}
452
/// Return the text of the first `<tag>…</tag>` element found in `haystack`.
///
/// XML character references in the text are decoded, so an element such as
/// `<Key>a&amp;b</Key>` yields `a&b`, the real value rather than its escaped form.
/// Returns `None` when the opening tag is absent or has no matching closing tag
/// after it. Only attribute-less opening tags (`<tag>`) are recognised.
fn xml_text(haystack: &str, tag: &str) -> Option<String> {
    let open = format!("<{tag}>");
    let close = format!("</{tag}>");
    let start = haystack.find(&open)? + open.len();
    let end = haystack[start..].find(&close)? + start;
    Some(xml_unescape(&haystack[start..end]))
}

/// Decode the predefined XML entities and numeric character references in `raw`.
///
/// Anything that is not a well-formed, known reference is copied through verbatim.
fn xml_unescape(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    let mut rest = raw;

    while let Some(amp) = rest.find('&') {
        out.push_str(&rest[..amp]);
        let tail = &rest[amp..];

        let decoded = tail.find(';').and_then(|semi| {
            let name = &tail[1..semi];
            let ch = match name {
                "amp" => '&',
                "lt" => '<',
                "gt" => '>',
                "quot" => '"',
                "apos" => '\'',
                _ => {
                    let number = name.strip_prefix('#')?;
                    let code = match number
                        .strip_prefix('x')
                        .or_else(|| number.strip_prefix('X'))
                    {
                        Some(hex) => u32::from_str_radix(hex, 16).ok()?,
                        None => number.parse::<u32>().ok()?,
                    };
                    char::from_u32(code)?
                }
            };
            Some((ch, semi + 1))
        });

        match decoded {
            Some((ch, consumed)) => {
                out.push(ch);
                // Resume after the reference so decoded output is never re-scanned
                // (`&amp;lt;` must become `&lt;`, not `<`).
                rest = &tail[consumed..];
            }
            None => {
                out.push('&');
                rest = &tail[1..];
            }
        }
    }

    out.push_str(rest);
    out
}