// infigraph_core/structured/ingest.rs

1use std::path::Path;
2
3use anyhow::{bail, Context, Result};
4
5use super::schema::{escape, format_value, interpolate_template, IngestResult, SchemaMeta};
6
7pub fn ingest_data(
8    conn: &kuzu::Connection<'_>,
9    schema: &SchemaMeta,
10    data: &[serde_json::Value],
11) -> Result<IngestResult> {
12    for ddl in schema.generate_ddl() {
13        match conn.query(&ddl) {
14            Ok(_) => {}
15            Err(e) => {
16                let msg = format!("{e}");
17                if !msg.contains("already exists") {
18                    bail!("DDL failed: {}", e);
19                }
20            }
21        }
22    }
23
24    let mut nodes_created = 0usize;
25    let mut edges_created = 0usize;
26
27    for (idx, record) in data.iter().enumerate() {
28        let obj = record
29            .as_object()
30            .with_context(|| format!("record {} is not an object", idx))?;
31
32        let id = if let Some(tmpl) = &schema.id_template {
33            interpolate_template(tmpl, obj)
34        } else if let Some(v) = obj.get("id") {
35            v.as_str()
36                .unwrap_or(&format!("{}_{}", schema.schema_id, idx))
37                .to_string()
38        } else {
39            format!("{}_{}", schema.schema_id, idx)
40        };
41
42        let mut props = vec![format!("id: '{}'", escape(&id))];
43        for col in &schema.columns {
44            let val = obj.get(&col.name);
45            if col.required && val.is_none() {
46                bail!("Record {}: missing required field '{}'", idx, col.name);
47            }
48            let formatted = format_value(&col.col_type, val);
49            props.push(format!("{}: {}", col.name, formatted));
50        }
51
52        let cypher = format!("CREATE (:{} {{{}}})", schema.node_table, props.join(", "));
53        conn.query(&cypher)
54            .map_err(|e| anyhow::anyhow!("failed to create node {}: {}", id, e))?;
55        nodes_created += 1;
56
57        for edge in &schema.edges {
58            let targets = match obj.get(&edge.source_field) {
59                Some(serde_json::Value::Array(arr)) => arr
60                    .iter()
61                    .filter_map(|v| v.as_str().map(String::from))
62                    .collect::<Vec<_>>(),
63                Some(serde_json::Value::String(s)) => vec![s.clone()],
64                _ => continue,
65            };
66
67            for target in &targets {
68                let target_id = if edge.to_table == "Symbol" {
69                    resolve_symbol(conn, target).unwrap_or_else(|| {
70                        eprintln!("[warn] unresolved symbol reference: '{}'", target);
71                        target.clone()
72                    })
73                } else if let Some(lookup) = &edge.target_lookup {
74                    format!("{}_{}", lookup, target)
75                } else {
76                    target.clone()
77                };
78
79                let mut edge_props = String::new();
80                if !edge.properties.is_empty() {
81                    let p: Vec<String> = edge
82                        .properties
83                        .iter()
84                        .map(|c| {
85                            let val = obj.get(&c.name);
86                            format!("{}: {}", c.name, format_value(&c.col_type, val))
87                        })
88                        .collect();
89                    edge_props = format!(", {}", p.join(", "));
90                }
91
92                let edge_prop_str = if edge_props.is_empty() {
93                    String::new()
94                } else {
95                    format!("{{{}}}", edge_props.trim_start_matches(", "))
96                };
97                let cypher = format!(
98                    "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) CREATE (a)-[:{}{}]->(b)",
99                    schema.node_table,
100                    escape(&id),
101                    edge.to_table,
102                    escape(&target_id),
103                    edge.name,
104                    edge_prop_str,
105                );
106                let check_query = format!(
107                    "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) RETURN count(*)",
108                    schema.node_table,
109                    escape(&id),
110                    edge.to_table,
111                    escape(&target_id),
112                );
113                let target_exists = conn
114                    .query(&check_query)
115                    .ok()
116                    .and_then(|mut qr| {
117                        qr.next()
118                            .map(|row| row[0].to_string().parse::<u64>().unwrap_or(0) > 0)
119                    })
120                    .unwrap_or(false);
121
122                if target_exists && conn.query(&cypher).is_ok() {
123                    edges_created += 1;
124                }
125            }
126        }
127    }
128
129    Ok(IngestResult {
130        nodes_created,
131        edges_created,
132    })
133}
134
135fn resolve_symbol(conn: &kuzu::Connection<'_>, reference: &str) -> Option<String> {
136    let esc = reference.replace('\'', "\\'");
137    let query = format!(
138        "MATCH (s:Symbol) WHERE s.id = '{}' OR s.name = '{}' RETURN s.id LIMIT 1",
139        esc, esc
140    );
141    conn.query(&query)
142        .ok()
143        .and_then(|mut result| result.next().map(|row| row[0].to_string()))
144}
145
146pub fn ingest_directory(
147    conn: &kuzu::Connection<'_>,
148    schema: &SchemaMeta,
149    dir_path: &Path,
150) -> Result<IngestResult> {
151    if !dir_path.is_dir() {
152        bail!("'{}' is not a directory", dir_path.display());
153    }
154
155    let mut total = IngestResult {
156        nodes_created: 0,
157        edges_created: 0,
158    };
159
160    for entry in std::fs::read_dir(dir_path)
161        .with_context(|| format!("failed to read directory: {}", dir_path.display()))?
162    {
163        let entry = entry?;
164        let path = entry.path();
165        let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
166        if !matches!(ext, "json" | "yaml" | "yml") {
167            continue;
168        }
169        let result = ingest_file(conn, schema, &path)?;
170        total.nodes_created += result.nodes_created;
171        total.edges_created += result.edges_created;
172    }
173
174    Ok(total)
175}
176
177pub fn ingest_file(
178    conn: &kuzu::Connection<'_>,
179    schema: &SchemaMeta,
180    data_path: &Path,
181) -> Result<IngestResult> {
182    let content = std::fs::read_to_string(data_path)
183        .with_context(|| format!("failed to read data file: {}", data_path.display()))?;
184
185    let ext = data_path.extension().and_then(|e| e.to_str()).unwrap_or("");
186    let data: Vec<serde_json::Value> = match ext {
187        "json" => {
188            let parsed: serde_json::Value = serde_json::from_str(&content)
189                .with_context(|| format!("invalid JSON: {}", data_path.display()))?;
190            match parsed {
191                serde_json::Value::Array(arr) => arr,
192                obj @ serde_json::Value::Object(_) => vec![obj],
193                _ => bail!("JSON must be an array or object"),
194            }
195        }
196        "yaml" | "yml" => {
197            let parsed: serde_json::Value = serde_yaml::from_str(&content)
198                .with_context(|| format!("invalid YAML: {}", data_path.display()))?;
199            match parsed {
200                serde_json::Value::Array(arr) => arr,
201                obj @ serde_json::Value::Object(_) => vec![obj],
202                _ => bail!("YAML must be a sequence or mapping"),
203            }
204        }
205        _ => bail!(
206            "Unsupported data file format '{}' — use .json or .yaml/.yml",
207            ext
208        ),
209    };
210
211    ingest_data(conn, schema, &data)
212}