1use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use serde::{Deserialize, Serialize};
13
14use crate::models::Graph;
15
/// Settings for [`Neo4jExporter`]: which columns go into the CSV files and
/// which helper scripts are written next to them.
#[derive(Clone, Debug)]
pub struct Neo4jExportConfig {
    /// Add one column per node property to the `nodes_*.csv` files.
    pub export_node_properties: bool,
    /// Add one column per edge property to the `edges_*.csv` files.
    pub export_edge_properties: bool,
    /// Add `feature_<i>` columns for node and edge feature vectors.
    pub export_features: bool,
    /// Write an `import.cypher` script next to the CSV files.
    pub generate_cypher: bool,
    /// Write an `admin_import.sh` script next to the CSV files.
    pub generate_admin_import: bool,
    /// Target database name used by the admin import script.
    pub database_name: String,
    /// Rows per batch intended for the generated Cypher import.
    pub cypher_batch_size: usize,
}

impl Default for Neo4jExportConfig {
    /// Enables every column and both scripts, targets database `synth`,
    /// and sets `cypher_batch_size` to 1000.
    fn default() -> Self {
        let enabled = true;
        Self {
            export_node_properties: enabled,
            export_edge_properties: enabled,
            export_features: enabled,
            generate_cypher: enabled,
            generate_admin_import: enabled,
            database_name: String::from("synth"),
            cypher_batch_size: 1_000,
        }
    }
}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct Neo4jMetadata {
52 pub name: String,
54 pub num_nodes: usize,
56 pub num_edges: usize,
58 pub node_labels: Vec<String>,
60 pub relationship_types: Vec<String>,
62 pub files: Vec<String>,
64}
65
66pub struct Neo4jExporter {
68 config: Neo4jExportConfig,
69}
70
71impl Neo4jExporter {
72 pub fn new(config: Neo4jExportConfig) -> Self {
74 Self { config }
75 }
76
77 pub fn export(&self, graph: &Graph, output_dir: &Path) -> std::io::Result<Neo4jMetadata> {
79 fs::create_dir_all(output_dir)?;
80
81 let mut files = Vec::new();
82
83 let node_labels = self.export_nodes(graph, output_dir, &mut files)?;
85
86 let relationship_types = self.export_edges(graph, output_dir, &mut files)?;
88
89 if self.config.generate_cypher {
91 self.generate_cypher_script(graph, output_dir, &node_labels, &relationship_types)?;
92 files.push("import.cypher".to_string());
93 }
94
95 if self.config.generate_admin_import {
97 self.generate_admin_import_script(output_dir, &node_labels, &relationship_types)?;
98 files.push("admin_import.sh".to_string());
99 }
100
101 let metadata = Neo4jMetadata {
103 name: graph.name.clone(),
104 num_nodes: graph.node_count(),
105 num_edges: graph.edge_count(),
106 node_labels,
107 relationship_types,
108 files,
109 };
110
111 let metadata_path = output_dir.join("metadata.json");
113 let file = File::create(metadata_path)?;
114 serde_json::to_writer_pretty(file, &metadata)?;
115
116 Ok(metadata)
117 }
118
119 fn export_nodes(
121 &self,
122 graph: &Graph,
123 output_dir: &Path,
124 files: &mut Vec<String>,
125 ) -> std::io::Result<Vec<String>> {
126 let mut labels = Vec::new();
127
128 for (node_type, node_ids) in &graph.nodes_by_type {
129 let label = node_type.as_str();
130 labels.push(label.to_string());
131
132 let filename = format!("nodes_{}.csv", label.to_lowercase());
133 let path = output_dir.join(&filename);
134 files.push(filename);
135
136 let file = File::create(path)?;
137 let mut writer = BufWriter::with_capacity(256 * 1024, file);
138
139 let sample_node = node_ids.first().and_then(|id| graph.nodes.get(id));
141
142 let mut header = vec![
144 "nodeId:ID".to_string(),
145 "code".to_string(),
146 "name".to_string(),
147 ];
148
149 if self.config.export_node_properties {
150 if let Some(node) = sample_node {
151 for key in node.properties.keys() {
152 header.push(key.clone());
153 }
154 }
155 }
156
157 if self.config.export_features {
158 if let Some(node) = sample_node {
159 for i in 0..node.features.len() {
160 header.push(format!("feature_{i}"));
161 }
162 }
163 }
164
165 header.push("isAnomaly:boolean".to_string());
166 header.push(":LABEL".to_string());
167
168 writeln!(writer, "{}", header.join(","))?;
169
170 for &node_id in node_ids {
172 if let Some(node) = graph.nodes.get(&node_id) {
173 let mut row = vec![
174 node_id.to_string(),
175 escape_csv(&node.external_id),
176 escape_csv(&node.label),
177 ];
178
179 if self.config.export_node_properties {
180 for key in &header[3..] {
181 if key.starts_with("feature_")
182 || key == "isAnomaly:boolean"
183 || key == ":LABEL"
184 {
185 break;
186 }
187 let value = node
188 .properties
189 .get(key)
190 .map(|p| p.to_string_value())
191 .unwrap_or_default();
192 row.push(escape_csv(&value));
193 }
194 }
195
196 if self.config.export_features {
197 for &feat in &node.features {
198 row.push(format!("{feat:.6}"));
199 }
200 }
201
202 row.push(node.is_anomaly.to_string());
203 row.push(label.to_string());
204
205 writeln!(writer, "{}", row.join(","))?;
206 }
207 }
208 }
209
210 Ok(labels)
211 }
212
213 fn export_edges(
215 &self,
216 graph: &Graph,
217 output_dir: &Path,
218 files: &mut Vec<String>,
219 ) -> std::io::Result<Vec<String>> {
220 let mut rel_types = Vec::new();
221
222 for (edge_type, edge_ids) in &graph.edges_by_type {
223 let rel_type = edge_type.as_str();
224 rel_types.push(rel_type.to_string());
225
226 let filename = format!("edges_{}.csv", rel_type.to_lowercase());
227 let path = output_dir.join(&filename);
228 files.push(filename);
229
230 let file = File::create(path)?;
231 let mut writer = BufWriter::with_capacity(256 * 1024, file);
232
233 let sample_edge = edge_ids.first().and_then(|id| graph.edges.get(id));
235
236 let mut header = vec![
238 ":START_ID".to_string(),
239 ":END_ID".to_string(),
240 "weight:double".to_string(),
241 ];
242
243 if self.config.export_edge_properties {
244 if let Some(edge) = sample_edge {
245 for key in edge.properties.keys() {
246 header.push(format!("{key}:string"));
247 }
248 }
249 }
250
251 if self.config.export_features {
252 if let Some(edge) = sample_edge {
253 for i in 0..edge.features.len() {
254 header.push(format!("feature_{i}:double"));
255 }
256 }
257 }
258
259 header.push("isAnomaly:boolean".to_string());
260 header.push(":TYPE".to_string());
261
262 writeln!(writer, "{}", header.join(","))?;
263
264 for &edge_id in edge_ids {
266 if let Some(edge) = graph.edges.get(&edge_id) {
267 let mut row = vec![
268 edge.source.to_string(),
269 edge.target.to_string(),
270 format!("{:.6}", edge.weight),
271 ];
272
273 if self.config.export_edge_properties {
274 for (key, value) in &edge.properties {
275 if !header.iter().any(|h| h.starts_with(key)) {
276 continue;
277 }
278 row.push(escape_csv(&value.to_string_value()));
279 }
280 }
281
282 if self.config.export_features {
283 for &feat in &edge.features {
284 row.push(format!("{feat:.6}"));
285 }
286 }
287
288 row.push(edge.is_anomaly.to_string());
289 row.push(rel_type.to_string());
290
291 writeln!(writer, "{}", row.join(","))?;
292 }
293 }
294 }
295
296 Ok(rel_types)
297 }
298
299 fn generate_cypher_script(
301 &self,
302 graph: &Graph,
303 output_dir: &Path,
304 node_labels: &[String],
305 relationship_types: &[String],
306 ) -> std::io::Result<()> {
307 let path = output_dir.join("import.cypher");
308 let file = File::create(path)?;
309 let mut writer = BufWriter::with_capacity(256 * 1024, file);
310
311 writeln!(writer, "// Neo4j Import Script for {}", graph.name)?;
312 writeln!(writer, "// Generated by synth-graph")?;
313 writeln!(writer)?;
314
315 writeln!(writer, "// Create constraints and indexes")?;
317 for label in node_labels {
318 writeln!(
319 writer,
320 "CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.nodeId IS UNIQUE;"
321 )?;
322 }
323 writeln!(writer)?;
324
325 writeln!(writer, "// Import nodes")?;
327 for label in node_labels {
328 let filename = format!("nodes_{}.csv", label.to_lowercase());
329 writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{filename}'")?;
330 writeln!(writer, "AS row")?;
331 writeln!(
332 writer,
333 "CREATE (n:{label} {{nodeId: toInteger(row.`nodeId:ID`), code: row.code, name: row.name, isAnomaly: toBoolean(row.`isAnomaly:boolean`)}});"
334 )?;
335 writeln!(writer)?;
336 }
337
338 writeln!(writer, "// Import relationships")?;
340 for rel_type in relationship_types {
341 let filename = format!("edges_{}.csv", rel_type.to_lowercase());
342 writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{filename}'")?;
343 writeln!(writer, "AS row")?;
344 writeln!(
345 writer,
346 "MATCH (source) WHERE source.nodeId = toInteger(row.`:START_ID`)"
347 )?;
348 writeln!(
349 writer,
350 "MATCH (target) WHERE target.nodeId = toInteger(row.`:END_ID`)"
351 )?;
352 writeln!(
353 writer,
354 "CREATE (source)-[:{}{{weight: toFloat(row.`weight:double`), isAnomaly: toBoolean(row.`isAnomaly:boolean`)}}]->(target);",
355 rel_type.to_uppercase().replace("-", "_")
356 )?;
357 writeln!(writer)?;
358 }
359
360 writeln!(writer, "// Verification query")?;
362 writeln!(writer, "CALL db.labels() YIELD label")?;
363 writeln!(
364 writer,
365 "CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {{}})"
366 )?;
367 writeln!(writer, "YIELD value")?;
368 writeln!(writer, "RETURN label, value.count as nodeCount;")?;
369
370 Ok(())
371 }
372
373 fn generate_admin_import_script(
375 &self,
376 output_dir: &Path,
377 node_labels: &[String],
378 relationship_types: &[String],
379 ) -> std::io::Result<()> {
380 let path = output_dir.join("admin_import.sh");
381 let file = File::create(path)?;
382 let mut writer = BufWriter::with_capacity(256 * 1024, file);
383
384 writeln!(writer, "#!/bin/bash")?;
385 writeln!(writer, "# Neo4j Admin Import Script")?;
386 writeln!(writer, "# Generated by synth-graph")?;
387 writeln!(writer)?;
388 writeln!(writer, "# Set Neo4j home directory")?;
389 writeln!(writer, "NEO4J_HOME=${{NEO4J_HOME:-/var/lib/neo4j}}")?;
390 writeln!(writer, "DATA_DIR=${{DATA_DIR:-$(dirname $0)}}")?;
391 writeln!(writer)?;
392 writeln!(writer, "# Stop Neo4j if running")?;
393 writeln!(writer, "# systemctl stop neo4j")?;
394 writeln!(writer)?;
395 writeln!(writer, "# Run import")?;
396 writeln!(writer, "neo4j-admin database import full \\")?;
397 writeln!(writer, " --overwrite-destination=true \\")?;
398 writeln!(writer, " --database={} \\", self.config.database_name)?;
399
400 for label in node_labels {
402 let filename = format!("nodes_{}.csv", label.to_lowercase());
403 writeln!(writer, " --nodes={label}=$DATA_DIR/{filename} \\")?;
404 }
405
406 for rel_type in relationship_types {
408 let filename = format!("edges_{}.csv", rel_type.to_lowercase());
409 writeln!(
410 writer,
411 " --relationships={}=$DATA_DIR/{} \\",
412 rel_type.to_uppercase().replace("-", "_"),
413 filename
414 )?;
415 }
416
417 writeln!(writer, " --skip-bad-relationships=true")?;
418 writeln!(writer)?;
419 writeln!(writer, "echo \"Import complete\"")?;
420 writeln!(writer)?;
421 writeln!(writer, "# Start Neo4j")?;
422 writeln!(writer, "# systemctl start neo4j")?;
423
424 Ok(())
425 }
426}
427
/// Quotes a CSV field per RFC 4180 when needed.
///
/// A field is quoted when it contains a comma, a double quote, or a line break
/// (`\n` or `\r`). Embedded double quotes are doubled. Fields that need no
/// quoting are returned unchanged.
fn escape_csv(value: &str) -> String {
    if value.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r')) {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}
436
/// Incrementally builds a Cypher script from `CREATE` / `MATCH ... CREATE` statements.
///
/// Property values and node ids are rendered as single-quoted Cypher string
/// literals, with backslashes and single quotes escaped. Labels, relationship
/// types and property keys that are not plain identifiers are wrapped in
/// backticks. Caller input therefore cannot break out of the generated
/// statement. Properties are emitted in sorted key order, so the output is
/// deterministic.
pub struct CypherQueryBuilder {
    /// Statements in insertion order, without trailing `;`.
    queries: Vec<String>,
}

impl CypherQueryBuilder {
    /// Creates a builder with no statements.
    pub fn new() -> Self {
        Self {
            queries: Vec::new(),
        }
    }

    /// Appends `CREATE (:<label> {<key>: '<value>', ...})`.
    pub fn create_node(&mut self, label: &str, properties: &HashMap<String, String>) -> &mut Self {
        self.queries.push(format!(
            "CREATE (:{} {{{}}})",
            Self::identifier(label),
            Self::property_list(properties)
        ));
        self
    }

    /// Appends a statement that matches two nodes by their `nodeId` property and
    /// creates a `rel_type` relationship from the first to the second. The
    /// relationship carries `properties` when that map is non-empty.
    ///
    /// NOTE(review): ids are matched as string literals, whereas the exporter's
    /// `import.cypher` stores `nodeId` as an integer — confirm what callers expect.
    pub fn create_relationship(
        &mut self,
        from_label: &str,
        from_id: &str,
        to_label: &str,
        to_id: &str,
        rel_type: &str,
        properties: &HashMap<String, String>,
    ) -> &mut Self {
        let props = Self::property_list(properties);
        let props_str = if props.is_empty() {
            String::new()
        } else {
            format!(" {{{}}}", props)
        };

        self.queries.push(format!(
            "MATCH (a:{} {{nodeId: {}}}), (b:{} {{nodeId: {}}}) CREATE (a)-[:{}{}]->(b)",
            Self::identifier(from_label),
            Self::string_literal(from_id),
            Self::identifier(to_label),
            Self::string_literal(to_id),
            Self::identifier(rel_type),
            props_str
        ));
        self
    }

    /// Renders all statements joined by `;\n`, with a trailing `;`.
    ///
    /// Returns an empty string when nothing has been added, because a lone `;`
    /// is not a valid Cypher statement.
    pub fn build(&self) -> String {
        if self.queries.is_empty() {
            return String::new();
        }
        let mut script = self.queries.join(";\n");
        script.push(';');
        script
    }

    /// Renders `key: 'value'` pairs (without surrounding braces), sorted by key.
    fn property_list(properties: &HashMap<String, String>) -> String {
        let mut entries: Vec<(&String, &String)> = properties.iter().collect();
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        entries
            .into_iter()
            .map(|(key, value)| format!("{}: {}", Self::identifier(key), Self::string_literal(value)))
            .collect::<Vec<_>>()
            .join(", ")
    }

    /// Quotes `value` as a single-quoted Cypher string literal. Backslashes are
    /// escaped first, so a value ending in `\` cannot escape the closing quote.
    fn string_literal(value: &str) -> String {
        format!("'{}'", value.replace('\\', "\\\\").replace('\'', "\\'"))
    }

    /// Returns `name` unchanged when it is a plain identifier
    /// (`[A-Za-z_][A-Za-z0-9_]*`). Otherwise it is backtick-quoted, with
    /// embedded backticks doubled.
    fn identifier(name: &str) -> String {
        let mut chars = name.chars();
        let is_plain = matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_')
            && chars.all(|c| c.is_ascii_alphanumeric() || c == '_');
        if is_plain {
            name.to_string()
        } else {
            format!("`{}`", name.replace('`', "``"))
        }
    }
}

impl Default for CypherQueryBuilder {
    fn default() -> Self {
        Self::new()
    }
}
500
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::create_test_graph;
    use tempfile::tempdir;

    /// End-to-end export of the shared fixture graph with default settings:
    /// checks the reported counts and that every expected output file exists.
    #[test]
    fn test_neo4j_export() {
        let graph = create_test_graph();
        let out = tempdir().unwrap();

        let metadata = Neo4jExporter::new(Neo4jExportConfig::default())
            .export(&graph, out.path())
            .unwrap();

        assert_eq!(metadata.num_nodes, 2);
        assert_eq!(metadata.num_edges, 1);
        for expected in [
            "nodes_account.csv",
            "edges_transaction.csv",
            "import.cypher",
            "admin_import.sh",
        ] {
            assert!(out.path().join(expected).exists());
        }
    }

    /// Table-driven check of CSV field quoting.
    #[test]
    fn test_csv_escape() {
        let cases = [
            ("simple", "simple"),
            ("with,comma", "\"with,comma\""),
            ("with\"quote", "\"with\"\"quote\""),
        ];
        for (input, expected) in cases {
            assert_eq!(escape_csv(input), expected);
        }
    }

    /// A single node statement contains its label and quoted property.
    #[test]
    fn test_cypher_builder() {
        let props: HashMap<String, String> =
            HashMap::from([("name".to_string(), "Test".to_string())]);

        let cypher = CypherQueryBuilder::new()
            .create_node("Account", &props)
            .build();

        assert!(cypher.contains("CREATE (:Account"));
        assert!(cypher.contains("name: 'Test'"));
    }
}