convergio_backup/
export.rs1use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use serde_json::{json, Value};
10use std::path::Path;
11use tracing::info;
12
13const ORG_TABLES: &[(&str, &str)] = &[
15 ("ipc_agents", "org_id"),
16 ("ipc_messages", "org_id"),
17 ("ipc_budget_log", "org_id"),
18];
19
20pub fn export_org_data(
25 pool: &ConnPool,
26 org_id: &str,
27 org_name: &str,
28 node: &str,
29 dest_path: &Path,
30) -> BackupResult<OrgExportMeta> {
31 let conn = pool.get()?;
34 let mut tables_exported = Vec::new();
35 let mut row_counts = Vec::new();
36 let mut data = serde_json::Map::new();
37
38 for &(table, col) in ORG_TABLES {
39 let exists: bool = conn
41 .query_row(
42 "SELECT COUNT(*) FROM sqlite_master \
43 WHERE type='table' AND name=?1",
44 params![table],
45 |r| r.get::<_, i64>(0),
46 )
47 .map(|c| c > 0)?;
48
49 if !exists {
50 continue;
51 }
52
53 let rows = export_table_rows(&conn, table, col, org_id)?;
54 let count = rows.len() as i64;
55 if count > 0 {
56 tables_exported.push(table.to_string());
57 row_counts.push((table.to_string(), count));
58 data.insert(table.to_string(), Value::Array(rows));
59 }
60 }
61
62 let meta = OrgExportMeta {
63 org_id: org_id.to_string(),
64 org_name: org_name.to_string(),
65 exported_at: chrono::Utc::now().to_rfc3339(),
66 node: node.to_string(),
67 tables: tables_exported,
68 row_counts,
69 version: env!("CARGO_PKG_VERSION").to_string(),
70 };
71
72 let bundle = json!({
73 "meta": meta,
74 "data": data,
75 });
76
77 if let Some(parent) = dest_path.parent() {
78 std::fs::create_dir_all(parent)?;
79 }
80 let json_str = serde_json::to_string_pretty(&bundle)?;
81 std::fs::write(dest_path, json_str)?;
82
83 info!(
84 org = %org_id,
85 tables = meta.tables.len(),
86 path = %dest_path.display(),
87 "org data exported"
88 );
89 Ok(meta)
90}
91
92fn export_table_rows(
94 conn: &rusqlite::Connection,
95 table: &str,
96 org_col: &str,
97 org_id: &str,
98) -> BackupResult<Vec<Value>> {
99 crate::types::validate_sql_identifier(table)?;
101 crate::types::validate_sql_identifier(org_col)?;
102
103 let sql = format!("SELECT * FROM {table} WHERE {org_col} = ?1");
104 let mut stmt = conn.prepare(&sql)?;
105 let col_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
106
107 let rows = stmt.query_map(params![org_id], |row| {
108 let mut obj = serde_json::Map::new();
109 for (i, name) in col_names.iter().enumerate() {
110 let val = row_value_at(row, i);
111 obj.insert(name.clone(), val);
112 }
113 Ok(Value::Object(obj))
114 })?;
115
116 let mut result = Vec::new();
117 for v in rows.flatten() {
118 result.push(v);
119 }
120 Ok(result)
121}
122
123fn row_value_at(row: &rusqlite::Row<'_>, idx: usize) -> Value {
125 if let Ok(v) = row.get::<_, String>(idx) {
126 return Value::String(v);
127 }
128 if let Ok(v) = row.get::<_, i64>(idx) {
129 return Value::Number(v.into());
130 }
131 if let Ok(v) = row.get::<_, f64>(idx) {
132 return serde_json::Number::from_f64(v)
133 .map(Value::Number)
134 .unwrap_or(Value::Null);
135 }
136 Value::Null
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 fn setup_pool() -> ConnPool {
144 let pool = convergio_db::pool::create_memory_pool().unwrap();
145 let conn = pool.get().unwrap();
146 for m in crate::schema::migrations() {
147 conn.execute_batch(m.up).unwrap();
148 }
149 conn.execute_batch(
151 "CREATE TABLE IF NOT EXISTS ipc_agents (
152 id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
153 created_at TEXT DEFAULT (datetime('now'))
154 )",
155 )
156 .unwrap();
157 conn.execute(
158 "INSERT INTO ipc_agents VALUES ('a1', 'Elena', 'org-legal', \
159 datetime('now'))",
160 [],
161 )
162 .unwrap();
163 conn.execute(
164 "INSERT INTO ipc_agents VALUES ('a2', 'Baccio', 'org-dev', \
165 datetime('now'))",
166 [],
167 )
168 .unwrap();
169 drop(conn);
170 pool
171 }
172
173 #[test]
174 fn export_org_creates_bundle_file() {
175 let pool = setup_pool();
176 let tmp = tempfile::tempdir().unwrap();
177 let dest = tmp.path().join("export.json");
178 let meta = export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &dest).unwrap();
179 assert!(dest.exists());
180 assert_eq!(meta.org_id, "org-legal");
181 assert_eq!(meta.tables, vec!["ipc_agents"]);
182 assert_eq!(meta.row_counts[0].1, 1);
183 }
184
185 #[test]
186 fn export_empty_org_produces_empty_bundle() {
187 let pool = setup_pool();
188 let tmp = tempfile::tempdir().unwrap();
189 let dest = tmp.path().join("export.json");
190 let meta = export_org_data(&pool, "org-empty", "Empty Corp", "test-node", &dest).unwrap();
191 assert!(meta.tables.is_empty());
192 }
193
194 #[test]
195 fn export_bundle_is_valid_json() {
196 let pool = setup_pool();
197 let tmp = tempfile::tempdir().unwrap();
198 let dest = tmp.path().join("export.json");
199 export_org_data(&pool, "org-legal", "Legal Corp", "test-node", &dest).unwrap();
200 let content = std::fs::read_to_string(&dest).unwrap();
201 let parsed: Value = serde_json::from_str(&content).unwrap();
202 assert!(parsed["meta"]["org_id"].is_string());
203 assert!(parsed["data"]["ipc_agents"].is_array());
204 }
205}