infigraph_core/structured/
ingest.rs1use std::path::Path;
2
3use anyhow::{bail, Context, Result};
4
5use super::schema::{escape, format_value, interpolate_template, IngestResult, SchemaMeta};
6
7pub fn ingest_data(
8 conn: &kuzu::Connection<'_>,
9 schema: &SchemaMeta,
10 data: &[serde_json::Value],
11) -> Result<IngestResult> {
12 for ddl in schema.generate_ddl() {
13 match conn.query(&ddl) {
14 Ok(_) => {}
15 Err(e) => {
16 let msg = format!("{e}");
17 if !msg.contains("already exists") {
18 bail!("DDL failed: {}", e);
19 }
20 }
21 }
22 }
23
24 let mut nodes_created = 0usize;
25 let mut edges_created = 0usize;
26
27 for (idx, record) in data.iter().enumerate() {
28 let obj = record
29 .as_object()
30 .with_context(|| format!("record {} is not an object", idx))?;
31
32 let id = if let Some(tmpl) = &schema.id_template {
33 interpolate_template(tmpl, obj)
34 } else if let Some(v) = obj.get("id") {
35 v.as_str()
36 .unwrap_or(&format!("{}_{}", schema.schema_id, idx))
37 .to_string()
38 } else {
39 format!("{}_{}", schema.schema_id, idx)
40 };
41
42 let mut props = vec![format!("id: '{}'", escape(&id))];
43 for col in &schema.columns {
44 let val = obj.get(&col.name);
45 if col.required && val.is_none() {
46 bail!("Record {}: missing required field '{}'", idx, col.name);
47 }
48 let formatted = format_value(&col.col_type, val);
49 props.push(format!("{}: {}", col.name, formatted));
50 }
51
52 let cypher = format!("CREATE (:{} {{{}}})", schema.node_table, props.join(", "));
53 conn.query(&cypher)
54 .map_err(|e| anyhow::anyhow!("failed to create node {}: {}", id, e))?;
55 nodes_created += 1;
56
57 for edge in &schema.edges {
58 let targets = match obj.get(&edge.source_field) {
59 Some(serde_json::Value::Array(arr)) => arr
60 .iter()
61 .filter_map(|v| v.as_str().map(String::from))
62 .collect::<Vec<_>>(),
63 Some(serde_json::Value::String(s)) => vec![s.clone()],
64 _ => continue,
65 };
66
67 for target in &targets {
68 let target_id = if edge.to_table == "Symbol" {
69 resolve_symbol(conn, target).unwrap_or_else(|| {
70 eprintln!("[warn] unresolved symbol reference: '{}'", target);
71 target.clone()
72 })
73 } else if let Some(lookup) = &edge.target_lookup {
74 format!("{}_{}", lookup, target)
75 } else {
76 target.clone()
77 };
78
79 let mut edge_props = String::new();
80 if !edge.properties.is_empty() {
81 let p: Vec<String> = edge
82 .properties
83 .iter()
84 .map(|c| {
85 let val = obj.get(&c.name);
86 format!("{}: {}", c.name, format_value(&c.col_type, val))
87 })
88 .collect();
89 edge_props = format!(", {}", p.join(", "));
90 }
91
92 let edge_prop_str = if edge_props.is_empty() {
93 String::new()
94 } else {
95 format!("{{{}}}", edge_props.trim_start_matches(", "))
96 };
97 let cypher = format!(
98 "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) CREATE (a)-[:{}{}]->(b)",
99 schema.node_table,
100 escape(&id),
101 edge.to_table,
102 escape(&target_id),
103 edge.name,
104 edge_prop_str,
105 );
106 let check_query = format!(
107 "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) RETURN count(*)",
108 schema.node_table,
109 escape(&id),
110 edge.to_table,
111 escape(&target_id),
112 );
113 let target_exists = conn
114 .query(&check_query)
115 .ok()
116 .and_then(|mut qr| {
117 qr.next()
118 .map(|row| row[0].to_string().parse::<u64>().unwrap_or(0) > 0)
119 })
120 .unwrap_or(false);
121
122 if target_exists && conn.query(&cypher).is_ok() {
123 edges_created += 1;
124 }
125 }
126 }
127 }
128
129 Ok(IngestResult {
130 nodes_created,
131 edges_created,
132 })
133}
134
135fn resolve_symbol(conn: &kuzu::Connection<'_>, reference: &str) -> Option<String> {
136 let esc = reference.replace('\'', "\\'");
137 let query = format!(
138 "MATCH (s:Symbol) WHERE s.id = '{}' OR s.name = '{}' RETURN s.id LIMIT 1",
139 esc, esc
140 );
141 conn.query(&query)
142 .ok()
143 .and_then(|mut result| result.next().map(|row| row[0].to_string()))
144}
145
146pub fn ingest_directory(
147 conn: &kuzu::Connection<'_>,
148 schema: &SchemaMeta,
149 dir_path: &Path,
150) -> Result<IngestResult> {
151 if !dir_path.is_dir() {
152 bail!("'{}' is not a directory", dir_path.display());
153 }
154
155 let mut total = IngestResult {
156 nodes_created: 0,
157 edges_created: 0,
158 };
159
160 for entry in std::fs::read_dir(dir_path)
161 .with_context(|| format!("failed to read directory: {}", dir_path.display()))?
162 {
163 let entry = entry?;
164 let path = entry.path();
165 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
166 if !matches!(ext, "json" | "yaml" | "yml") {
167 continue;
168 }
169 let result = ingest_file(conn, schema, &path)?;
170 total.nodes_created += result.nodes_created;
171 total.edges_created += result.edges_created;
172 }
173
174 Ok(total)
175}
176
177pub fn ingest_file(
178 conn: &kuzu::Connection<'_>,
179 schema: &SchemaMeta,
180 data_path: &Path,
181) -> Result<IngestResult> {
182 let content = std::fs::read_to_string(data_path)
183 .with_context(|| format!("failed to read data file: {}", data_path.display()))?;
184
185 let ext = data_path.extension().and_then(|e| e.to_str()).unwrap_or("");
186 let data: Vec<serde_json::Value> = match ext {
187 "json" => {
188 let parsed: serde_json::Value = serde_json::from_str(&content)
189 .with_context(|| format!("invalid JSON: {}", data_path.display()))?;
190 match parsed {
191 serde_json::Value::Array(arr) => arr,
192 obj @ serde_json::Value::Object(_) => vec![obj],
193 _ => bail!("JSON must be an array or object"),
194 }
195 }
196 "yaml" | "yml" => {
197 let parsed: serde_json::Value = serde_yaml::from_str(&content)
198 .with_context(|| format!("invalid YAML: {}", data_path.display()))?;
199 match parsed {
200 serde_json::Value::Array(arr) => arr,
201 obj @ serde_json::Value::Object(_) => vec![obj],
202 _ => bail!("YAML must be a sequence or mapping"),
203 }
204 }
205 _ => bail!(
206 "Unsupported data file format '{}' — use .json or .yaml/.yml",
207 ext
208 ),
209 };
210
211 ingest_data(conn, schema, &data)
212}