1use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use serde_json::Value;
9use std::path::Path;
10use tracing::{info, warn};
11
12#[derive(Debug, Clone, serde::Serialize)]
14pub struct ImportResult {
15 pub org_id: String,
16 pub tables_imported: Vec<String>,
17 pub rows_inserted: Vec<(String, i64)>,
18 pub rows_skipped: Vec<(String, i64)>,
19}
20
21pub fn import_org_data(pool: &ConnPool, bundle_path: &Path) -> BackupResult<ImportResult> {
23 let content = std::fs::read_to_string(bundle_path)?;
24 let bundle: Value = serde_json::from_str(&content)?;
25
26 let meta: OrgExportMeta = serde_json::from_value(bundle["meta"].clone())?;
27
28 let data = bundle["data"]
29 .as_object()
30 .unwrap_or(&serde_json::Map::new())
31 .clone();
32
33 let conn = pool.get()?;
34 let mut tables_imported = Vec::new();
35 let mut rows_inserted = Vec::new();
36 let mut rows_skipped = Vec::new();
37
38 for (table, rows_val) in &data {
39 let rows = match rows_val.as_array() {
40 Some(arr) => arr,
41 None => continue,
42 };
43
44 let exists: bool = conn
46 .query_row(
47 "SELECT COUNT(*) FROM sqlite_master \
48 WHERE type='table' AND name=?1",
49 rusqlite::params![table],
50 |r| r.get::<_, i64>(0),
51 )
52 .map(|c| c > 0)?;
53
54 if !exists {
55 warn!(table = %table, "table does not exist, skipping import");
56 continue;
57 }
58
59 let (inserted, skipped) = import_table_rows(&conn, table, rows)?;
60 tables_imported.push(table.clone());
61 rows_inserted.push((table.clone(), inserted));
62 rows_skipped.push((table.clone(), skipped));
63 }
64
65 info!(
66 org = %meta.org_id,
67 tables = tables_imported.len(),
68 "org data imported"
69 );
70
71 Ok(ImportResult {
72 org_id: meta.org_id,
73 tables_imported,
74 rows_inserted,
75 rows_skipped,
76 })
77}
78
79fn import_table_rows(
81 conn: &rusqlite::Connection,
82 table: &str,
83 rows: &[Value],
84) -> BackupResult<(i64, i64)> {
85 let mut inserted = 0i64;
86 let mut skipped = 0i64;
87
88 for row in rows {
89 let obj = match row.as_object() {
90 Some(o) => o,
91 None => continue,
92 };
93
94 let columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
95 let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("?{i}")).collect();
96
97 let sql = format!(
98 "INSERT OR IGNORE INTO {} ({}) VALUES ({})",
99 table,
100 columns.join(", "),
101 placeholders.join(", "),
102 );
103
104 let values: Vec<Box<dyn rusqlite::ToSql>> = obj.values().map(json_to_tosql).collect();
105
106 let refs: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
107
108 let changed = conn.execute(&sql, refs.as_slice())?;
109 if changed > 0 {
110 inserted += 1;
111 } else {
112 skipped += 1;
113 }
114 }
115
116 Ok((inserted, skipped))
117}
118
119fn json_to_tosql(val: &Value) -> Box<dyn rusqlite::ToSql> {
121 match val {
122 Value::String(s) => Box::new(s.clone()),
123 Value::Number(n) => {
124 if let Some(i) = n.as_i64() {
125 Box::new(i)
126 } else if let Some(f) = n.as_f64() {
127 Box::new(f)
128 } else {
129 Box::new(n.to_string())
130 }
131 }
132 Value::Bool(b) => Box::new(*b as i32),
133 Value::Null => Box::new(rusqlite::types::Null),
134 other => Box::new(other.to_string()),
135 }
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141
142 fn setup_pool() -> ConnPool {
143 let pool = convergio_db::pool::create_memory_pool().unwrap();
144 let conn = pool.get().unwrap();
145 for m in crate::schema::migrations() {
146 conn.execute_batch(m.up).unwrap();
147 }
148 conn.execute_batch(
149 "CREATE TABLE IF NOT EXISTS ipc_agents (
150 id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
151 created_at TEXT DEFAULT (datetime('now'))
152 )",
153 )
154 .unwrap();
155 drop(conn);
156 pool
157 }
158
159 fn write_bundle(dir: &Path, org_id: &str, data: Value) -> std::path::PathBuf {
160 let bundle = serde_json::json!({
161 "meta": {
162 "org_id": org_id,
163 "org_name": "Test Corp",
164 "exported_at": "2026-04-03T00:00:00Z",
165 "node": "test-node",
166 "tables": ["ipc_agents"],
167 "row_counts": [["ipc_agents", 1]],
168 "version": "0.1.0"
169 },
170 "data": data,
171 });
172 let path = dir.join("import-test.json");
173 std::fs::write(&path, serde_json::to_string_pretty(&bundle).unwrap()).unwrap();
174 path
175 }
176
177 #[test]
178 fn import_inserts_rows() {
179 let pool = setup_pool();
180 let tmp = tempfile::tempdir().unwrap();
181 let data = serde_json::json!({
182 "ipc_agents": [
183 {"id": "a1", "name": "Elena", "org_id": "org-legal"}
184 ]
185 });
186 let path = write_bundle(tmp.path(), "org-legal", data);
187
188 let result = import_org_data(&pool, &path).unwrap();
189 assert_eq!(result.rows_inserted[0].1, 1);
190
191 let conn = pool.get().unwrap();
192 let count: i64 = conn
193 .query_row("SELECT COUNT(*) FROM ipc_agents", [], |r| r.get(0))
194 .unwrap();
195 assert_eq!(count, 1);
196 }
197
198 #[test]
199 fn import_skips_duplicates() {
200 let pool = setup_pool();
201 let conn = pool.get().unwrap();
202 conn.execute(
203 "INSERT INTO ipc_agents VALUES ('a1', 'Existing', 'org-legal', \
204 datetime('now'))",
205 [],
206 )
207 .unwrap();
208 drop(conn);
209
210 let tmp = tempfile::tempdir().unwrap();
211 let data = serde_json::json!({
212 "ipc_agents": [
213 {"id": "a1", "name": "Elena", "org_id": "org-legal"}
214 ]
215 });
216 let path = write_bundle(tmp.path(), "org-legal", data);
217
218 let result = import_org_data(&pool, &path).unwrap();
219 assert_eq!(result.rows_skipped[0].1, 1);
220 assert_eq!(result.rows_inserted[0].1, 0);
221 }
222
223 #[test]
224 fn import_ignores_missing_tables() {
225 let pool = setup_pool();
226 let tmp = tempfile::tempdir().unwrap();
227 let data = serde_json::json!({
228 "nonexistent_table": [{"id": "x"}]
229 });
230 let path = write_bundle(tmp.path(), "org-test", data);
231
232 let result = import_org_data(&pool, &path).unwrap();
233 assert!(result.tables_imported.is_empty());
234 }
235}