// infigraph_core/structured/ingest.rs
use std::path::Path;

use anyhow::{bail, Context, Result};

use super::schema::{escape, format_value, interpolate_template, IngestResult, SchemaMeta};

7pub fn ingest_data(
8 conn: &kuzu::Connection<'_>,
9 schema: &SchemaMeta,
10 data: &[serde_json::Value],
11) -> Result<IngestResult> {
12 for ddl in schema.generate_ddl() {
13 match conn.query(&ddl) {
14 Ok(_) => {}
15 Err(e) => {
16 let msg = format!("{e}");
17 if !msg.contains("already exists") {
18 bail!("DDL failed: {}", e);
19 }
20 }
21 }
22 }
23
24 let mut nodes_created = 0usize;
25 let mut edges_created = 0usize;
26
27 for (idx, record) in data.iter().enumerate() {
28 let obj = record
29 .as_object()
30 .with_context(|| format!("record {} is not an object", idx))?;
31
32 let id = if let Some(tmpl) = &schema.id_template {
33 interpolate_template(tmpl, obj)
34 } else if let Some(v) = obj.get("id") {
35 v.as_str()
36 .unwrap_or(&format!("{}_{}", schema.schema_id, idx))
37 .to_string()
38 } else {
39 format!("{}_{}", schema.schema_id, idx)
40 };
41
42 let mut props = vec![format!("id: '{}'", escape(&id))];
43 for col in &schema.columns {
44 let val = obj.get(&col.name);
45 if col.required && val.is_none() {
46 bail!("Record {}: missing required field '{}'", idx, col.name);
47 }
48 let formatted = format_value(&col.col_type, val);
49 props.push(format!("{}: {}", col.name, formatted));
50 }
51
52 let cypher = format!("CREATE (:{} {{{}}})", schema.node_table, props.join(", "));
53 conn.query(&cypher)
54 .map_err(|e| anyhow::anyhow!("failed to create node {}: {}", id, e))?;
55 nodes_created += 1;
56
57 for edge in &schema.edges {
58 let targets = match obj.get(&edge.source_field) {
59 Some(serde_json::Value::Array(arr)) => arr
60 .iter()
61 .filter_map(|v| v.as_str().map(String::from))
62 .collect::<Vec<_>>(),
63 Some(serde_json::Value::String(s)) => vec![s.clone()],
64 _ => continue,
65 };
66
67 for target in &targets {
68 let target_id = if edge.to_table == "Symbol" {
69 resolve_symbol(conn, target).unwrap_or_else(|| {
70 eprintln!("[warn] unresolved symbol reference: '{}'", target);
71 target.clone()
72 })
73 } else if let Some(lookup) = &edge.target_lookup {
74 format!("{}_{}", lookup, target)
75 } else {
76 target.clone()
77 };
78
79 let mut edge_props = String::new();
80 if !edge.properties.is_empty() {
81 let p: Vec<String> = edge
82 .properties
83 .iter()
84 .map(|c| {
85 let val = obj.get(&c.name);
86 format!("{}: {}", c.name, format_value(&c.col_type, val))
87 })
88 .collect();
89 edge_props = format!(", {}", p.join(", "));
90 }
91
92 let edge_prop_str = if edge_props.is_empty() {
93 String::new()
94 } else {
95 format!("{{{}}}", edge_props.trim_start_matches(", "))
96 };
97 let cypher = format!(
98 "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) CREATE (a)-[:{}{}]->(b)",
99 schema.node_table,
100 escape(&id),
101 edge.to_table,
102 escape(&target_id),
103 edge.name,
104 edge_prop_str,
105 );
106 let check_query = format!(
107 "MATCH (a:{} {{id: '{}'}}), (b:{} {{id: '{}'}}) RETURN count(*)",
108 schema.node_table,
109 escape(&id),
110 edge.to_table,
111 escape(&target_id),
112 );
113 let target_exists = conn
114 .query(&check_query)
115 .ok()
116 .and_then(|mut qr| {
117 qr.next()
118 .map(|row| row[0].to_string().parse::<u64>().unwrap_or(0) > 0)
119 })
120 .unwrap_or(false);
121
122 if target_exists {
123 match conn.query(&cypher) {
124 Ok(_) => edges_created += 1,
125 Err(_) => {}
126 }
127 }
128 }
129 }
130 }
131
132 Ok(IngestResult {
133 nodes_created,
134 edges_created,
135 })
136}
137
138fn resolve_symbol(conn: &kuzu::Connection<'_>, reference: &str) -> Option<String> {
139 let esc = reference.replace('\'', "\\'");
140 let query = format!(
141 "MATCH (s:Symbol) WHERE s.id = '{}' OR s.name = '{}' RETURN s.id LIMIT 1",
142 esc, esc
143 );
144 conn.query(&query)
145 .ok()
146 .and_then(|mut result| result.next().map(|row| row[0].to_string()))
147}
148
149pub fn ingest_directory(
150 conn: &kuzu::Connection<'_>,
151 schema: &SchemaMeta,
152 dir_path: &Path,
153) -> Result<IngestResult> {
154 if !dir_path.is_dir() {
155 bail!("'{}' is not a directory", dir_path.display());
156 }
157
158 let mut total = IngestResult {
159 nodes_created: 0,
160 edges_created: 0,
161 };
162
163 for entry in std::fs::read_dir(dir_path)
164 .with_context(|| format!("failed to read directory: {}", dir_path.display()))?
165 {
166 let entry = entry?;
167 let path = entry.path();
168 let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
169 if !matches!(ext, "json" | "yaml" | "yml") {
170 continue;
171 }
172 let result = ingest_file(conn, schema, &path)?;
173 total.nodes_created += result.nodes_created;
174 total.edges_created += result.edges_created;
175 }
176
177 Ok(total)
178}
179
180pub fn ingest_file(
181 conn: &kuzu::Connection<'_>,
182 schema: &SchemaMeta,
183 data_path: &Path,
184) -> Result<IngestResult> {
185 let content = std::fs::read_to_string(data_path)
186 .with_context(|| format!("failed to read data file: {}", data_path.display()))?;
187
188 let ext = data_path.extension().and_then(|e| e.to_str()).unwrap_or("");
189 let data: Vec<serde_json::Value> = match ext {
190 "json" => {
191 let parsed: serde_json::Value = serde_json::from_str(&content)
192 .with_context(|| format!("invalid JSON: {}", data_path.display()))?;
193 match parsed {
194 serde_json::Value::Array(arr) => arr,
195 obj @ serde_json::Value::Object(_) => vec![obj],
196 _ => bail!("JSON must be an array or object"),
197 }
198 }
199 "yaml" | "yml" => {
200 let parsed: serde_json::Value = serde_yaml::from_str(&content)
201 .with_context(|| format!("invalid YAML: {}", data_path.display()))?;
202 match parsed {
203 serde_json::Value::Array(arr) => arr,
204 obj @ serde_json::Value::Object(_) => vec![obj],
205 _ => bail!("YAML must be a sequence or mapping"),
206 }
207 }
208 _ => bail!(
209 "Unsupported data file format '{}' — use .json or .yaml/.yml",
210 ext
211 ),
212 };
213
214 ingest_data(conn, schema, &data)
215}