Skip to main content

convergio_backup/
export.rs

1//! Org data export — package all org data into a portable JSON bundle.
2//!
3//! Exports tasks, prompts, agents, billing records, and audit trail
4//! for a single org. The bundle can be imported on another node.
5
6use crate::types::{BackupError, BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use serde_json::{json, Value};
10use std::path::Path;
11use tracing::info;
12
13/// Tables that store org-scoped data (column = org_id).
14const ORG_TABLES: &[(&str, &str)] = &[
15    ("ipc_agents", "org_id"),
16    ("ipc_messages", "org_id"),
17    ("ipc_budget_log", "org_id"),
18];
19
20/// Export all data for an org into a JSON file.
21///
22/// Scans known org-scoped tables, extracts rows matching the org_id,
23/// and writes a self-describing JSON bundle to `dest_path`.
24pub fn export_org_data(
25    pool: &ConnPool,
26    org_id: &str,
27    org_name: &str,
28    node: &str,
29    dest_path: &Path,
30) -> BackupResult<OrgExportMeta> {
31    // Validate destination path has no traversal
32    convergio_types::platform_paths::validate_path_components(dest_path)
33        .map_err(BackupError::RestoreFailed)?;
34    let conn = pool.get()?;
35    let mut tables_exported = Vec::new();
36    let mut row_counts = Vec::new();
37    let mut data = serde_json::Map::new();
38
39    for &(table, col) in ORG_TABLES {
40        // Check if table exists
41        let exists: bool = conn
42            .query_row(
43                "SELECT COUNT(*) FROM sqlite_master \
44                 WHERE type='table' AND name=?1",
45                params![table],
46                |r| r.get::<_, i64>(0),
47            )
48            .map(|c| c > 0)?;
49
50        if !exists {
51            continue;
52        }
53
54        let rows = export_table_rows(&conn, table, col, org_id)?;
55        let count = rows.len() as i64;
56        if count > 0 {
57            tables_exported.push(table.to_string());
58            row_counts.push((table.to_string(), count));
59            data.insert(table.to_string(), Value::Array(rows));
60        }
61    }
62
63    let meta = OrgExportMeta {
64        org_id: org_id.to_string(),
65        org_name: org_name.to_string(),
66        exported_at: chrono::Utc::now().to_rfc3339(),
67        node: node.to_string(),
68        tables: tables_exported,
69        row_counts,
70        version: env!("CARGO_PKG_VERSION").to_string(),
71    };
72
73    let bundle = json!({
74        "meta": meta,
75        "data": data,
76    });
77
78    if let Some(parent) = dest_path.parent() {
79        std::fs::create_dir_all(parent)?;
80    }
81    let json_str = serde_json::to_string_pretty(&bundle)?;
82    std::fs::write(dest_path, json_str)?;
83
84    info!(
85        org = %org_id,
86        tables = meta.tables.len(),
87        path = %dest_path.display(),
88        "org data exported"
89    );
90    Ok(meta)
91}
92
93/// Extract all rows for an org from a single table as JSON values.
94fn export_table_rows(
95    conn: &rusqlite::Connection,
96    table: &str,
97    org_col: &str,
98    org_id: &str,
99) -> BackupResult<Vec<Value>> {
100    let sql = format!("SELECT * FROM {table} WHERE {org_col} = ?1");
101    let mut stmt = conn.prepare(&sql)?;
102    let col_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
103
104    let rows = stmt.query_map(params![org_id], |row| {
105        let mut obj = serde_json::Map::new();
106        for (i, name) in col_names.iter().enumerate() {
107            let val = row_value_at(row, i);
108            obj.insert(name.clone(), val);
109        }
110        Ok(Value::Object(obj))
111    })?;
112
113    let mut result = Vec::new();
114    for v in rows.flatten() {
115        result.push(v);
116    }
117    Ok(result)
118}
119
120/// Extract a value from a rusqlite Row at a given index.
121fn row_value_at(row: &rusqlite::Row<'_>, idx: usize) -> Value {
122    if let Ok(v) = row.get::<_, String>(idx) {
123        return Value::String(v);
124    }
125    if let Ok(v) = row.get::<_, i64>(idx) {
126        return Value::Number(v.into());
127    }
128    if let Ok(v) = row.get::<_, f64>(idx) {
129        return serde_json::Number::from_f64(v)
130            .map(Value::Number)
131            .unwrap_or(Value::Null);
132    }
133    Value::Null
134}
135
136#[cfg(test)]
137mod tests {
138    use super::*;
139
140    fn setup_pool() -> ConnPool {
141        let pool = convergio_db::pool::create_memory_pool().unwrap();
142        let conn = pool.get().unwrap();
143        for m in crate::schema::migrations() {
144            conn.execute_batch(m.up).unwrap();
145        }
146        // Create a mock org-scoped table
147        conn.execute_batch(
148            "CREATE TABLE IF NOT EXISTS ipc_agents (
149                id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
150                created_at TEXT DEFAULT (datetime('now'))
151            )",
152        )
153        .unwrap();
154        conn.execute(
155            "INSERT INTO ipc_agents VALUES ('a1', 'Elena', 'org-legal', \
156             datetime('now'))",
157            [],
158        )
159        .unwrap();
160        conn.execute(
161            "INSERT INTO ipc_agents VALUES ('a2', 'Baccio', 'org-dev', \
162             datetime('now'))",
163            [],
164        )
165        .unwrap();
166        drop(conn);
167        pool
168    }
169
170    #[test]
171    fn export_org_creates_bundle_file() {
172        let pool = setup_pool();
173        let tmp = tempfile::tempdir().unwrap();
174        let dest = tmp.path().join("export.json");
175        let meta = export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &dest).unwrap();
176        assert!(dest.exists());
177        assert_eq!(meta.org_id, "org-legal");
178        assert_eq!(meta.tables, vec!["ipc_agents"]);
179        assert_eq!(meta.row_counts[0].1, 1);
180    }
181
182    #[test]
183    fn export_empty_org_produces_empty_bundle() {
184        let pool = setup_pool();
185        let tmp = tempfile::tempdir().unwrap();
186        let dest = tmp.path().join("export.json");
187        let meta = export_org_data(&pool, "org-empty", "Empty Corp", "test-node", &dest).unwrap();
188        assert!(meta.tables.is_empty());
189    }
190
191    #[test]
192    fn export_bundle_is_valid_json() {
193        let pool = setup_pool();
194        let tmp = tempfile::tempdir().unwrap();
195        let dest = tmp.path().join("export.json");
196        export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &dest).unwrap();
197        let content = std::fs::read_to_string(&dest).unwrap();
198        let parsed: Value = serde_json::from_str(&content).unwrap();
199        assert!(parsed["meta"]["org_id"].is_string());
200        assert!(parsed["data"]["ipc_agents"].is_array());
201    }
202}