Skip to main content

convergio_backup/
export.rs

//! Org data export — package all org data into a portable JSON bundle.
//!
//! Exports tasks, prompts, agents, billing records, and audit trail
//! for a single org. The tables actually scanned are the ones listed
//! in `ORG_TABLES`. The bundle can be imported on another node.
5
6use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use serde_json::{json, Value};
10use std::path::Path;
11use tracing::info;
12
/// Column name that carries the owning org's id in the tables listed in
/// [`ORG_TABLES`].
const ORG_ID_COLUMN: &str = "org_id";

/// Org-scoped tables, as `(table name, org-id column)` pairs.
///
/// The export walks this list in order; a table is only read when it
/// actually exists in the database.
const ORG_TABLES: &[(&str, &str)] = &[
    ("ipc_agents", ORG_ID_COLUMN),
    ("ipc_messages", ORG_ID_COLUMN),
    ("ipc_budget_log", ORG_ID_COLUMN),
];
19
20/// Export all data for an org into a JSON file.
21///
22/// Scans known org-scoped tables, extracts rows matching the org_id,
23/// and writes a self-describing JSON bundle to `dest_path`.
24pub fn export_org_data(
25    pool: &ConnPool,
26    org_id: &str,
27    org_name: &str,
28    node: &str,
29    dest_path: &Path,
30) -> BackupResult<OrgExportMeta> {
31    // Path safety: dest_path is system-constructed (backup_dir + sanitised org_id).
32    // User-supplied paths are validated at the HTTP boundary in routes.rs.
33    let conn = pool.get()?;
34    let mut tables_exported = Vec::new();
35    let mut row_counts = Vec::new();
36    let mut data = serde_json::Map::new();
37
38    for &(table, col) in ORG_TABLES {
39        // Check if table exists
40        let exists: bool = conn
41            .query_row(
42                "SELECT COUNT(*) FROM sqlite_master \
43                 WHERE type='table' AND name=?1",
44                params![table],
45                |r| r.get::<_, i64>(0),
46            )
47            .map(|c| c > 0)?;
48
49        if !exists {
50            continue;
51        }
52
53        let rows = export_table_rows(&conn, table, col, org_id)?;
54        let count = rows.len() as i64;
55        if count > 0 {
56            tables_exported.push(table.to_string());
57            row_counts.push((table.to_string(), count));
58            data.insert(table.to_string(), Value::Array(rows));
59        }
60    }
61
62    let meta = OrgExportMeta {
63        org_id: org_id.to_string(),
64        org_name: org_name.to_string(),
65        exported_at: chrono::Utc::now().to_rfc3339(),
66        node: node.to_string(),
67        tables: tables_exported,
68        row_counts,
69        version: env!("CARGO_PKG_VERSION").to_string(),
70    };
71
72    let bundle = json!({
73        "meta": meta,
74        "data": data,
75    });
76
77    if let Some(parent) = dest_path.parent() {
78        std::fs::create_dir_all(parent)?;
79    }
80    let json_str = serde_json::to_string_pretty(&bundle)?;
81    std::fs::write(dest_path, json_str)?;
82
83    info!(
84        org = %org_id,
85        tables = meta.tables.len(),
86        path = %dest_path.display(),
87        "org data exported"
88    );
89    Ok(meta)
90}
91
92/// Extract all rows for an org from a single table as JSON values.
93fn export_table_rows(
94    conn: &rusqlite::Connection,
95    table: &str,
96    org_col: &str,
97    org_id: &str,
98) -> BackupResult<Vec<Value>> {
99    // Validate identifiers to prevent SQL injection
100    crate::types::validate_sql_identifier(table)?;
101    crate::types::validate_sql_identifier(org_col)?;
102
103    let sql = format!("SELECT * FROM {table} WHERE {org_col} = ?1");
104    let mut stmt = conn.prepare(&sql)?;
105    let col_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
106
107    let rows = stmt.query_map(params![org_id], |row| {
108        let mut obj = serde_json::Map::new();
109        for (i, name) in col_names.iter().enumerate() {
110            let val = row_value_at(row, i);
111            obj.insert(name.clone(), val);
112        }
113        Ok(Value::Object(obj))
114    })?;
115
116    let mut result = Vec::new();
117    for v in rows.flatten() {
118        result.push(v);
119    }
120    Ok(result)
121}
122
123/// Extract a value from a rusqlite Row at a given index.
124fn row_value_at(row: &rusqlite::Row<'_>, idx: usize) -> Value {
125    if let Ok(v) = row.get::<_, String>(idx) {
126        return Value::String(v);
127    }
128    if let Ok(v) = row.get::<_, i64>(idx) {
129        return Value::Number(v.into());
130    }
131    if let Ok(v) = row.get::<_, f64>(idx) {
132        return serde_json::Number::from_f64(v)
133            .map(Value::Number)
134            .unwrap_or(Value::Null);
135    }
136    Value::Null
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory pool with the crate migrations applied, plus an
    /// `ipc_agents` table holding one agent for `org-legal` and one for
    /// `org-dev`.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        let conn = pool.get().unwrap();
        for migration in crate::schema::migrations() {
            conn.execute_batch(migration.up).unwrap();
        }
        // Create a mock org-scoped table
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS ipc_agents (
                id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
        )
        .unwrap();

        let insert = |sql: &str| {
            conn.execute(sql, []).unwrap();
        };
        insert(
            "INSERT INTO ipc_agents VALUES ('a1', 'Elena', 'org-legal', \
             datetime('now'))",
        );
        insert(
            "INSERT INTO ipc_agents VALUES ('a2', 'Baccio', 'org-dev', \
             datetime('now'))",
        );

        // Hand the connection back before the pool is returned.
        drop(conn);
        pool
    }

    /// Exporting an org with data writes the file and reports one table
    /// with one row.
    #[test]
    fn export_org_creates_bundle_file() {
        let pool = setup_pool();
        let workdir = tempfile::tempdir().unwrap();
        let bundle_path = workdir.path().join("export.json");

        let meta =
            export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &bundle_path).unwrap();

        assert!(bundle_path.exists());
        assert_eq!(meta.org_id, "org-legal");
        assert_eq!(meta.tables, vec!["ipc_agents"]);
        assert_eq!(meta.row_counts[0].1, 1);
    }

    /// An org without any rows yields metadata listing no tables.
    #[test]
    fn export_empty_org_produces_empty_bundle() {
        let pool = setup_pool();
        let workdir = tempfile::tempdir().unwrap();
        let bundle_path = workdir.path().join("export.json");

        let meta =
            export_org_data(&pool, "org-empty", "Empty Corp", "test-node", &bundle_path).unwrap();

        assert!(meta.tables.is_empty());
    }

    /// The written bundle parses as JSON and has the expected top-level
    /// shape.
    #[test]
    fn export_bundle_is_valid_json() {
        let pool = setup_pool();
        let workdir = tempfile::tempdir().unwrap();
        let bundle_path = workdir.path().join("export.json");
        export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &bundle_path).unwrap();

        let raw = std::fs::read_to_string(&bundle_path).unwrap();
        let bundle: Value = serde_json::from_str(&raw).unwrap();

        assert!(bundle["meta"]["org_id"].is_string());
        assert!(bundle["data"]["ipc_agents"].is_array());
    }
}
205}