1use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use serde_json::Value;
9use std::path::Path;
10use tracing::{info, warn};
11
/// Outcome of importing an org export bundle into the local database.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    /// Organisation id, taken from the bundle's `meta.org_id`.
    pub org_id: String,
    /// Names of tables that existed locally and were processed.
    pub tables_imported: Vec<String>,
    /// Per-table `(table, count)` of rows actually inserted.
    pub rows_inserted: Vec<(String, i64)>,
    /// Per-table `(table, count)` of rows skipped (already present).
    pub rows_skipped: Vec<(String, i64)>,
}
20
21pub fn import_org_data(pool: &ConnPool, bundle_path: &Path) -> BackupResult<ImportResult> {
23 let content = std::fs::read_to_string(bundle_path)?;
24 let bundle: Value = serde_json::from_str(&content)?;
25
26 let meta: OrgExportMeta = serde_json::from_value(bundle["meta"].clone())?;
27
28 let data = bundle["data"]
29 .as_object()
30 .unwrap_or(&serde_json::Map::new())
31 .clone();
32
33 let conn = pool.get()?;
34 let mut tables_imported = Vec::new();
35 let mut rows_inserted = Vec::new();
36 let mut rows_skipped = Vec::new();
37
38 for (table, rows_val) in &data {
39 let rows = match rows_val.as_array() {
40 Some(arr) => arr,
41 None => continue,
42 };
43
44 let exists: bool = conn
46 .query_row(
47 "SELECT COUNT(*) FROM sqlite_master \
48 WHERE type='table' AND name=?1",
49 rusqlite::params![table],
50 |r| r.get::<_, i64>(0),
51 )
52 .map(|c| c > 0)?;
53
54 if !exists {
55 warn!(table = %table, "table does not exist, skipping import");
56 continue;
57 }
58
59 let (inserted, skipped) = import_table_rows(&conn, table, rows)?;
60 tables_imported.push(table.clone());
61 rows_inserted.push((table.clone(), inserted));
62 rows_skipped.push((table.clone(), skipped));
63 }
64
65 info!(
66 org = %meta.org_id,
67 tables = tables_imported.len(),
68 "org data imported"
69 );
70
71 Ok(ImportResult {
72 org_id: meta.org_id,
73 tables_imported,
74 rows_inserted,
75 rows_skipped,
76 })
77}
78
79fn import_table_rows(
81 conn: &rusqlite::Connection,
82 table: &str,
83 rows: &[Value],
84) -> BackupResult<(i64, i64)> {
85 crate::types::validate_sql_identifier(table)?;
87
88 let mut inserted = 0i64;
89 let mut skipped = 0i64;
90
91 for row in rows {
92 let obj = match row.as_object() {
93 Some(o) => o,
94 None => continue,
95 };
96
97 let columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
98 for col in &columns {
100 crate::types::validate_sql_identifier(col)?;
101 }
102 let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("?{i}")).collect();
103
104 let sql = format!(
105 "INSERT OR IGNORE INTO {} ({}) VALUES ({})",
106 table,
107 columns.join(", "),
108 placeholders.join(", "),
109 );
110
111 let values: Vec<Box<dyn rusqlite::ToSql>> = obj.values().map(json_to_tosql).collect();
112
113 let refs: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
114
115 let changed = conn.execute(&sql, refs.as_slice())?;
116 if changed > 0 {
117 inserted += 1;
118 } else {
119 skipped += 1;
120 }
121 }
122
123 Ok((inserted, skipped))
124}
125
126fn json_to_tosql(val: &Value) -> Box<dyn rusqlite::ToSql> {
128 match val {
129 Value::String(s) => Box::new(s.clone()),
130 Value::Number(n) => {
131 if let Some(i) = n.as_i64() {
132 Box::new(i)
133 } else if let Some(f) = n.as_f64() {
134 Box::new(f)
135 } else {
136 Box::new(n.to_string())
137 }
138 }
139 Value::Bool(b) => Box::new(*b as i32),
140 Value::Null => Box::new(rusqlite::types::Null),
141 other => Box::new(other.to_string()),
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory pool with the crate migrations applied, plus a small
    /// `ipc_agents` table used as the import target in these tests.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        {
            let conn = pool.get().unwrap();
            for migration in crate::schema::migrations() {
                conn.execute_batch(migration.up).unwrap();
            }
            conn.execute_batch(
                "CREATE TABLE IF NOT EXISTS ipc_agents (
 id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
 created_at TEXT DEFAULT (datetime('now'))
 )",
            )
            .unwrap();
        }
        pool
    }

    /// Write a minimal export bundle containing `data` into `dir` and
    /// return the bundle file's path.
    fn write_bundle(dir: &Path, org_id: &str, data: Value) -> std::path::PathBuf {
        let meta = serde_json::json!({
            "org_id": org_id,
            "org_name": "Test Corp",
            "exported_at": "2026-04-03T00:00:00Z",
            "node": "test-node",
            "tables": ["ipc_agents"],
            "row_counts": [["ipc_agents", 1]],
            "version": "0.1.0"
        });
        let bundle = serde_json::json!({ "meta": meta, "data": data });
        let path = dir.join("import-test.json");
        std::fs::write(&path, serde_json::to_string_pretty(&bundle).unwrap()).unwrap();
        path
    }

    #[test]
    fn import_inserts_rows() {
        let pool = setup_pool();
        let tmp = tempfile::tempdir().unwrap();
        let payload = serde_json::json!({
            "ipc_agents": [
                {"id": "a1", "name": "Elena", "org_id": "org-legal"}
            ]
        });
        let bundle = write_bundle(tmp.path(), "org-legal", payload);

        let result = import_org_data(&pool, &bundle).unwrap();
        assert_eq!(result.rows_inserted[0].1, 1);

        // The row must actually be present in the table, not just counted.
        let count: i64 = pool
            .get()
            .unwrap()
            .query_row("SELECT COUNT(*) FROM ipc_agents", [], |r| r.get(0))
            .unwrap();
        assert_eq!(count, 1);
    }

    #[test]
    fn import_skips_duplicates() {
        let pool = setup_pool();
        // Pre-seed the same primary key so the import hits the
        // INSERT OR IGNORE path.
        {
            let conn = pool.get().unwrap();
            conn.execute(
                "INSERT INTO ipc_agents VALUES ('a1', 'Existing', 'org-legal', \
 datetime('now'))",
                [],
            )
            .unwrap();
        }

        let tmp = tempfile::tempdir().unwrap();
        let payload = serde_json::json!({
            "ipc_agents": [
                {"id": "a1", "name": "Elena", "org_id": "org-legal"}
            ]
        });
        let bundle = write_bundle(tmp.path(), "org-legal", payload);

        let result = import_org_data(&pool, &bundle).unwrap();
        assert_eq!(result.rows_skipped[0].1, 1);
        assert_eq!(result.rows_inserted[0].1, 0);
    }

    #[test]
    fn import_ignores_missing_tables() {
        let pool = setup_pool();
        let tmp = tempfile::tempdir().unwrap();
        let payload = serde_json::json!({
            "nonexistent_table": [{"id": "x"}]
        });
        let bundle = write_bundle(tmp.path(), "org-test", payload);

        // Unknown tables are skipped entirely, not treated as errors.
        let outcome = import_org_data(&pool, &bundle).unwrap();
        assert!(outcome.tables_imported.is_empty());
    }
}