1use convergio_db::pool::ConnPool;
4use convergio_types::extension::{
5 AppContext, ExtResult, Extension, Health, McpToolDef, Metric, Migration, ScheduledTask,
6};
7use convergio_types::manifest::{Capability, Manifest, ModuleKind};
8
9pub struct BackupExtension {
11 pool: ConnPool,
12}
13
14impl BackupExtension {
15 pub fn new(pool: ConnPool) -> Self {
16 Self { pool }
17 }
18
19 pub fn pool(&self) -> &ConnPool {
20 &self.pool
21 }
22}
23
24impl Default for BackupExtension {
25 fn default() -> Self {
26 let pool = convergio_db::pool::create_memory_pool().expect("in-memory pool for default");
27 Self { pool }
28 }
29}
30
31impl Extension for BackupExtension {
32 fn manifest(&self) -> Manifest {
33 Manifest {
34 id: "convergio-backup".to_string(),
35 description: "Data retention, backup, disaster recovery, \
36 and org data export/import"
37 .to_string(),
38 version: env!("CARGO_PKG_VERSION").to_string(),
39 kind: ModuleKind::Core,
40 provides: vec![
41 Capability {
42 name: "data-retention".to_string(),
43 version: "1.0".to_string(),
44 description: "Auto-purge expired data with configurable policies".to_string(),
45 },
46 Capability {
47 name: "db-backup".to_string(),
48 version: "1.0".to_string(),
49 description: "Periodic SQLite snapshots with WAL checkpoint".to_string(),
50 },
51 Capability {
52 name: "disaster-recovery".to_string(),
53 version: "1.0".to_string(),
54 description: "Restore daemon to a known state from snapshot".to_string(),
55 },
56 Capability {
57 name: "org-export-import".to_string(),
58 version: "1.0".to_string(),
59 description: "Export/import org data for cross-node migration".to_string(),
60 },
61 ],
62 requires: vec![],
63 agent_tools: vec![],
64 required_roles: vec!["orchestrator".into(), "all".into()],
65 }
66 }
67
68 fn routes(&self, _ctx: &AppContext) -> Option<axum::Router> {
69 let data_dir = convergio_types::platform_paths::convergio_data_dir();
70 let state = std::sync::Arc::new(crate::routes::BackupState {
71 pool: self.pool.clone(),
72 db_path: data_dir.join("convergio.db"),
73 backup_dir: crate::snapshot::backup_dir(&data_dir),
74 node_name: std::env::var("CONVERGIO_NODE_NAME").unwrap_or_else(|_| "local".into()),
75 });
76 Some(crate::routes::router(state))
77 }
78
79 fn migrations(&self) -> Vec<Migration> {
80 crate::schema::migrations()
81 }
82
83 fn on_start(&self, _ctx: &AppContext) -> ExtResult<()> {
84 tracing::info!("backup: extension started");
85 Ok(())
86 }
87
88 fn health(&self) -> Health {
89 match self.pool.get() {
90 Ok(conn) => {
91 let ok = conn
92 .query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
93 r.get::<_, i64>(0)
94 })
95 .is_ok();
96 if ok {
97 Health::Ok
98 } else {
99 Health::Degraded {
100 reason: "backup_snapshots table inaccessible".into(),
101 }
102 }
103 }
104 Err(e) => Health::Down {
105 reason: format!("pool error: {e}"),
106 },
107 }
108 }
109
110 fn metrics(&self) -> Vec<Metric> {
111 let conn = match self.pool.get() {
112 Ok(c) => c,
113 Err(_) => return vec![],
114 };
115 let mut metrics = Vec::new();
116 if let Ok(n) = conn.query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
117 r.get::<_, f64>(0)
118 }) {
119 metrics.push(Metric {
120 name: "backup.snapshots.total".into(),
121 value: n,
122 labels: vec![],
123 });
124 }
125 if let Ok(n) = conn.query_row(
126 "SELECT COALESCE(SUM(rows_deleted), 0) FROM backup_purge_log",
127 [],
128 |r| r.get::<_, f64>(0),
129 ) {
130 metrics.push(Metric {
131 name: "backup.purge.total_rows_deleted".into(),
132 value: n,
133 labels: vec![],
134 });
135 }
136 metrics
137 }
138
139 fn scheduled_tasks(&self) -> Vec<ScheduledTask> {
140 vec![
141 ScheduledTask {
142 name: "auto-purge",
143 cron: "0 3 * * *", },
145 ScheduledTask {
146 name: "auto-snapshot",
147 cron: "0 4 * * *", },
149 ]
150 }
151
152 fn mcp_tools(&self) -> Vec<McpToolDef> {
153 crate::mcp_defs::backup_tools()
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160
161 #[test]
162 fn manifest_has_correct_id() {
163 let ext = BackupExtension::default();
164 let m = ext.manifest();
165 assert_eq!(m.id, "convergio-backup");
166 assert_eq!(m.provides.len(), 4);
167 }
168
169 #[test]
170 fn migrations_are_returned() {
171 let ext = BackupExtension::default();
172 let migs = ext.migrations();
173 assert_eq!(migs.len(), 1);
174 }
175
176 #[test]
177 fn health_ok_with_memory_pool() {
178 let pool = convergio_db::pool::create_memory_pool().unwrap();
179 let conn = pool.get().unwrap();
180 for m in crate::schema::migrations() {
181 conn.execute_batch(m.up).unwrap();
182 }
183 drop(conn);
184 let ext = BackupExtension::new(pool);
185 assert!(matches!(ext.health(), Health::Ok));
186 }
187
188 #[test]
189 fn scheduled_tasks_declared() {
190 let ext = BackupExtension::default();
191 let tasks = ext.scheduled_tasks();
192 assert_eq!(tasks.len(), 2);
193 assert_eq!(tasks[0].name, "auto-purge");
194 assert_eq!(tasks[1].name, "auto-snapshot");
195 }
196
197 #[test]
198 fn metrics_with_empty_db() {
199 let pool = convergio_db::pool::create_memory_pool().unwrap();
200 let conn = pool.get().unwrap();
201 for m in crate::schema::migrations() {
202 conn.execute_batch(m.up).unwrap();
203 }
204 drop(conn);
205 let ext = BackupExtension::new(pool);
206 let m = ext.metrics();
207 assert_eq!(m.len(), 2);
208 assert_eq!(m[0].value, 0.0);
209 }
210}