Skip to main content

datasynth_graph/exporters/
neo4j.rs

1//! Neo4j exporter.
2//!
3//! Exports graph data in formats compatible with Neo4j import:
4//! - CSV files for nodes and edges (neo4j-admin import format)
5//! - Cypher script for direct loading
6
7use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use serde::{Deserialize, Serialize};
13
14use crate::models::Graph;
15
/// Options controlling which files the Neo4j exporter writes and which
/// columns the CSV files contain.
#[derive(Debug, Clone)]
pub struct Neo4jExportConfig {
    /// When `true`, node property values are written as extra CSV columns.
    pub export_node_properties: bool,
    /// When `true`, edge property values are written as extra CSV columns.
    pub export_edge_properties: bool,
    /// When `true`, numeric feature vectors are written as `feature_<i>` columns
    /// (applies to both nodes and edges).
    pub export_features: bool,
    /// When `true`, an `import.cypher` script is generated next to the CSVs.
    pub generate_cypher: bool,
    /// When `true`, an `admin_import.sh` script is generated next to the CSVs.
    pub generate_admin_import: bool,
    /// Target database name written into the generated `neo4j-admin` command.
    pub database_name: String,
    /// Intended batch size for Cypher imports.
    // NOTE(review): no code visible in this file reads this field — confirm
    // whether it is consumed elsewhere.
    pub cypher_batch_size: usize,
}

impl Default for Neo4jExportConfig {
    /// Enables every export option, targets the `synth` database and uses a
    /// batch size of 1000.
    fn default() -> Self {
        Neo4jExportConfig {
            // Column selection.
            export_node_properties: true,
            export_edge_properties: true,
            export_features: true,
            // Helper scripts.
            generate_cypher: true,
            generate_admin_import: true,
            // Import target.
            database_name: String::from("synth"),
            cypher_batch_size: 1_000,
        }
    }
}
48
/// Summary of one export run.
///
/// Returned by `Neo4jExporter::export` and also serialized as pretty-printed
/// JSON to `metadata.json` inside the output directory.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Neo4jMetadata {
    /// Graph name, copied from the exported graph.
    pub name: String,
    /// Number of nodes reported by the graph at export time.
    pub num_nodes: usize,
    /// Number of edges reported by the graph at export time.
    pub num_edges: usize,
    /// Node labels (one per node type); a `nodes_<label>.csv` file is written for each.
    pub node_labels: Vec<String>,
    /// Relationship types (one per edge type); an `edges_<type>.csv` file is written for each.
    pub relationship_types: Vec<String>,
    /// Names of the files produced by the export: the node and edge CSVs plus,
    /// when generated, `import.cypher` and `admin_import.sh`. `metadata.json`
    /// itself is not listed.
    pub files: Vec<String>,
}
65
/// Writes a graph to disk as Neo4j-importable CSV files, optionally with a
/// Cypher `LOAD CSV` script and a `neo4j-admin` import shell script.
pub struct Neo4jExporter {
    /// Options selecting which columns and helper scripts are produced.
    config: Neo4jExportConfig,
}
70
71impl Neo4jExporter {
72    /// Creates a new Neo4j exporter.
73    pub fn new(config: Neo4jExportConfig) -> Self {
74        Self { config }
75    }
76
77    /// Exports a graph to Neo4j format.
78    pub fn export(&self, graph: &Graph, output_dir: &Path) -> std::io::Result<Neo4jMetadata> {
79        fs::create_dir_all(output_dir)?;
80
81        let mut files = Vec::new();
82
83        // Export nodes by type
84        let node_labels = self.export_nodes(graph, output_dir, &mut files)?;
85
86        // Export edges by type
87        let relationship_types = self.export_edges(graph, output_dir, &mut files)?;
88
89        // Generate Cypher script
90        if self.config.generate_cypher {
91            self.generate_cypher_script(graph, output_dir, &node_labels, &relationship_types)?;
92            files.push("import.cypher".to_string());
93        }
94
95        // Generate neo4j-admin import script
96        if self.config.generate_admin_import {
97            self.generate_admin_import_script(output_dir, &node_labels, &relationship_types)?;
98            files.push("admin_import.sh".to_string());
99        }
100
101        // Create metadata
102        let metadata = Neo4jMetadata {
103            name: graph.name.clone(),
104            num_nodes: graph.node_count(),
105            num_edges: graph.edge_count(),
106            node_labels,
107            relationship_types,
108            files,
109        };
110
111        // Write metadata
112        let metadata_path = output_dir.join("metadata.json");
113        let file = File::create(metadata_path)?;
114        serde_json::to_writer_pretty(file, &metadata)?;
115
116        Ok(metadata)
117    }
118
119    /// Exports nodes grouped by type.
120    fn export_nodes(
121        &self,
122        graph: &Graph,
123        output_dir: &Path,
124        files: &mut Vec<String>,
125    ) -> std::io::Result<Vec<String>> {
126        let mut labels = Vec::new();
127
128        for (node_type, node_ids) in &graph.nodes_by_type {
129            let label = node_type.as_str();
130            labels.push(label.to_string());
131
132            let filename = format!("nodes_{}.csv", label.to_lowercase());
133            let path = output_dir.join(&filename);
134            files.push(filename);
135
136            let file = File::create(path)?;
137            let mut writer = BufWriter::with_capacity(256 * 1024, file);
138
139            // Determine properties from first node
140            let sample_node = node_ids.first().and_then(|id| graph.nodes.get(id));
141
142            // Write header
143            let mut header = vec![
144                "nodeId:ID".to_string(),
145                "code".to_string(),
146                "name".to_string(),
147            ];
148
149            if self.config.export_node_properties {
150                if let Some(node) = sample_node {
151                    for key in node.properties.keys() {
152                        header.push(key.clone());
153                    }
154                }
155            }
156
157            if self.config.export_features {
158                if let Some(node) = sample_node {
159                    for i in 0..node.features.len() {
160                        header.push(format!("feature_{i}"));
161                    }
162                }
163            }
164
165            header.push("isAnomaly:boolean".to_string());
166            header.push(":LABEL".to_string());
167
168            writeln!(writer, "{}", header.join(","))?;
169
170            // Write nodes
171            for &node_id in node_ids {
172                if let Some(node) = graph.nodes.get(&node_id) {
173                    let mut row = vec![
174                        node_id.to_string(),
175                        escape_csv(&node.external_id),
176                        escape_csv(&node.label),
177                    ];
178
179                    if self.config.export_node_properties {
180                        for key in &header[3..] {
181                            if key.starts_with("feature_")
182                                || key == "isAnomaly:boolean"
183                                || key == ":LABEL"
184                            {
185                                break;
186                            }
187                            let value = node
188                                .properties
189                                .get(key)
190                                .map(|p| p.to_string_value())
191                                .unwrap_or_default();
192                            row.push(escape_csv(&value));
193                        }
194                    }
195
196                    if self.config.export_features {
197                        for &feat in &node.features {
198                            row.push(format!("{feat:.6}"));
199                        }
200                    }
201
202                    row.push(node.is_anomaly.to_string());
203                    row.push(label.to_string());
204
205                    writeln!(writer, "{}", row.join(","))?;
206                }
207            }
208        }
209
210        Ok(labels)
211    }
212
213    /// Exports edges grouped by type.
214    fn export_edges(
215        &self,
216        graph: &Graph,
217        output_dir: &Path,
218        files: &mut Vec<String>,
219    ) -> std::io::Result<Vec<String>> {
220        let mut rel_types = Vec::new();
221
222        for (edge_type, edge_ids) in &graph.edges_by_type {
223            let rel_type = edge_type.as_str();
224            rel_types.push(rel_type.to_string());
225
226            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
227            let path = output_dir.join(&filename);
228            files.push(filename);
229
230            let file = File::create(path)?;
231            let mut writer = BufWriter::with_capacity(256 * 1024, file);
232
233            // Determine properties from first edge
234            let sample_edge = edge_ids.first().and_then(|id| graph.edges.get(id));
235
236            // Write header
237            let mut header = vec![
238                ":START_ID".to_string(),
239                ":END_ID".to_string(),
240                "weight:double".to_string(),
241            ];
242
243            if self.config.export_edge_properties {
244                if let Some(edge) = sample_edge {
245                    for key in edge.properties.keys() {
246                        header.push(format!("{key}:string"));
247                    }
248                }
249            }
250
251            if self.config.export_features {
252                if let Some(edge) = sample_edge {
253                    for i in 0..edge.features.len() {
254                        header.push(format!("feature_{i}:double"));
255                    }
256                }
257            }
258
259            header.push("isAnomaly:boolean".to_string());
260            header.push(":TYPE".to_string());
261
262            writeln!(writer, "{}", header.join(","))?;
263
264            // Write edges
265            for &edge_id in edge_ids {
266                if let Some(edge) = graph.edges.get(&edge_id) {
267                    let mut row = vec![
268                        edge.source.to_string(),
269                        edge.target.to_string(),
270                        format!("{:.6}", edge.weight),
271                    ];
272
273                    if self.config.export_edge_properties {
274                        for (key, value) in &edge.properties {
275                            if !header.iter().any(|h| h.starts_with(key)) {
276                                continue;
277                            }
278                            row.push(escape_csv(&value.to_string_value()));
279                        }
280                    }
281
282                    if self.config.export_features {
283                        for &feat in &edge.features {
284                            row.push(format!("{feat:.6}"));
285                        }
286                    }
287
288                    row.push(edge.is_anomaly.to_string());
289                    row.push(rel_type.to_string());
290
291                    writeln!(writer, "{}", row.join(","))?;
292                }
293            }
294        }
295
296        Ok(rel_types)
297    }
298
299    /// Generates Cypher import script.
300    fn generate_cypher_script(
301        &self,
302        graph: &Graph,
303        output_dir: &Path,
304        node_labels: &[String],
305        relationship_types: &[String],
306    ) -> std::io::Result<()> {
307        let path = output_dir.join("import.cypher");
308        let file = File::create(path)?;
309        let mut writer = BufWriter::with_capacity(256 * 1024, file);
310
311        writeln!(writer, "// Neo4j Import Script for {}", graph.name)?;
312        writeln!(writer, "// Generated by synth-graph")?;
313        writeln!(writer)?;
314
315        // Create constraints
316        writeln!(writer, "// Create constraints and indexes")?;
317        for label in node_labels {
318            writeln!(
319                writer,
320                "CREATE CONSTRAINT IF NOT EXISTS FOR (n:{label}) REQUIRE n.nodeId IS UNIQUE;"
321            )?;
322        }
323        writeln!(writer)?;
324
325        // Import nodes using LOAD CSV
326        writeln!(writer, "// Import nodes")?;
327        for label in node_labels {
328            let filename = format!("nodes_{}.csv", label.to_lowercase());
329            writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{filename}'")?;
330            writeln!(writer, "AS row")?;
331            writeln!(
332                writer,
333                "CREATE (n:{label} {{nodeId: toInteger(row.`nodeId:ID`), code: row.code, name: row.name, isAnomaly: toBoolean(row.`isAnomaly:boolean`)}});"
334            )?;
335            writeln!(writer)?;
336        }
337
338        // Import edges using LOAD CSV
339        writeln!(writer, "// Import relationships")?;
340        for rel_type in relationship_types {
341            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
342            writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{filename}'")?;
343            writeln!(writer, "AS row")?;
344            writeln!(
345                writer,
346                "MATCH (source) WHERE source.nodeId = toInteger(row.`:START_ID`)"
347            )?;
348            writeln!(
349                writer,
350                "MATCH (target) WHERE target.nodeId = toInteger(row.`:END_ID`)"
351            )?;
352            writeln!(
353                writer,
354                "CREATE (source)-[:{}{{weight: toFloat(row.`weight:double`), isAnomaly: toBoolean(row.`isAnomaly:boolean`)}}]->(target);",
355                rel_type.to_uppercase().replace("-", "_")
356            )?;
357            writeln!(writer)?;
358        }
359
360        // Summary query
361        writeln!(writer, "// Verification query")?;
362        writeln!(writer, "CALL db.labels() YIELD label")?;
363        writeln!(
364            writer,
365            "CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {{}})"
366        )?;
367        writeln!(writer, "YIELD value")?;
368        writeln!(writer, "RETURN label, value.count as nodeCount;")?;
369
370        Ok(())
371    }
372
373    /// Generates neo4j-admin import script.
374    fn generate_admin_import_script(
375        &self,
376        output_dir: &Path,
377        node_labels: &[String],
378        relationship_types: &[String],
379    ) -> std::io::Result<()> {
380        let path = output_dir.join("admin_import.sh");
381        let file = File::create(path)?;
382        let mut writer = BufWriter::with_capacity(256 * 1024, file);
383
384        writeln!(writer, "#!/bin/bash")?;
385        writeln!(writer, "# Neo4j Admin Import Script")?;
386        writeln!(writer, "# Generated by synth-graph")?;
387        writeln!(writer)?;
388        writeln!(writer, "# Set Neo4j home directory")?;
389        writeln!(writer, "NEO4J_HOME=${{NEO4J_HOME:-/var/lib/neo4j}}")?;
390        writeln!(writer, "DATA_DIR=${{DATA_DIR:-$(dirname $0)}}")?;
391        writeln!(writer)?;
392        writeln!(writer, "# Stop Neo4j if running")?;
393        writeln!(writer, "# systemctl stop neo4j")?;
394        writeln!(writer)?;
395        writeln!(writer, "# Run import")?;
396        writeln!(writer, "neo4j-admin database import full \\")?;
397        writeln!(writer, "  --overwrite-destination=true \\")?;
398        writeln!(writer, "  --database={} \\", self.config.database_name)?;
399
400        // Add node files
401        for label in node_labels {
402            let filename = format!("nodes_{}.csv", label.to_lowercase());
403            writeln!(writer, "  --nodes={label}=$DATA_DIR/{filename} \\")?;
404        }
405
406        // Add relationship files
407        for rel_type in relationship_types {
408            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
409            writeln!(
410                writer,
411                "  --relationships={}=$DATA_DIR/{} \\",
412                rel_type.to_uppercase().replace("-", "_"),
413                filename
414            )?;
415        }
416
417        writeln!(writer, "  --skip-bad-relationships=true")?;
418        writeln!(writer)?;
419        writeln!(writer, "echo \"Import complete\"")?;
420        writeln!(writer)?;
421        writeln!(writer, "# Start Neo4j")?;
422        writeln!(writer, "# systemctl start neo4j")?;
423
424        Ok(())
425    }
426}
427
/// Escapes a single field for CSV output.
///
/// A field containing a comma, a double quote, a line feed or a carriage
/// return is wrapped in double quotes, with embedded double quotes doubled.
/// Any other field is returned unchanged.
fn escape_csv(value: &str) -> String {
    // A bare '\r' splits records in many CSV readers just as '\n' does.
    let needs_quoting = value.contains(|c: char| matches!(c, ',' | '"' | '\n' | '\r'));
    if needs_quoting {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}
436
/// Accumulates Cypher statements and joins them into a single script.
///
/// Property values and node ids are emitted as single-quoted string literals
/// with backslashes and single quotes escaped. Labels, relationship types and
/// property keys are inserted verbatim, so they must already be valid Cypher
/// identifiers.
#[derive(Debug, Clone, Default)]
pub struct CypherQueryBuilder {
    /// Statements in insertion order, without trailing semicolons.
    queries: Vec<String>,
}

impl CypherQueryBuilder {
    /// Creates an empty query builder.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends a `CREATE (:<label> {...})` statement.
    ///
    /// Properties are written in ascending key order so the output is
    /// deterministic. Returns `self` to allow chaining.
    pub fn create_node(&mut self, label: &str, properties: &HashMap<String, String>) -> &mut Self {
        let props = Self::format_properties(properties);
        self.queries.push(format!("CREATE (:{label} {{{props}}})"));
        self
    }

    /// Appends a statement that matches two nodes by label and `nodeId`
    /// (compared as a string literal) and creates a relationship between them.
    ///
    /// The property map is omitted entirely when `properties` is empty;
    /// otherwise properties are written in ascending key order. Returns
    /// `self` to allow chaining.
    pub fn create_relationship(
        &mut self,
        from_label: &str,
        from_id: &str,
        to_label: &str,
        to_id: &str,
        rel_type: &str,
        properties: &HashMap<String, String>,
    ) -> &mut Self {
        let props_str = if properties.is_empty() {
            String::new()
        } else {
            format!(" {{{}}}", Self::format_properties(properties))
        };
        // Ids land inside quoted literals, so they need the same escaping as values.
        let from_id = Self::escape_string(from_id);
        let to_id = Self::escape_string(to_id);

        self.queries.push(format!(
            "MATCH (a:{from_label} {{nodeId: '{from_id}'}}), (b:{to_label} {{nodeId: '{to_id}'}}) CREATE (a)-[:{rel_type}{props_str}]->(b)"
        ));
        self
    }

    /// Builds the final script: statements joined by `;\n` with a trailing `;`.
    ///
    /// Returns an empty string when no statement has been added.
    pub fn build(&self) -> String {
        if self.queries.is_empty() {
            return String::new();
        }
        self.queries.join(";\n") + ";"
    }

    /// Renders `key: 'value'` pairs, comma-separated, sorted by key.
    fn format_properties(properties: &HashMap<String, String>) -> String {
        let mut entries: Vec<(&String, &String)> = properties.iter().collect();
        entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
        entries
            .into_iter()
            .map(|(key, value)| format!("{key}: '{}'", Self::escape_string(value)))
            .collect::<Vec<_>>()
            .join(", ")
    }

    /// Escapes `value` for use inside a single-quoted Cypher string literal.
    fn escape_string(value: &str) -> String {
        // Backslashes first, otherwise the ones added for quotes get doubled.
        value.replace('\\', "\\\\").replace('\'', "\\'")
    }
}
500
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::create_test_graph;
    use tempfile::tempdir;

    /// A default-config export reports the test graph's counts and leaves the
    /// expected CSV and script files in the output directory.
    #[test]
    fn test_neo4j_export() {
        let out_dir = tempdir().unwrap();
        let sample = create_test_graph();

        let metadata = Neo4jExporter::new(Neo4jExportConfig::default())
            .export(&sample, out_dir.path())
            .unwrap();

        assert_eq!(metadata.num_nodes, 2);
        assert_eq!(metadata.num_edges, 1);

        let expected_files = [
            "nodes_account.csv",
            "edges_transaction.csv",
            "import.cypher",
            "admin_import.sh",
        ];
        for name in expected_files.iter() {
            assert!(out_dir.path().join(name).exists());
        }
    }

    /// Plain fields pass through; commas and quotes trigger quoting.
    #[test]
    fn test_csv_escape() {
        assert_eq!(escape_csv("simple"), "simple");
        assert_eq!(escape_csv("with,comma"), "\"with,comma\"");
        assert_eq!(escape_csv("with\"quote"), "\"with\"\"quote\"");
    }

    /// A single node statement carries both the label and the quoted property.
    #[test]
    fn test_cypher_builder() {
        let props: HashMap<String, String> = [("name", "Test")]
            .iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let mut builder = CypherQueryBuilder::new();
        builder.create_node("Account", &props);
        let script = builder.build();

        assert!(script.contains("CREATE (:Account"));
        assert!(script.contains("name: 'Test'"));
    }
}