convergio_backup/
export.rs1use crate::types::{BackupError, BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use rusqlite::params;
9use serde_json::{json, Value};
10use std::path::Path;
11use tracing::info;
12
/// Tables considered for a per-organisation export.
///
/// Each entry is `(table_name, org_column)`: `export_org_data` walks this list
/// and selects the rows of `table_name` whose `org_column` equals the requested
/// organisation id. Both strings are interpolated into SQL text by
/// `export_table_rows`, so entries must be plain, trusted identifiers.
const ORG_TABLES: &[(&str, &str)] = &[
    ("ipc_agents", "org_id"),
    ("ipc_messages", "org_id"),
    ("ipc_budget_log", "org_id"),
];
19
20pub fn export_org_data(
25 pool: &ConnPool,
26 org_id: &str,
27 org_name: &str,
28 node: &str,
29 dest_path: &Path,
30) -> BackupResult<OrgExportMeta> {
31 convergio_types::platform_paths::validate_path_components(dest_path)
33 .map_err(BackupError::RestoreFailed)?;
34 let conn = pool.get()?;
35 let mut tables_exported = Vec::new();
36 let mut row_counts = Vec::new();
37 let mut data = serde_json::Map::new();
38
39 for &(table, col) in ORG_TABLES {
40 let exists: bool = conn
42 .query_row(
43 "SELECT COUNT(*) FROM sqlite_master \
44 WHERE type='table' AND name=?1",
45 params![table],
46 |r| r.get::<_, i64>(0),
47 )
48 .map(|c| c > 0)?;
49
50 if !exists {
51 continue;
52 }
53
54 let rows = export_table_rows(&conn, table, col, org_id)?;
55 let count = rows.len() as i64;
56 if count > 0 {
57 tables_exported.push(table.to_string());
58 row_counts.push((table.to_string(), count));
59 data.insert(table.to_string(), Value::Array(rows));
60 }
61 }
62
63 let meta = OrgExportMeta {
64 org_id: org_id.to_string(),
65 org_name: org_name.to_string(),
66 exported_at: chrono::Utc::now().to_rfc3339(),
67 node: node.to_string(),
68 tables: tables_exported,
69 row_counts,
70 version: env!("CARGO_PKG_VERSION").to_string(),
71 };
72
73 let bundle = json!({
74 "meta": meta,
75 "data": data,
76 });
77
78 if let Some(parent) = dest_path.parent() {
79 std::fs::create_dir_all(parent)?;
80 }
81 let json_str = serde_json::to_string_pretty(&bundle)?;
82 std::fs::write(dest_path, json_str)?;
83
84 info!(
85 org = %org_id,
86 tables = meta.tables.len(),
87 path = %dest_path.display(),
88 "org data exported"
89 );
90 Ok(meta)
91}
92
93fn export_table_rows(
95 conn: &rusqlite::Connection,
96 table: &str,
97 org_col: &str,
98 org_id: &str,
99) -> BackupResult<Vec<Value>> {
100 let sql = format!("SELECT * FROM {table} WHERE {org_col} = ?1");
101 let mut stmt = conn.prepare(&sql)?;
102 let col_names: Vec<String> = stmt.column_names().iter().map(|s| s.to_string()).collect();
103
104 let rows = stmt.query_map(params![org_id], |row| {
105 let mut obj = serde_json::Map::new();
106 for (i, name) in col_names.iter().enumerate() {
107 let val = row_value_at(row, i);
108 obj.insert(name.clone(), val);
109 }
110 Ok(Value::Object(obj))
111 })?;
112
113 let mut result = Vec::new();
114 for v in rows.flatten() {
115 result.push(v);
116 }
117 Ok(result)
118}
119
120fn row_value_at(row: &rusqlite::Row<'_>, idx: usize) -> Value {
122 if let Ok(v) = row.get::<_, String>(idx) {
123 return Value::String(v);
124 }
125 if let Ok(v) = row.get::<_, i64>(idx) {
126 return Value::Number(v.into());
127 }
128 if let Ok(v) = row.get::<_, f64>(idx) {
129 return serde_json::Number::from_f64(v)
130 .map(Value::Number)
131 .unwrap_or(Value::Null);
132 }
133 Value::Null
134}
135
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an in-memory pool with the crate migrations applied and an
    /// `ipc_agents` table holding one agent for `org-legal` and one for `org-dev`.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        let conn = pool.get().unwrap();
        for migration in crate::schema::migrations() {
            conn.execute_batch(migration.up).unwrap();
        }
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS ipc_agents (
                id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
        )
        .unwrap();
        let seed_rows = [
            "INSERT INTO ipc_agents VALUES ('a1', 'Elena', 'org-legal', \
             datetime('now'))",
            "INSERT INTO ipc_agents VALUES ('a2', 'Baccio', 'org-dev', \
             datetime('now'))",
        ];
        for insert_sql in seed_rows {
            conn.execute(insert_sql, []).unwrap();
        }
        // Hand the connection back before returning the pool to the caller.
        drop(conn);
        pool
    }

    /// Runs an export for `org_id` into a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned alongside the destination path so the
    /// exported file stays on disk for as long as the caller holds the guard.
    fn export_into_tmp(
        org_id: &str,
        org_name: &str,
    ) -> (tempfile::TempDir, std::path::PathBuf, OrgExportMeta) {
        let pool = setup_pool();
        let tmp = tempfile::tempdir().unwrap();
        let dest = tmp.path().join("export.json");
        let meta = export_org_data(&pool, org_id, org_name, "test-node", &dest).unwrap();
        (tmp, dest, meta)
    }

    /// An org with data yields a file plus metadata describing the one table.
    #[test]
    fn export_org_creates_bundle_file() {
        let (_guard, dest, meta) = export_into_tmp("org-legal", "Legal Corp");
        assert!(dest.exists());
        assert_eq!(meta.org_id, "org-legal");
        assert_eq!(meta.tables, vec!["ipc_agents"]);
        assert_eq!(meta.row_counts[0].1, 1);
    }

    /// An org with no rows anywhere exports successfully with no tables listed.
    #[test]
    fn export_empty_org_produces_empty_bundle() {
        let (_guard, _dest, meta) = export_into_tmp("org-empty", "Empty Corp");
        assert!(meta.tables.is_empty());
    }

    /// The written file parses as JSON with the expected top-level shape.
    #[test]
    fn export_bundle_is_valid_json() {
        let (_guard, dest, _meta) = export_into_tmp("org-legal", "Legal Corp");
        let content = std::fs::read_to_string(&dest).unwrap();
        let parsed: Value = serde_json::from_str(&content).unwrap();
        assert!(parsed["meta"]["org_id"].is_string());
        assert!(parsed["data"]["ipc_agents"].is_array());
    }
}
202}