Skip to main content

convergio_backup/
import.rs

//! Org data import — load a previously exported JSON bundle.
//!
//! Reads the export bundle, validates metadata, and inserts rows
//! into the appropriate tables. Skips rows that already exist (by PK).
5
6use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use serde_json::Value;
9use std::path::Path;
10use tracing::{info, warn};
11
/// Import result summary.
///
/// Returned by `import_org_data`; `Serialize` so it can be reported
/// over an API or logged as structured data.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ImportResult {
    // Org identifier taken from the bundle's `meta` section.
    pub org_id: String,
    // Names of tables that existed locally and were processed.
    pub tables_imported: Vec<String>,
    // Per-table count of rows actually inserted.
    pub rows_inserted: Vec<(String, i64)>,
    // Per-table count of rows skipped (insert was a no-op, e.g.
    // a duplicate primary key hit `INSERT OR IGNORE`).
    pub rows_skipped: Vec<(String, i64)>,
}
20
21/// Import org data from an export bundle file.
22pub fn import_org_data(pool: &ConnPool, bundle_path: &Path) -> BackupResult<ImportResult> {
23    let content = std::fs::read_to_string(bundle_path)?;
24    let bundle: Value = serde_json::from_str(&content)?;
25
26    let meta: OrgExportMeta = serde_json::from_value(bundle["meta"].clone())?;
27
28    let data = bundle["data"]
29        .as_object()
30        .unwrap_or(&serde_json::Map::new())
31        .clone();
32
33    let conn = pool.get()?;
34    let mut tables_imported = Vec::new();
35    let mut rows_inserted = Vec::new();
36    let mut rows_skipped = Vec::new();
37
38    for (table, rows_val) in &data {
39        let rows = match rows_val.as_array() {
40            Some(arr) => arr,
41            None => continue,
42        };
43
44        // Check table exists
45        let exists: bool = conn
46            .query_row(
47                "SELECT COUNT(*) FROM sqlite_master \
48                 WHERE type='table' AND name=?1",
49                rusqlite::params![table],
50                |r| r.get::<_, i64>(0),
51            )
52            .map(|c| c > 0)?;
53
54        if !exists {
55            warn!(table = %table, "table does not exist, skipping import");
56            continue;
57        }
58
59        let (inserted, skipped) = import_table_rows(&conn, table, rows)?;
60        tables_imported.push(table.clone());
61        rows_inserted.push((table.clone(), inserted));
62        rows_skipped.push((table.clone(), skipped));
63    }
64
65    info!(
66        org = %meta.org_id,
67        tables = tables_imported.len(),
68        "org data imported"
69    );
70
71    Ok(ImportResult {
72        org_id: meta.org_id,
73        tables_imported,
74        rows_inserted,
75        rows_skipped,
76    })
77}
78
79/// Insert rows into a table, skipping conflicts (duplicate PKs).
80fn import_table_rows(
81    conn: &rusqlite::Connection,
82    table: &str,
83    rows: &[Value],
84) -> BackupResult<(i64, i64)> {
85    let mut inserted = 0i64;
86    let mut skipped = 0i64;
87
88    for row in rows {
89        let obj = match row.as_object() {
90            Some(o) => o,
91            None => continue,
92        };
93
94        let columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
95        let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("?{i}")).collect();
96
97        let sql = format!(
98            "INSERT OR IGNORE INTO {} ({}) VALUES ({})",
99            table,
100            columns.join(", "),
101            placeholders.join(", "),
102        );
103
104        let values: Vec<Box<dyn rusqlite::ToSql>> = obj.values().map(json_to_tosql).collect();
105
106        let refs: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
107
108        let changed = conn.execute(&sql, refs.as_slice())?;
109        if changed > 0 {
110            inserted += 1;
111        } else {
112            skipped += 1;
113        }
114    }
115
116    Ok((inserted, skipped))
117}
118
119/// Convert a serde_json Value to a boxed ToSql for rusqlite.
120fn json_to_tosql(val: &Value) -> Box<dyn rusqlite::ToSql> {
121    match val {
122        Value::String(s) => Box::new(s.clone()),
123        Value::Number(n) => {
124            if let Some(i) = n.as_i64() {
125                Box::new(i)
126            } else if let Some(f) = n.as_f64() {
127                Box::new(f)
128            } else {
129                Box::new(n.to_string())
130            }
131        }
132        Value::Bool(b) => Box::new(*b as i32),
133        Value::Null => Box::new(rusqlite::types::Null),
134        other => Box::new(other.to_string()),
135    }
136}
137
#[cfg(test)]
mod tests {
    use super::*;

    /// In-memory pool with schema migrations applied plus a small
    /// `ipc_agents` table serving as the import target.
    fn setup_pool() -> ConnPool {
        let pool = convergio_db::pool::create_memory_pool().unwrap();
        let conn = pool.get().unwrap();
        crate::schema::migrations()
            .into_iter()
            .for_each(|migration| conn.execute_batch(migration.up).unwrap());
        conn.execute_batch(
            "CREATE TABLE IF NOT EXISTS ipc_agents (
                id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )",
        )
        .unwrap();
        drop(conn);
        pool
    }

    /// Serialize an export bundle with fixed metadata and the given
    /// `data` section into `dir`, returning the file's path.
    fn write_bundle(dir: &Path, org_id: &str, data: Value) -> std::path::PathBuf {
        let bundle = serde_json::json!({
            "meta": {
                "org_id": org_id,
                "org_name": "Test Corp",
                "exported_at": "2026-04-03T00:00:00Z",
                "node": "test-node",
                "tables": ["ipc_agents"],
                "row_counts": [["ipc_agents", 1]],
                "version": "0.1.0"
            },
            "data": data,
        });
        let target = dir.join("import-test.json");
        let rendered = serde_json::to_string_pretty(&bundle).unwrap();
        std::fs::write(&target, rendered).unwrap();
        target
    }

    #[test]
    fn import_inserts_rows() {
        let pool = setup_pool();
        let tmp = tempfile::tempdir().unwrap();
        let bundle_path = write_bundle(
            tmp.path(),
            "org-legal",
            serde_json::json!({
                "ipc_agents": [
                    {"id": "a1", "name": "Elena", "org_id": "org-legal"}
                ]
            }),
        );

        let summary = import_org_data(&pool, &bundle_path).unwrap();
        assert_eq!(summary.rows_inserted[0].1, 1);

        // The row must be visible through a fresh connection.
        let conn = pool.get().unwrap();
        let total: i64 = conn
            .query_row("SELECT COUNT(*) FROM ipc_agents", [], |r| r.get(0))
            .unwrap();
        assert_eq!(total, 1);
    }

    #[test]
    fn import_skips_duplicates() {
        let pool = setup_pool();

        // Pre-seed the PK so the imported row conflicts.
        {
            let conn = pool.get().unwrap();
            conn.execute(
                "INSERT INTO ipc_agents VALUES ('a1', 'Existing', 'org-legal', datetime('now'))",
                [],
            )
            .unwrap();
        }

        let tmp = tempfile::tempdir().unwrap();
        let bundle_path = write_bundle(
            tmp.path(),
            "org-legal",
            serde_json::json!({
                "ipc_agents": [
                    {"id": "a1", "name": "Elena", "org_id": "org-legal"}
                ]
            }),
        );

        let summary = import_org_data(&pool, &bundle_path).unwrap();
        assert_eq!(summary.rows_skipped[0].1, 1);
        assert_eq!(summary.rows_inserted[0].1, 0);
    }

    #[test]
    fn import_ignores_missing_tables() {
        let pool = setup_pool();
        let tmp = tempfile::tempdir().unwrap();
        let bundle_path = write_bundle(
            tmp.path(),
            "org-test",
            serde_json::json!({
                "nonexistent_table": [{"id": "x"}]
            }),
        );

        let summary = import_org_data(&pool, &bundle_path).unwrap();
        assert!(summary.tables_imported.is_empty());
    }
}
235}