Skip to main content

convergio_backup/
import.rs

1//! Org data import — load a previously exported JSON bundle.
2//!
3//! Reads the export bundle, validates metadata, and inserts rows
4//! into the appropriate tables. Skips rows that already exist (by PK).
5
6use crate::types::{BackupResult, OrgExportMeta};
7use convergio_db::pool::ConnPool;
8use serde_json::Value;
9use std::path::Path;
10use tracing::{info, warn};
11
12/// Import result summary.
13#[derive(Debug, Clone, serde::Serialize)]
14pub struct ImportResult {
15    pub org_id: String,
16    pub tables_imported: Vec<String>,
17    pub rows_inserted: Vec<(String, i64)>,
18    pub rows_skipped: Vec<(String, i64)>,
19}
20
21/// Import org data from an export bundle file.
22pub fn import_org_data(pool: &ConnPool, bundle_path: &Path) -> BackupResult<ImportResult> {
23    let content = std::fs::read_to_string(bundle_path)?;
24    let bundle: Value = serde_json::from_str(&content)?;
25
26    let meta: OrgExportMeta = serde_json::from_value(bundle["meta"].clone())?;
27
28    let data = bundle["data"]
29        .as_object()
30        .unwrap_or(&serde_json::Map::new())
31        .clone();
32
33    let conn = pool.get()?;
34    let mut tables_imported = Vec::new();
35    let mut rows_inserted = Vec::new();
36    let mut rows_skipped = Vec::new();
37
38    for (table, rows_val) in &data {
39        let rows = match rows_val.as_array() {
40            Some(arr) => arr,
41            None => continue,
42        };
43
44        // Check table exists
45        let exists: bool = conn
46            .query_row(
47                "SELECT COUNT(*) FROM sqlite_master \
48                 WHERE type='table' AND name=?1",
49                rusqlite::params![table],
50                |r| r.get::<_, i64>(0),
51            )
52            .map(|c| c > 0)?;
53
54        if !exists {
55            warn!(table = %table, "table does not exist, skipping import");
56            continue;
57        }
58
59        let (inserted, skipped) = import_table_rows(&conn, table, rows)?;
60        tables_imported.push(table.clone());
61        rows_inserted.push((table.clone(), inserted));
62        rows_skipped.push((table.clone(), skipped));
63    }
64
65    info!(
66        org = %meta.org_id,
67        tables = tables_imported.len(),
68        "org data imported"
69    );
70
71    Ok(ImportResult {
72        org_id: meta.org_id,
73        tables_imported,
74        rows_inserted,
75        rows_skipped,
76    })
77}
78
79/// Insert rows into a table, skipping conflicts (duplicate PKs).
80fn import_table_rows(
81    conn: &rusqlite::Connection,
82    table: &str,
83    rows: &[Value],
84) -> BackupResult<(i64, i64)> {
85    // Validate table name to prevent SQL injection from malicious bundles
86    crate::types::validate_sql_identifier(table)?;
87
88    let mut inserted = 0i64;
89    let mut skipped = 0i64;
90
91    for row in rows {
92        let obj = match row.as_object() {
93            Some(o) => o,
94            None => continue,
95        };
96
97        let columns: Vec<&str> = obj.keys().map(|k| k.as_str()).collect();
98        // Validate all column names from the bundle
99        for col in &columns {
100            crate::types::validate_sql_identifier(col)?;
101        }
102        let placeholders: Vec<String> = (1..=columns.len()).map(|i| format!("?{i}")).collect();
103
104        let sql = format!(
105            "INSERT OR IGNORE INTO {} ({}) VALUES ({})",
106            table,
107            columns.join(", "),
108            placeholders.join(", "),
109        );
110
111        let values: Vec<Box<dyn rusqlite::ToSql>> = obj.values().map(json_to_tosql).collect();
112
113        let refs: Vec<&dyn rusqlite::ToSql> = values.iter().map(|b| b.as_ref()).collect();
114
115        let changed = conn.execute(&sql, refs.as_slice())?;
116        if changed > 0 {
117            inserted += 1;
118        } else {
119            skipped += 1;
120        }
121    }
122
123    Ok((inserted, skipped))
124}
125
126/// Convert a serde_json Value to a boxed ToSql for rusqlite.
127fn json_to_tosql(val: &Value) -> Box<dyn rusqlite::ToSql> {
128    match val {
129        Value::String(s) => Box::new(s.clone()),
130        Value::Number(n) => {
131            if let Some(i) = n.as_i64() {
132                Box::new(i)
133            } else if let Some(f) = n.as_f64() {
134                Box::new(f)
135            } else {
136                Box::new(n.to_string())
137            }
138        }
139        Value::Bool(b) => Box::new(*b as i32),
140        Value::Null => Box::new(rusqlite::types::Null),
141        other => Box::new(other.to_string()),
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    fn setup_pool() -> ConnPool {
150        let pool = convergio_db::pool::create_memory_pool().unwrap();
151        let conn = pool.get().unwrap();
152        for m in crate::schema::migrations() {
153            conn.execute_batch(m.up).unwrap();
154        }
155        conn.execute_batch(
156            "CREATE TABLE IF NOT EXISTS ipc_agents (
157                id TEXT PRIMARY KEY, name TEXT, org_id TEXT,
158                created_at TEXT DEFAULT (datetime('now'))
159            )",
160        )
161        .unwrap();
162        drop(conn);
163        pool
164    }
165
166    fn write_bundle(dir: &Path, org_id: &str, data: Value) -> std::path::PathBuf {
167        let bundle = serde_json::json!({
168            "meta": {
169                "org_id": org_id,
170                "org_name": "Test Corp",
171                "exported_at": "2026-04-03T00:00:00Z",
172                "node": "test-node",
173                "tables": ["ipc_agents"],
174                "row_counts": [["ipc_agents", 1]],
175                "version": "0.1.0"
176            },
177            "data": data,
178        });
179        let path = dir.join("import-test.json");
180        std::fs::write(&path, serde_json::to_string_pretty(&bundle).unwrap()).unwrap();
181        path
182    }
183
184    #[test]
185    fn import_inserts_rows() {
186        let pool = setup_pool();
187        let tmp = tempfile::tempdir().unwrap();
188        let data = serde_json::json!({
189            "ipc_agents": [
190                {"id": "a1", "name": "Elena", "org_id": "org-legal"}
191            ]
192        });
193        let path = write_bundle(tmp.path(), "org-legal", data);
194
195        let result = import_org_data(&pool, &path).unwrap();
196        assert_eq!(result.rows_inserted[0].1, 1);
197
198        let conn = pool.get().unwrap();
199        let count: i64 = conn
200            .query_row("SELECT COUNT(*) FROM ipc_agents", [], |r| r.get(0))
201            .unwrap();
202        assert_eq!(count, 1);
203    }
204
205    #[test]
206    fn import_skips_duplicates() {
207        let pool = setup_pool();
208        let conn = pool.get().unwrap();
209        conn.execute(
210            "INSERT INTO ipc_agents VALUES ('a1', 'Existing', 'org-legal', \
211             datetime('now'))",
212            [],
213        )
214        .unwrap();
215        drop(conn);
216
217        let tmp = tempfile::tempdir().unwrap();
218        let data = serde_json::json!({
219            "ipc_agents": [
220                {"id": "a1", "name": "Elena", "org_id": "org-legal"}
221            ]
222        });
223        let path = write_bundle(tmp.path(), "org-legal", data);
224
225        let result = import_org_data(&pool, &path).unwrap();
226        assert_eq!(result.rows_skipped[0].1, 1);
227        assert_eq!(result.rows_inserted[0].1, 0);
228    }
229
230    #[test]
231    fn import_ignores_missing_tables() {
232        let pool = setup_pool();
233        let tmp = tempfile::tempdir().unwrap();
234        let data = serde_json::json!({
235            "nonexistent_table": [{"id": "x"}]
236        });
237        let path = write_bundle(tmp.path(), "org-test", data);
238
239        let result = import_org_data(&pool, &path).unwrap();
240        assert!(result.tables_imported.is_empty());
241    }
242}