Skip to main content

convergio_backup/
ext.rs

1//! BackupExtension — impl Extension for the backup module.
2
3use convergio_db::pool::ConnPool;
4use convergio_types::extension::{
5    AppContext, ExtResult, Extension, Health, McpToolDef, Metric, Migration, ScheduledTask,
6};
7use convergio_types::manifest::{Capability, Manifest, ModuleKind};
8
9/// The Extension entry point for data retention, backup & disaster recovery.
10pub struct BackupExtension {
11    pool: ConnPool,
12}
13
14impl BackupExtension {
15    pub fn new(pool: ConnPool) -> Self {
16        Self { pool }
17    }
18
19    pub fn pool(&self) -> &ConnPool {
20        &self.pool
21    }
22}
23
24impl Default for BackupExtension {
25    fn default() -> Self {
26        let pool = convergio_db::pool::create_memory_pool().expect("in-memory pool for default");
27        Self { pool }
28    }
29}
30
31impl Extension for BackupExtension {
32    fn manifest(&self) -> Manifest {
33        Manifest {
34            id: "convergio-backup".to_string(),
35            description: "Data retention, backup, disaster recovery, \
36                          and org data export/import"
37                .to_string(),
38            version: env!("CARGO_PKG_VERSION").to_string(),
39            kind: ModuleKind::Core,
40            provides: vec![
41                Capability {
42                    name: "data-retention".to_string(),
43                    version: "1.0".to_string(),
44                    description: "Auto-purge expired data with configurable policies".to_string(),
45                },
46                Capability {
47                    name: "db-backup".to_string(),
48                    version: "1.0".to_string(),
49                    description: "Periodic SQLite snapshots with WAL checkpoint".to_string(),
50                },
51                Capability {
52                    name: "disaster-recovery".to_string(),
53                    version: "1.0".to_string(),
54                    description: "Restore daemon to a known state from snapshot".to_string(),
55                },
56                Capability {
57                    name: "org-export-import".to_string(),
58                    version: "1.0".to_string(),
59                    description: "Export/import org data for cross-node migration".to_string(),
60                },
61            ],
62            requires: vec![],
63            agent_tools: vec![],
64            required_roles: vec!["orchestrator".into(), "all".into()],
65        }
66    }
67
68    fn routes(&self, _ctx: &AppContext) -> Option<axum::Router> {
69        let data_dir = convergio_types::platform_paths::convergio_data_dir();
70        let state = std::sync::Arc::new(crate::routes::BackupState {
71            pool: self.pool.clone(),
72            db_path: data_dir.join("convergio.db"),
73            backup_dir: crate::snapshot::backup_dir(&data_dir),
74            node_name: std::env::var("CONVERGIO_NODE_NAME").unwrap_or_else(|_| "local".into()),
75        });
76        Some(crate::routes::router(state))
77    }
78
79    fn migrations(&self) -> Vec<Migration> {
80        crate::schema::migrations()
81    }
82
83    fn on_start(&self, _ctx: &AppContext) -> ExtResult<()> {
84        tracing::info!("backup: extension started");
85        Ok(())
86    }
87
88    fn health(&self) -> Health {
89        match self.pool.get() {
90            Ok(conn) => {
91                let ok = conn
92                    .query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
93                        r.get::<_, i64>(0)
94                    })
95                    .is_ok();
96                if ok {
97                    Health::Ok
98                } else {
99                    Health::Degraded {
100                        reason: "backup_snapshots table inaccessible".into(),
101                    }
102                }
103            }
104            Err(e) => Health::Down {
105                reason: format!("pool error: {e}"),
106            },
107        }
108    }
109
110    fn metrics(&self) -> Vec<Metric> {
111        let conn = match self.pool.get() {
112            Ok(c) => c,
113            Err(_) => return vec![],
114        };
115        let mut metrics = Vec::new();
116        if let Ok(n) = conn.query_row("SELECT COUNT(*) FROM backup_snapshots", [], |r| {
117            r.get::<_, f64>(0)
118        }) {
119            metrics.push(Metric {
120                name: "backup.snapshots.total".into(),
121                value: n,
122                labels: vec![],
123            });
124        }
125        if let Ok(n) = conn.query_row(
126            "SELECT COALESCE(SUM(rows_deleted), 0) FROM backup_purge_log",
127            [],
128            |r| r.get::<_, f64>(0),
129        ) {
130            metrics.push(Metric {
131                name: "backup.purge.total_rows_deleted".into(),
132                value: n,
133                labels: vec![],
134            });
135        }
136        metrics
137    }
138
139    fn scheduled_tasks(&self) -> Vec<ScheduledTask> {
140        vec![
141            ScheduledTask {
142                name: "auto-purge",
143                cron: "0 3 * * *", // daily at 3 AM
144            },
145            ScheduledTask {
146                name: "auto-snapshot",
147                cron: "0 4 * * *", // daily at 4 AM
148            },
149        ]
150    }
151
152    fn mcp_tools(&self) -> Vec<McpToolDef> {
153        crate::mcp_defs::backup_tools()
154    }
155}
156
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an extension over a fresh memory pool after running the `up`
    /// script of every migration from `crate::schema::migrations()`.
    fn migrated_extension() -> BackupExtension {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        {
            let conn = pool.get().unwrap();
            for migration in crate::schema::migrations() {
                conn.execute_batch(migration.up).unwrap();
            }
            // `conn` is dropped here, before the pool is moved into the extension.
        }
        BackupExtension::new(pool)
    }

    #[test]
    fn manifest_has_correct_id() {
        let manifest = BackupExtension::default().manifest();
        assert_eq!(manifest.id, "convergio-backup");
        assert_eq!(manifest.provides.len(), 4);
    }

    #[test]
    fn migrations_are_returned() {
        let migrations = BackupExtension::default().migrations();
        assert_eq!(migrations.len(), 1);
    }

    #[test]
    fn health_ok_with_memory_pool() {
        let ext = migrated_extension();
        assert!(matches!(ext.health(), Health::Ok));
    }

    #[test]
    fn scheduled_tasks_declared() {
        let tasks = BackupExtension::default().scheduled_tasks();
        assert_eq!(tasks.len(), 2);
        assert_eq!(tasks[0].name, "auto-purge");
        assert_eq!(tasks[1].name, "auto-snapshot");
    }

    #[test]
    fn metrics_with_empty_db() {
        let metrics = migrated_extension().metrics();
        assert_eq!(metrics.len(), 2);
        assert_eq!(metrics[0].value, 0.0);
    }
}