Skip to main content

datasynth_graph/exporters/
neo4j.rs

1//! Neo4j exporter.
2//!
3//! Exports graph data in formats compatible with Neo4j import:
4//! - CSV files for nodes and edges (neo4j-admin import format)
5//! - Cypher script for direct loading
6
7use std::collections::HashMap;
8use std::fs::{self, File};
9use std::io::{BufWriter, Write};
10use std::path::Path;
11
12use serde::{Deserialize, Serialize};
13
14use crate::models::Graph;
15
/// Options that control which artifacts the Neo4j exporter produces and what they contain.
///
/// The [`Default`] implementation turns every option on, targets the `synth` database and
/// uses a Cypher batch size of 1000.
#[derive(Debug, Clone)]
pub struct Neo4jExportConfig {
    /// Whether node property columns are written to the node CSV files.
    pub export_node_properties: bool,
    /// Whether edge property columns are written to the edge CSV files.
    pub export_edge_properties: bool,
    /// Whether node and edge feature vectors are written as extra CSV columns.
    pub export_features: bool,
    /// Whether an `import.cypher` script is generated.
    pub generate_cypher: bool,
    /// Whether an `admin_import.sh` (neo4j-admin) script is generated.
    pub generate_admin_import: bool,
    /// Target database name used by the neo4j-admin script.
    pub database_name: String,
    /// Batch size for Cypher `LOAD CSV` imports.
    pub cypher_batch_size: usize,
}

impl Default for Neo4jExportConfig {
    /// Builds a configuration that exports everything into the `synth` database.
    fn default() -> Self {
        let database_name = String::from("synth");
        Self {
            database_name,
            cypher_batch_size: 1_000,
            export_node_properties: true,
            export_edge_properties: true,
            export_features: true,
            generate_cypher: true,
            generate_admin_import: true,
        }
    }
}
48
49/// Metadata about the exported Neo4j data.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct Neo4jMetadata {
52    /// Graph name.
53    pub name: String,
54    /// Number of nodes.
55    pub num_nodes: usize,
56    /// Number of edges.
57    pub num_edges: usize,
58    /// Node labels (types).
59    pub node_labels: Vec<String>,
60    /// Relationship types.
61    pub relationship_types: Vec<String>,
62    /// Files included in export.
63    pub files: Vec<String>,
64}
65
66/// Neo4j exporter.
67pub struct Neo4jExporter {
68    config: Neo4jExportConfig,
69}
70
71impl Neo4jExporter {
72    /// Creates a new Neo4j exporter.
73    pub fn new(config: Neo4jExportConfig) -> Self {
74        Self { config }
75    }
76
77    /// Exports a graph to Neo4j format.
78    pub fn export(&self, graph: &Graph, output_dir: &Path) -> std::io::Result<Neo4jMetadata> {
79        fs::create_dir_all(output_dir)?;
80
81        let mut files = Vec::new();
82
83        // Export nodes by type
84        let node_labels = self.export_nodes(graph, output_dir, &mut files)?;
85
86        // Export edges by type
87        let relationship_types = self.export_edges(graph, output_dir, &mut files)?;
88
89        // Generate Cypher script
90        if self.config.generate_cypher {
91            self.generate_cypher_script(graph, output_dir, &node_labels, &relationship_types)?;
92            files.push("import.cypher".to_string());
93        }
94
95        // Generate neo4j-admin import script
96        if self.config.generate_admin_import {
97            self.generate_admin_import_script(output_dir, &node_labels, &relationship_types)?;
98            files.push("admin_import.sh".to_string());
99        }
100
101        // Create metadata
102        let metadata = Neo4jMetadata {
103            name: graph.name.clone(),
104            num_nodes: graph.node_count(),
105            num_edges: graph.edge_count(),
106            node_labels,
107            relationship_types,
108            files,
109        };
110
111        // Write metadata
112        let metadata_path = output_dir.join("metadata.json");
113        let file = File::create(metadata_path)?;
114        serde_json::to_writer_pretty(file, &metadata)?;
115
116        Ok(metadata)
117    }
118
119    /// Exports nodes grouped by type.
120    fn export_nodes(
121        &self,
122        graph: &Graph,
123        output_dir: &Path,
124        files: &mut Vec<String>,
125    ) -> std::io::Result<Vec<String>> {
126        let mut labels = Vec::new();
127
128        for (node_type, node_ids) in &graph.nodes_by_type {
129            let label = node_type.as_str();
130            labels.push(label.to_string());
131
132            let filename = format!("nodes_{}.csv", label.to_lowercase());
133            let path = output_dir.join(&filename);
134            files.push(filename);
135
136            let file = File::create(path)?;
137            let mut writer = BufWriter::new(file);
138
139            // Determine properties from first node
140            let sample_node = node_ids.first().and_then(|id| graph.nodes.get(id));
141
142            // Write header
143            let mut header = vec![
144                "nodeId:ID".to_string(),
145                "code".to_string(),
146                "name".to_string(),
147            ];
148
149            if self.config.export_node_properties {
150                if let Some(node) = sample_node {
151                    for key in node.properties.keys() {
152                        header.push(key.clone());
153                    }
154                }
155            }
156
157            if self.config.export_features {
158                if let Some(node) = sample_node {
159                    for i in 0..node.features.len() {
160                        header.push(format!("feature_{}", i));
161                    }
162                }
163            }
164
165            header.push("isAnomaly:boolean".to_string());
166            header.push(":LABEL".to_string());
167
168            writeln!(writer, "{}", header.join(","))?;
169
170            // Write nodes
171            for &node_id in node_ids {
172                if let Some(node) = graph.nodes.get(&node_id) {
173                    let mut row = vec![
174                        node_id.to_string(),
175                        escape_csv(&node.external_id),
176                        escape_csv(&node.label),
177                    ];
178
179                    if self.config.export_node_properties {
180                        for key in &header[3..] {
181                            if key.starts_with("feature_")
182                                || key == "isAnomaly:boolean"
183                                || key == ":LABEL"
184                            {
185                                break;
186                            }
187                            let value = node
188                                .properties
189                                .get(key)
190                                .map(|p| p.to_string_value())
191                                .unwrap_or_default();
192                            row.push(escape_csv(&value));
193                        }
194                    }
195
196                    if self.config.export_features {
197                        for &feat in &node.features {
198                            row.push(format!("{:.6}", feat));
199                        }
200                    }
201
202                    row.push(node.is_anomaly.to_string());
203                    row.push(label.to_string());
204
205                    writeln!(writer, "{}", row.join(","))?;
206                }
207            }
208        }
209
210        Ok(labels)
211    }
212
213    /// Exports edges grouped by type.
214    fn export_edges(
215        &self,
216        graph: &Graph,
217        output_dir: &Path,
218        files: &mut Vec<String>,
219    ) -> std::io::Result<Vec<String>> {
220        let mut rel_types = Vec::new();
221
222        for (edge_type, edge_ids) in &graph.edges_by_type {
223            let rel_type = edge_type.as_str();
224            rel_types.push(rel_type.to_string());
225
226            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
227            let path = output_dir.join(&filename);
228            files.push(filename);
229
230            let file = File::create(path)?;
231            let mut writer = BufWriter::new(file);
232
233            // Determine properties from first edge
234            let sample_edge = edge_ids.first().and_then(|id| graph.edges.get(id));
235
236            // Write header
237            let mut header = vec![
238                ":START_ID".to_string(),
239                ":END_ID".to_string(),
240                "weight:double".to_string(),
241            ];
242
243            if self.config.export_edge_properties {
244                if let Some(edge) = sample_edge {
245                    for key in edge.properties.keys() {
246                        header.push(format!("{}:string", key));
247                    }
248                }
249            }
250
251            if self.config.export_features {
252                if let Some(edge) = sample_edge {
253                    for i in 0..edge.features.len() {
254                        header.push(format!("feature_{}:double", i));
255                    }
256                }
257            }
258
259            header.push("isAnomaly:boolean".to_string());
260            header.push(":TYPE".to_string());
261
262            writeln!(writer, "{}", header.join(","))?;
263
264            // Write edges
265            for &edge_id in edge_ids {
266                if let Some(edge) = graph.edges.get(&edge_id) {
267                    let mut row = vec![
268                        edge.source.to_string(),
269                        edge.target.to_string(),
270                        format!("{:.6}", edge.weight),
271                    ];
272
273                    if self.config.export_edge_properties {
274                        for (key, value) in &edge.properties {
275                            if !header.iter().any(|h| h.starts_with(key)) {
276                                continue;
277                            }
278                            row.push(escape_csv(&value.to_string_value()));
279                        }
280                    }
281
282                    if self.config.export_features {
283                        for &feat in &edge.features {
284                            row.push(format!("{:.6}", feat));
285                        }
286                    }
287
288                    row.push(edge.is_anomaly.to_string());
289                    row.push(rel_type.to_string());
290
291                    writeln!(writer, "{}", row.join(","))?;
292                }
293            }
294        }
295
296        Ok(rel_types)
297    }
298
299    /// Generates Cypher import script.
300    fn generate_cypher_script(
301        &self,
302        graph: &Graph,
303        output_dir: &Path,
304        node_labels: &[String],
305        relationship_types: &[String],
306    ) -> std::io::Result<()> {
307        let path = output_dir.join("import.cypher");
308        let file = File::create(path)?;
309        let mut writer = BufWriter::new(file);
310
311        writeln!(writer, "// Neo4j Import Script for {}", graph.name)?;
312        writeln!(writer, "// Generated by synth-graph")?;
313        writeln!(writer)?;
314
315        // Create constraints
316        writeln!(writer, "// Create constraints and indexes")?;
317        for label in node_labels {
318            writeln!(
319                writer,
320                "CREATE CONSTRAINT IF NOT EXISTS FOR (n:{}) REQUIRE n.nodeId IS UNIQUE;",
321                label
322            )?;
323        }
324        writeln!(writer)?;
325
326        // Import nodes using LOAD CSV
327        writeln!(writer, "// Import nodes")?;
328        for label in node_labels {
329            let filename = format!("nodes_{}.csv", label.to_lowercase());
330            writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{}'", filename)?;
331            writeln!(writer, "AS row")?;
332            writeln!(
333                writer,
334                "CREATE (n:{} {{nodeId: toInteger(row.`nodeId:ID`), code: row.code, name: row.name, isAnomaly: toBoolean(row.`isAnomaly:boolean`)}});",
335                label
336            )?;
337            writeln!(writer)?;
338        }
339
340        // Import edges using LOAD CSV
341        writeln!(writer, "// Import relationships")?;
342        for rel_type in relationship_types {
343            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
344            writeln!(writer, "LOAD CSV WITH HEADERS FROM 'file:///{}'", filename)?;
345            writeln!(writer, "AS row")?;
346            writeln!(
347                writer,
348                "MATCH (source) WHERE source.nodeId = toInteger(row.`:START_ID`)"
349            )?;
350            writeln!(
351                writer,
352                "MATCH (target) WHERE target.nodeId = toInteger(row.`:END_ID`)"
353            )?;
354            writeln!(
355                writer,
356                "CREATE (source)-[:{}{{weight: toFloat(row.`weight:double`), isAnomaly: toBoolean(row.`isAnomaly:boolean`)}}]->(target);",
357                rel_type.to_uppercase().replace("-", "_")
358            )?;
359            writeln!(writer)?;
360        }
361
362        // Summary query
363        writeln!(writer, "// Verification query")?;
364        writeln!(writer, "CALL db.labels() YIELD label")?;
365        writeln!(
366            writer,
367            "CALL apoc.cypher.run('MATCH (n:`' + label + '`) RETURN count(n) as count', {{}})"
368        )?;
369        writeln!(writer, "YIELD value")?;
370        writeln!(writer, "RETURN label, value.count as nodeCount;")?;
371
372        Ok(())
373    }
374
375    /// Generates neo4j-admin import script.
376    fn generate_admin_import_script(
377        &self,
378        output_dir: &Path,
379        node_labels: &[String],
380        relationship_types: &[String],
381    ) -> std::io::Result<()> {
382        let path = output_dir.join("admin_import.sh");
383        let file = File::create(path)?;
384        let mut writer = BufWriter::new(file);
385
386        writeln!(writer, "#!/bin/bash")?;
387        writeln!(writer, "# Neo4j Admin Import Script")?;
388        writeln!(writer, "# Generated by synth-graph")?;
389        writeln!(writer)?;
390        writeln!(writer, "# Set Neo4j home directory")?;
391        writeln!(writer, "NEO4J_HOME=${{NEO4J_HOME:-/var/lib/neo4j}}")?;
392        writeln!(writer, "DATA_DIR=${{DATA_DIR:-$(dirname $0)}}")?;
393        writeln!(writer)?;
394        writeln!(writer, "# Stop Neo4j if running")?;
395        writeln!(writer, "# systemctl stop neo4j")?;
396        writeln!(writer)?;
397        writeln!(writer, "# Run import")?;
398        writeln!(writer, "neo4j-admin database import full \\")?;
399        writeln!(writer, "  --overwrite-destination=true \\")?;
400        writeln!(writer, "  --database={} \\", self.config.database_name)?;
401
402        // Add node files
403        for label in node_labels {
404            let filename = format!("nodes_{}.csv", label.to_lowercase());
405            writeln!(writer, "  --nodes={}=$DATA_DIR/{} \\", label, filename)?;
406        }
407
408        // Add relationship files
409        for rel_type in relationship_types {
410            let filename = format!("edges_{}.csv", rel_type.to_lowercase());
411            writeln!(
412                writer,
413                "  --relationships={}=$DATA_DIR/{} \\",
414                rel_type.to_uppercase().replace("-", "_"),
415                filename
416            )?;
417        }
418
419        writeln!(writer, "  --skip-bad-relationships=true")?;
420        writeln!(writer)?;
421        writeln!(writer, "echo \"Import complete\"")?;
422        writeln!(writer)?;
423        writeln!(writer, "# Start Neo4j")?;
424        writeln!(writer, "# systemctl start neo4j")?;
425
426        Ok(())
427    }
428}
429
430/// Escapes a value for CSV format.
431fn escape_csv(value: &str) -> String {
432    if value.contains(',') || value.contains('"') || value.contains('\n') {
433        format!("\"{}\"", value.replace('"', "\"\""))
434    } else {
435        value.to_string()
436    }
437}
438
439/// Builder for Cypher queries.
440pub struct CypherQueryBuilder {
441    queries: Vec<String>,
442}
443
444impl CypherQueryBuilder {
445    /// Creates a new query builder.
446    pub fn new() -> Self {
447        Self {
448            queries: Vec::new(),
449        }
450    }
451
452    /// Adds a node creation query.
453    pub fn create_node(&mut self, label: &str, properties: &HashMap<String, String>) -> &mut Self {
454        let props: Vec<String> = properties
455            .iter()
456            .map(|(k, v)| format!("{}: '{}'", k, v.replace('\'', "\\'")))
457            .collect();
458
459        self.queries
460            .push(format!("CREATE (:{} {{{}}})", label, props.join(", ")));
461        self
462    }
463
464    /// Adds a relationship creation query.
465    pub fn create_relationship(
466        &mut self,
467        from_label: &str,
468        from_id: &str,
469        to_label: &str,
470        to_id: &str,
471        rel_type: &str,
472        properties: &HashMap<String, String>,
473    ) -> &mut Self {
474        let props: Vec<String> = properties
475            .iter()
476            .map(|(k, v)| format!("{}: '{}'", k, v.replace('\'', "\\'")))
477            .collect();
478
479        let props_str = if props.is_empty() {
480            String::new()
481        } else {
482            format!(" {{{}}}", props.join(", "))
483        };
484
485        self.queries.push(format!(
486            "MATCH (a:{} {{nodeId: '{}'}}), (b:{} {{nodeId: '{}'}}) CREATE (a)-[:{}{}]->(b)",
487            from_label, from_id, to_label, to_id, rel_type, props_str
488        ));
489        self
490    }
491
492    /// Builds the final Cypher script.
493    pub fn build(&self) -> String {
494        self.queries.join(";\n") + ";"
495    }
496}
497
498impl Default for CypherQueryBuilder {
499    fn default() -> Self {
500        Self::new()
501    }
502}
503
504#[cfg(test)]
505mod tests {
506    use super::*;
507    use crate::models::{EdgeType, GraphEdge, GraphNode, GraphType, NodeType};
508    use tempfile::tempdir;
509
510    fn create_test_graph() -> Graph {
511        let mut graph = Graph::new("test", GraphType::Transaction);
512
513        let n1 = graph.add_node(
514            GraphNode::new(0, NodeType::Account, "1000".to_string(), "Cash".to_string())
515                .with_feature(0.5),
516        );
517        let n2 = graph.add_node(
518            GraphNode::new(0, NodeType::Account, "2000".to_string(), "AP".to_string())
519                .with_feature(0.8),
520        );
521
522        graph.add_edge(
523            GraphEdge::new(0, n1, n2, EdgeType::Transaction)
524                .with_weight(1000.0)
525                .with_feature(6.9),
526        );
527
528        graph.compute_statistics();
529        graph
530    }
531
532    #[test]
533    fn test_neo4j_export() {
534        let graph = create_test_graph();
535        let dir = tempdir().unwrap();
536
537        let exporter = Neo4jExporter::new(Neo4jExportConfig::default());
538        let metadata = exporter.export(&graph, dir.path()).unwrap();
539
540        assert_eq!(metadata.num_nodes, 2);
541        assert_eq!(metadata.num_edges, 1);
542        assert!(dir.path().join("nodes_account.csv").exists());
543        assert!(dir.path().join("edges_transaction.csv").exists());
544        assert!(dir.path().join("import.cypher").exists());
545        assert!(dir.path().join("admin_import.sh").exists());
546    }
547
548    #[test]
549    fn test_csv_escape() {
550        assert_eq!(escape_csv("simple"), "simple");
551        assert_eq!(escape_csv("with,comma"), "\"with,comma\"");
552        assert_eq!(escape_csv("with\"quote"), "\"with\"\"quote\"");
553    }
554
555    #[test]
556    fn test_cypher_builder() {
557        let mut builder = CypherQueryBuilder::new();
558        let mut props = HashMap::new();
559        props.insert("name".to_string(), "Test".to_string());
560
561        builder.create_node("Account", &props);
562        let cypher = builder.build();
563
564        assert!(cypher.contains("CREATE (:Account"));
565        assert!(cypher.contains("name: 'Test'"));
566    }
567}