use convergio_db::pool::ConnPool;
use convergio_types::extension::{
AppContext, ExtResult, Extension, Health, McpToolDef, Metric, Migration, ScheduledTask,
};
use convergio_types::manifest::{Capability, Manifest, ModuleKind};
pub struct BackupExtension {
pool: ConnPool,
}
impl BackupExtension {
pub fn new(pool: ConnPool) -> Self {
Self { pool }
}
pub fn pool(&self) -> &ConnPool {
&self.pool
}
}
impl Default for BackupExtension {
fn default() -> Self {
let pool = convergio_db::pool::create_memory_pool().expect("in-memory pool for default");
Self { pool }
}
}
impl Extension for BackupExtension {
fn manifest(&self) -> Manifest {
Manifest {
id: "convergio-backup".to_string(),
description: "Data retention, backup, disaster recovery, \
and org data export/import"
.to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
kind: ModuleKind::Core,
provides: vec![
Capability {
name: "data-retention".to_string(),
version: "1.0".to_string(),
description: "Auto-purge expired data with configurable policies".to_string(),
},
Capability {
name: "db-backup".to_string(),
version: "1.0".to_string(),
description: "Periodic SQLite snapshots with WAL checkpoint".to_string(),
},
Capability {
name: "disaster-recovery".to_string(),
version: "1.0".to_string(),
description: "Restore daemon to a known state from snapshot".to_string(),
},
Capability {
name: "org-export-import".to_string(),
version: "1.0".to_string(),
description: "Export/import org data for cross-node migration".to_string(),
},
],
requires: vec![],
agent_tools: vec![],
required_roles: vec!["orchestrator".into(), "all".into()],
}
}
fn routes(&self, _ctx: &AppContext) -> Option<axum::Router> {
let data_dir = convergio_types::platform_paths::convergio_data_dir();
let state = std::sync::Arc::new(crate::routes::BackupState {
pool: self.pool.clone(),
db_path: data_dir.join("convergio.db"),
backup_dir: crate::snapshot::backup_dir(&data_dir),
node_name: std::env::var("CONVERGIO_NODE_NAME").unwrap_or_else(|_| "local".into()),
});
Some(crate::routes::router(state))
}
fn migrations(&self) -> Vec<Migration> {
crate::schema::migrations()
}
fn on_start(&self, _ctx: &AppContext) -> ExtResult<()> {
tracing::info!("backup: extension started");
Ok(())
}
fn health(&self) -> Health {
match self.pool.get() {
Ok(conn) => {
let ok = conn
.query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
r.get::<_, i64>(0)
})
.is_ok();
if ok {
Health::Ok
} else {
Health::Degraded {
reason: "backup_snapshots table inaccessible".into(),
}
}
}
Err(e) => Health::Down {
reason: format!("pool error: {e}"),
},
}
}
fn metrics(&self) -> Vec<Metric> {
let conn = match self.pool.get() {
Ok(c) => c,
Err(_) => return vec![],
};
let mut metrics = Vec::new();
if let Ok(n) = conn.query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
r.get::<_, f64>(0)
}) {
metrics.push(Metric {
name: "backup.snapshots.total".into(),
value: n,
labels: vec![],
});
}
if let Ok(n) = conn.query_row(
"SELECT COALESCE(SUM(rows_deleted), 0) FROM backup_purge_log",
[],
|r| r.get::<_, f64>(0),
) {
metrics.push(Metric {
name: "backup.purge.total_rows_deleted".into(),
value: n,
labels: vec![],
});
}
metrics
}
fn scheduled_tasks(&self) -> Vec<ScheduledTask> {
vec![
ScheduledTask {
name: "auto-purge",
cron: "0 3 * * *", },
ScheduledTask {
name: "auto-snapshot",
cron: "0 4 * * *", },
]
}
fn mcp_tools(&self) -> Vec<McpToolDef> {
crate::mcp_defs::backup_tools()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn manifest_has_correct_id() {
let ext = BackupExtension::default();
let m = ext.manifest();
assert_eq!(m.id, "convergio-backup");
assert_eq!(m.provides.len(), 4);
}
#[test]
fn migrations_are_returned() {
let ext = BackupExtension::default();
let migs = ext.migrations();
assert_eq!(migs.len(), 1);
}
#[test]
fn health_ok_with_memory_pool() {
let pool = convergio_db::pool::create_memory_pool().unwrap();
let conn = pool.get().unwrap();
for m in crate::schema::migrations() {
conn.execute_batch(m.up).unwrap();
}
drop(conn);
let ext = BackupExtension::new(pool);
assert!(matches!(ext.health(), Health::Ok));
}
#[test]
fn scheduled_tasks_declared() {
let ext = BackupExtension::default();
let tasks = ext.scheduled_tasks();
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0].name, "auto-purge");
assert_eq!(tasks[1].name, "auto-snapshot");
}
#[test]
fn metrics_with_empty_db() {
let pool = convergio_db::pool::create_memory_pool().unwrap();
let conn = pool.get().unwrap();
for m in crate::schema::migrations() {
conn.execute_batch(m.up).unwrap();
}
drop(conn);
let ext = BackupExtension::new(pool);
let m = ext.metrics();
assert_eq!(m.len(), 2);
assert_eq!(m[0].value, 0.0);
}
}