Skip to main content

infigraph_core/graph/
store_bench.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use arrow::array::{Int64Array, StringArray};
6use arrow::datatypes::DataType;
7
8use super::parquet_loader;
9use super::store::GraphStore;
10use super::store_util::{escape, fwd_slash_path};
11
12impl GraphStore {
13    /// Test: DELETE + COPY FROM parquet produces identical data to MERGE/UNWIND.
14    /// Covers edge cases: <>, quotes, unicode, empty strings, backslashes, newlines.
15    pub fn test_parquet_quality(&self) -> Result<()> {
16        let conn = self.connection()?;
17
18        let full_schema = "CREATE NODE TABLE %TABLE%(id STRING, name STRING, kind STRING, file STRING, start_line INT64, end_line INT64, signature_hash STRING, language STRING, visibility STRING, parent STRING, docstring STRING, complexity INT64, PRIMARY KEY(id))";
19
20        // Edge case test data -- every known problematic pattern
21        let long_doc = "A".repeat(10000);
22        #[allow(clippy::type_complexity)]
23        let test_rows: Vec<(&str, &str, &str, &str, i64, i64, &str, &str, &str, &str, &str, i64)> = vec![
24            ("t1", "normal_func", "Function", "src/main.rs", 1, 10, "abc", "rust", "public", "", "Normal docstring", 3),
25            ("t2", "angle_brackets", "Function", "src/lib.rs", 5, 20, "def", "java", "", "", "Returns List<String> from <code>parse</code>", 1),
26            ("t3", "flask_route", "Function", "app.py", 2, 8, "ghi", "python", "public", "", "@app.route(\"/api/users/<int:id>\", methods=[\"GET\"])", 2),
27            ("t4", "regex_group", "Function", "src/re.py", 10, 50, "jkl", "python", "", "", "(?P<query>.+)/$", 5),
28            ("t5", "html_javadoc", "Method", "Foo.java", 3, 15, "mno", "java", "public", "Foo", "/** Wraps <p>text</p> in {@link List<T>} */", 4),
29            ("t6", "double_quotes", "Function", "bar.rs", 1, 5, "pqr", "rust", "", "", "Returns \"hello world\" and \"goodbye\"", 1),
30            ("t7", "single_quotes", "Function", "baz.py", 1, 5, "stu", "python", "", "", "It's a test with 'single' quotes", 1),
31            ("t8", "backslashes", "Function", "esc.rs", 1, 5, "vwx", "rust", "", "", "Path is C:\\Users\\test\\file.txt", 1),
32            ("t09", "unicode", "Class", "uni.py", 1, 5, "yza", "python", "", "Parent", "Ünïcödé: 日本語テスト 🚀", 0),
33            ("t10", "empty_all", "Variable", "e.rs", 0, 0, "", "", "", "", "", 0),
34            ("t11", "tab_content", "Function", "tab.rs", 1, 5, "tab", "rust", "", "", "col1\tcol2\tcol3", 1),
35            ("t12", "newline_content", "Function", "nl.rs", 1, 5, "nln", "rust", "", "", "line1\nline2\nline3", 1),
36            ("t13", "mixed_evil", "Function", "evil.java", 1, 99, "evil", "java", "public", "", "/** @param <T extends Comparable<? super T>> \\n uses 'single' and \"double\" */", 9),
37            // Real-world: Java Javadoc with HTML tags (tto-engine pattern, 332 mismatches)
38            ("t14", "javadoc_html", "Class", "Util.java", 1, 200, "jdoc", "java", "public", "", "/** Perl's split function and <b>s</b> operation inspired. Uses {@link #substitute substitute()} */", 3),
39            ("t15", "javadoc_code", "Method", "StreamSearcher.java", 1, 50, "jcod", "java", "public", "", "/**  * performs a function similar to the Unix <code>strings</code> command */", 2),
40            ("t16", "javadoc_p_tag", "Method", "GlobFilenameFilter.java", 1, 30, "jpag", "java", "public", "", "/**    * Filters a filename.    * <p>    * @param dir  The directory.    * @return True if match.    */", 1),
41            ("t17", "javadoc_link_generic", "Method", "PatternCache.java", 1, 60, "jlnk", "java", "public", "", "/**    * Returns a {@link PatternCache<T>} instance.    * <p>    * Uses {@link #getPattern getPattern()} internally.    */", 4),
42            // Real-world: Ruby paths with backslashes (WTax pattern)
43            ("t18", "ruby_backslash_path", "Constant", "consts.rb", 1, 5, "rbsp", "ruby", "", "", "Update allows: <anyBasefolderStructureDesired>\\Protax\\LacerteTax\\...", 0),
44            ("t19", "ruby_interpolation", "Constant", "consts.rb", 2, 5, "rbin", "ruby", "", "", "lacerte\\#{YEAR_YY}tax\\\\ + NETBRANCH + \\\\Loader\\\\CDROMWIN\\\\", 0),
45            // Real-world: VB6 comments (EasyAcct pattern)
46            ("t20", "vb6_comment", "Function", "ad911cal.bas", 1, 20, "vb6c", "basic", "", "", "'---PDB 04/02/02 verify if asset complies with sept 11 01 30% rules", 1),
47            ("t21", "vb6_include", "Variable", "ad911cal.bas", 3, 3, "vb6i", "basic", "", "", "'$INCLUDE: 'EZDIMCOM.INC'", 0),
48            // Real-world: C# XML doc comments (federal pattern)
49            ("t22", "csharp_xmldoc", "Method", "TaxCalc.cs", 1, 15, "csxd", "csharp", "public", "TaxCalc", "/// <summary>Calculates <see cref=\"TaxResult\"/> for given <paramref name=\"input\"/></summary>", 2),
50            ("t23", "csharp_generic", "Class", "Repository.cs", 1, 100, "csgn", "csharp", "public", "", "/// <typeparam name=\"T\">Must implement <see cref=\"IEntity{T}\"/></typeparam>", 5),
51            // SQL injection-style content
52            ("t24", "sql_in_doc", "Function", "db.py", 1, 10, "sqli", "python", "", "", "Runs: SELECT * FROM users WHERE name = 'O\\'Brien' AND id > 0; -- drop table", 1),
53            // Markdown in docstrings
54            ("t25", "markdown_doc", "Function", "lib.rs", 1, 20, "mkdn", "rust", "public", "", "# Header\n\n```rust\nfn main() { println!(\"hello\"); }\n```\n\n- item `<T>`\n- [link](http://example.com?a=1&b=2)", 3),
55            // JSON in docstrings
56            ("t26", "json_doc", "Function", "api.py", 1, 10, "json", "python", "", "", "Returns {\"key\": \"value\", \"list\": [1, 2, 3], \"nested\": {\"a\": true}}", 1),
57            // XML/HTML entities
58            ("t27", "entity_doc", "Function", "parser.rs", 1, 10, "enty", "rust", "", "", "Handles &amp; &lt; &gt; &quot; &#39; entities plus raw < > & \" '", 2),
59            // Very long docstring (stress test)
60            ("t28", "long_doc", "Function", "big.java", 1, 500, "long", "java", "public", "", &long_doc, 99),
61            // Null bytes and control characters
62            ("t29", "control_chars", "Function", "ctrl.rs", 1, 5, "ctrl", "rust", "", "", "has \x01 \x02 \x03 control chars and \x7f DEL", 1),
63            // Windows CRLF
64            ("t30", "crlf_doc", "Function", "win.cs", 1, 5, "crlf", "csharp", "", "", "line1\r\nline2\r\nline3", 1),
65            // Deeply nested generics (Java/C#)
66            ("t31", "nested_generics", "Method", "Deep.java", 1, 10, "deep", "java", "public", "", "Map<String, List<Pair<Integer, Consumer<? super T>>>> process()", 8),
67            // Percent and special URL chars
68            ("t32", "url_doc", "Function", "http.py", 1, 5, "urls", "python", "", "", "GET /api/v1/users?name=John%20Doe&age=30#section HTTP/1.1", 1),
69            // Pipe chars (can confuse some parsers)
70            ("t33", "pipe_doc", "Function", "sh.rs", 1, 5, "pipe", "rust", "", "", "cat file.txt | grep 'pattern' | awk '{print $1}' | sort -u", 1),
71            // Regex with all special chars
72            ("t34", "regex_full", "Function", "re.py", 1, 5, "regx", "python", "", "", "^(?:https?://)?(?:www\\.)?([^/?#]+)(?:[/?#]|$)", 3),
73            // Triple quotes and mixed quotes
74            ("t35", "triple_quote", "Function", "doc.py", 1, 5, "trpl", "python", "", "", "\"\"\"This is a '''triple quoted''' \"docstring\" with 'mixed' quotes\"\"\"", 1),
75        ];
76
77        println!(
78            "=== Parquet Quality Test ({} edge cases) ===\n",
79            test_rows.len()
80        );
81
82        // === Method A: Direct parquet COPY FROM (proposed new path) ===
83        let _ = conn.query("DROP TABLE IF EXISTS QualParquet");
84        conn.query(&full_schema.replace("%TABLE%", "QualParquet"))?;
85
86        let pq_path = std::env::temp_dir().join("quality_test.parquet");
87        {
88            let ids: Vec<&str> = test_rows.iter().map(|r| r.0).collect();
89            let names: Vec<&str> = test_rows.iter().map(|r| r.1).collect();
90            let kinds: Vec<&str> = test_rows.iter().map(|r| r.2).collect();
91            let files: Vec<&str> = test_rows.iter().map(|r| r.3).collect();
92            let sls: Vec<i64> = test_rows.iter().map(|r| r.4).collect();
93            let els: Vec<i64> = test_rows.iter().map(|r| r.5).collect();
94            let sigs: Vec<&str> = test_rows.iter().map(|r| r.6).collect();
95            let langs: Vec<&str> = test_rows.iter().map(|r| r.7).collect();
96            let viss: Vec<&str> = test_rows.iter().map(|r| r.8).collect();
97            let pars: Vec<&str> = test_rows.iter().map(|r| r.9).collect();
98            let docs: Vec<&str> = test_rows.iter().map(|r| r.10).collect();
99            let comps: Vec<i64> = test_rows.iter().map(|r| r.11).collect();
100
101            parquet_loader::write_node_parquet(
102                &pq_path,
103                &[
104                    ("id", DataType::Utf8),
105                    ("name", DataType::Utf8),
106                    ("kind", DataType::Utf8),
107                    ("file", DataType::Utf8),
108                    ("start_line", DataType::Int64),
109                    ("end_line", DataType::Int64),
110                    ("signature_hash", DataType::Utf8),
111                    ("language", DataType::Utf8),
112                    ("visibility", DataType::Utf8),
113                    ("parent", DataType::Utf8),
114                    ("docstring", DataType::Utf8),
115                    ("complexity", DataType::Int64),
116                ],
117                vec![
118                    Arc::new(StringArray::from(ids)),
119                    Arc::new(StringArray::from(names)),
120                    Arc::new(StringArray::from(kinds)),
121                    Arc::new(StringArray::from(files)),
122                    Arc::new(Int64Array::from(sls)),
123                    Arc::new(Int64Array::from(els)),
124                    Arc::new(StringArray::from(sigs)),
125                    Arc::new(StringArray::from(langs)),
126                    Arc::new(StringArray::from(viss)),
127                    Arc::new(StringArray::from(pars)),
128                    Arc::new(StringArray::from(docs)),
129                    Arc::new(Int64Array::from(comps)),
130                ],
131            )?;
132        }
133        conn.query(&format!("COPY QualParquet (id, name, kind, file, start_line, end_line, signature_hash, language, visibility, parent, docstring, complexity) FROM '{}'", fwd_slash_path(&pq_path)))?;
134
135        // === Method B: DELETE + COPY FROM parquet (proposed incremental path) ===
136        let _ = conn.query("DROP TABLE IF EXISTS QualDeleteCopy");
137        conn.query(&full_schema.replace("%TABLE%", "QualDeleteCopy"))?;
138
139        // Seed with dummy data first
140        conn.query("CREATE (:QualDeleteCopy {id: 'dummy_1', name: 'old', kind: 'X', file: 'old.rs', start_line: 0, end_line: 0, signature_hash: '', language: '', visibility: '', parent: '', docstring: '', complexity: 0})")?;
141        conn.query("CREATE (:QualDeleteCopy {id: 'dummy_2', name: 'old2', kind: 'X', file: 'old.rs', start_line: 0, end_line: 0, signature_hash: '', language: '', visibility: '', parent: '', docstring: '', complexity: 0})")?;
142
143        // DELETE old rows then COPY FROM parquet
144        conn.query("MATCH (n:QualDeleteCopy) DELETE n")?;
145        conn.query(&format!("COPY QualDeleteCopy (id, name, kind, file, start_line, end_line, signature_hash, language, visibility, parent, docstring, complexity) FROM '{}'", fwd_slash_path(&pq_path)))?;
146
147        // === Read back and compare ===
148        let fields = [
149            "id",
150            "name",
151            "kind",
152            "file",
153            "start_line",
154            "end_line",
155            "signature_hash",
156            "language",
157            "visibility",
158            "parent",
159            "docstring",
160            "complexity",
161        ];
162        let field_list = fields
163            .iter()
164            .map(|f| format!("s.{f}"))
165            .collect::<Vec<_>>()
166            .join(", ");
167
168        let read_all = |table: &str| -> Result<Vec<Vec<String>>> {
169            let r = conn.query(&format!(
170                "MATCH (s:{table}) RETURN {field_list} ORDER BY s.id"
171            ))?;
172            let mut out = Vec::new();
173            for row in r {
174                out.push(row.iter().map(|v| v.to_string()).collect());
175            }
176            Ok(out)
177        };
178
179        let pq_rows = read_all("QualParquet")?;
180        let dc_rows = read_all("QualDeleteCopy")?;
181
182        // Compare Parquet vs DELETE+COPY
183        println!("--- Parquet vs DELETE+COPY ---");
184        let mut pass = 0;
185        let mut fail = 0;
186        for (i, (pr, dr)) in pq_rows.iter().zip(dc_rows.iter()).enumerate() {
187            for (fi, field) in fields.iter().enumerate() {
188                if pr.get(fi) != dr.get(fi) {
189                    println!("  MISMATCH row={i} field={field}:");
190                    println!("    parquet:      {:?}", pr.get(fi));
191                    println!("    delete+copy:  {:?}", dr.get(fi));
192                    fail += 1;
193                } else {
194                    pass += 1;
195                }
196            }
197        }
198        println!("  Result: {} passed, {} failed", pass, fail);
199
200        // Compare Parquet vs expected (ground truth = input test data)
201        // Use ID-based lookup since ORDER BY sorts lexicographically (t10 < t2)
202        println!("\n--- Parquet vs Ground Truth ---");
203        let mut gt_pass = 0;
204        let mut gt_fail = 0;
205        let stored_by_id: HashMap<&str, &Vec<String>> = pq_rows
206            .iter()
207            .filter_map(|r| r.first().map(|id| (id.as_str(), r)))
208            .collect();
209        for row in &test_rows {
210            let expected = vec![
211                row.0.to_string(),
212                row.1.to_string(),
213                row.2.to_string(),
214                row.3.to_string(),
215                row.4.to_string(),
216                row.5.to_string(),
217                row.6.to_string(),
218                row.7.to_string(),
219                row.8.to_string(),
220                row.9.to_string(),
221                row.10.to_string(),
222                row.11.to_string(),
223            ];
224            if let Some(stored) = stored_by_id.get(row.0) {
225                for (fi, field) in fields.iter().enumerate() {
226                    let stored_val = stored.get(fi).map(|s| s.as_str()).unwrap_or("");
227                    let expected_val = &expected[fi];
228                    if stored_val == expected_val {
229                        gt_pass += 1;
230                    } else {
231                        println!("  MISMATCH id={} field={field}:", row.0);
232                        println!("    expected: {:?}", expected_val);
233                        println!("    stored:   {:?}", stored_val);
234                        gt_fail += 1;
235                    }
236                }
237            } else {
238                println!("  MISSING: id={} not found in stored data", row.0);
239                gt_fail += 1;
240            }
241        }
242        println!("  Result: {} passed, {} failed", gt_pass, gt_fail);
243
244        if fail == 0 && gt_fail == 0 {
245            println!("\n=== ALL TESTS PASSED -- zero quality loss ===");
246        } else {
247            println!("\n=== QUALITY ISSUES DETECTED ===");
248        }
249
250        // Cleanup
251        let _ = conn.query("DROP TABLE QualParquet");
252        let _ = conn.query("DROP TABLE QualDeleteCopy");
253        let _ = std::fs::remove_file(&pq_path);
254        Ok(())
255    }
256
257    /// Benchmark: compare COPY FROM CSV vs UNWIND for bulk symbol inserts.
258    /// Creates isolated test tables, measures both approaches, prints results.
259    pub fn benchmark_bulk_write(&self, n: usize) -> Result<()> {
260        let conn = self.connection()?;
261
262        // Setup isolated test tables
263        let _ = conn.query("DROP TABLE IF EXISTS BenchSymbolCopy");
264        let _ = conn.query("DROP TABLE IF EXISTS BenchSymbolUnwind");
265        conn.query("CREATE NODE TABLE BenchSymbolCopy(id STRING, name STRING, kind STRING, file STRING, PRIMARY KEY(id))")?;
266        conn.query("CREATE NODE TABLE BenchSymbolUnwind(id STRING, name STRING, kind STRING, file STRING, PRIMARY KEY(id))")?;
267
268        // --- COPY FROM CSV ---
269        let csv_path = std::env::temp_dir().join("infigraph_bench_symbols.csv");
270        {
271            use std::io::Write;
272            let mut f = std::fs::File::create(&csv_path)?;
273            writeln!(f, "id,name,kind,file")?;
274            for i in 0..n {
275                writeln!(f, "copy_{i},func_{i},Function,bench.rs")?;
276            }
277        }
278        let t0 = std::time::Instant::now();
279        conn.query(&format!(
280            "COPY BenchSymbolCopy FROM '{}' (header=true)",
281            fwd_slash_path(&csv_path)
282        ))?;
283        let copy_ms = t0.elapsed().as_millis();
284
285        // --- UNWIND ---
286        const CHUNK: usize = 2000;
287        let rows: Vec<String> = (0..n)
288            .map(|i| {
289                format!(
290                    "{{id: 'unwind_{i}', name: 'func_{i}', kind: 'Function', file: 'bench.rs'}}"
291                )
292            })
293            .collect();
294        let t1 = std::time::Instant::now();
295        for chunk in rows.chunks(CHUNK) {
296            conn.query(&format!(
297                "UNWIND [{}] AS s CREATE (:BenchSymbolUnwind {{id: s.id, name: s.name, kind: s.kind, file: s.file}})",
298                chunk.join(", ")
299            ))?;
300        }
301        let unwind_ms = t1.elapsed().as_millis();
302
303        println!("Bulk write benchmark ({n} symbols):");
304        println!("  COPY FROM CSV : {}ms", copy_ms);
305        println!("  UNWIND chunks : {}ms", unwind_ms);
306        println!(
307            "  Speedup       : {:.1}x",
308            unwind_ms as f64 / copy_ms.max(1) as f64
309        );
310
311        // Cleanup
312        let _ = conn.query("DROP TABLE BenchSymbolCopy");
313        let _ = conn.query("DROP TABLE BenchSymbolUnwind");
314        let _ = std::fs::remove_file(&csv_path);
315
316        Ok(())
317    }
318
319    /// Benchmark: CSV vs Parquet vs UNWIND -- apple-to-apple with real symbol data.
320    /// Tests performance AND data integrity (docstrings with <, >, quotes, unicode).
321    pub fn benchmark_parquet_vs_csv(&self) -> Result<()> {
322        let conn = self.connection()?;
323
324        let result = conn.query(
325            "MATCH (s:Symbol) RETURN s.id, s.name, s.kind, s.file, s.start_line, s.end_line, s.signature_hash, s.language, s.visibility, s.parent, s.docstring, s.complexity"
326        )?;
327        let mut rows: Vec<Vec<String>> = Vec::new();
328        for row in result {
329            rows.push(row.iter().map(|v| v.to_string()).collect());
330        }
331        let n = rows.len();
332        println!("Loaded {} real symbols from graph", n);
333
334        let full_schema = "CREATE NODE TABLE %TABLE%(id STRING, name STRING, kind STRING, file STRING, start_line INT64, end_line INT64, signature_hash STRING, language STRING, visibility STRING, parent STRING, docstring STRING, complexity INT64, PRIMARY KEY(id))";
335        let fields_list = "id, name, kind, file, start_line, end_line, signature_hash, language, visibility, parent, docstring, complexity";
336
337        // ===== 1. COPY FROM CSV (TSV) =====
338        let _ = conn.query("DROP TABLE IF EXISTS BenchCSV");
339        conn.query(&full_schema.replace("%TABLE%", "BenchCSV"))?;
340
341        let csv_path = std::env::temp_dir().join("infigraph_bench_csv.csv");
342        {
343            use std::io::Write;
344            let mut f = std::fs::File::create(&csv_path)?;
345            writeln!(f, "id\tname\tkind\tfile\tstart_line\tend_line\tsignature_hash\tlanguage\tvisibility\tparent\tdocstring\tcomplexity")?;
346            let tsv_field = |s: &str| -> String { s.replace(['\t', '\n', '\r'], " ") };
347            for row in &rows {
348                writeln!(
349                    f,
350                    "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
351                    tsv_field(&row[0]),
352                    tsv_field(&row[1]),
353                    tsv_field(&row[2]),
354                    tsv_field(&row[3]),
355                    row[4],
356                    row[5],
357                    tsv_field(&row[6]),
358                    tsv_field(&row[7]),
359                    tsv_field(&row[8]),
360                    tsv_field(&row[9]),
361                    tsv_field(&row[10]),
362                    row[11]
363                )?;
364            }
365        }
366        let csv_size = std::fs::metadata(&csv_path).map(|m| m.len()).unwrap_or(0);
367        let t0 = std::time::Instant::now();
368        conn.query(&format!(
369            "COPY BenchCSV FROM '{}' (header=true, delim='\\t')",
370            fwd_slash_path(&csv_path)
371        ))?;
372        let csv_ms = t0.elapsed().as_millis();
373
374        // ===== 2. COPY FROM Parquet =====
375        let _ = conn.query("DROP TABLE IF EXISTS BenchParquet");
376        conn.query(&full_schema.replace("%TABLE%", "BenchParquet"))?;
377
378        let pq_path = std::env::temp_dir().join("infigraph_bench.parquet");
379        {
380            let ids: Vec<&str> = rows.iter().map(|r| r[0].as_str()).collect();
381            let names: Vec<&str> = rows.iter().map(|r| r[1].as_str()).collect();
382            let kinds: Vec<&str> = rows.iter().map(|r| r[2].as_str()).collect();
383            let files: Vec<&str> = rows.iter().map(|r| r[3].as_str()).collect();
384            let start_lines: Vec<i64> = rows.iter().map(|r| r[4].parse().unwrap_or(0)).collect();
385            let end_lines: Vec<i64> = rows.iter().map(|r| r[5].parse().unwrap_or(0)).collect();
386            let sig_hashes: Vec<&str> = rows.iter().map(|r| r[6].as_str()).collect();
387            let languages: Vec<&str> = rows.iter().map(|r| r[7].as_str()).collect();
388            let visibilities: Vec<&str> = rows.iter().map(|r| r[8].as_str()).collect();
389            let parents: Vec<&str> = rows.iter().map(|r| r[9].as_str()).collect();
390            let docstrings: Vec<&str> = rows.iter().map(|r| r[10].as_str()).collect();
391            let complexities: Vec<i64> = rows.iter().map(|r| r[11].parse().unwrap_or(0)).collect();
392
393            parquet_loader::write_node_parquet(
394                &pq_path,
395                &[
396                    ("id", DataType::Utf8),
397                    ("name", DataType::Utf8),
398                    ("kind", DataType::Utf8),
399                    ("file", DataType::Utf8),
400                    ("start_line", DataType::Int64),
401                    ("end_line", DataType::Int64),
402                    ("signature_hash", DataType::Utf8),
403                    ("language", DataType::Utf8),
404                    ("visibility", DataType::Utf8),
405                    ("parent", DataType::Utf8),
406                    ("docstring", DataType::Utf8),
407                    ("complexity", DataType::Int64),
408                ],
409                vec![
410                    Arc::new(StringArray::from(ids)),
411                    Arc::new(StringArray::from(names)),
412                    Arc::new(StringArray::from(kinds)),
413                    Arc::new(StringArray::from(files)),
414                    Arc::new(Int64Array::from(start_lines)),
415                    Arc::new(Int64Array::from(end_lines)),
416                    Arc::new(StringArray::from(sig_hashes)),
417                    Arc::new(StringArray::from(languages)),
418                    Arc::new(StringArray::from(visibilities)),
419                    Arc::new(StringArray::from(parents)),
420                    Arc::new(StringArray::from(docstrings)),
421                    Arc::new(Int64Array::from(complexities)),
422                ],
423            )?;
424        }
425        let pq_size = std::fs::metadata(&pq_path).map(|m| m.len()).unwrap_or(0);
426        let t1 = std::time::Instant::now();
427        conn.query(&format!(
428            "COPY BenchParquet ({fields_list}) FROM '{}'",
429            fwd_slash_path(&pq_path)
430        ))?;
431        let pq_ms = t1.elapsed().as_millis();
432
433        // ===== 3. UNWIND =====
434        let _ = conn.query("DROP TABLE IF EXISTS BenchUnwind");
435        conn.query(&full_schema.replace("%TABLE%", "BenchUnwind"))?;
436
437        const CHUNK: usize = 2000;
438        let unwind_rows: Vec<String> = rows.iter().map(|row| {
439            format!("{{id: '{}', name: '{}', kind: '{}', file: '{}', start_line: {}, end_line: {}, signature_hash: '{}', language: '{}', visibility: '{}', parent: '{}', docstring: '{}', complexity: {}}}",
440                escape(&row[0]), escape(&row[1]), escape(&row[2]), escape(&row[3]),
441                row[4], row[5],
442                escape(&row[6]), escape(&row[7]), escape(&row[8]),
443                escape(&row[9]), escape(&row[10]), row[11])
444        }).collect();
445        let t2 = std::time::Instant::now();
446        for chunk in unwind_rows.chunks(CHUNK) {
447            conn.query(&format!(
448                "UNWIND [{}] AS s CREATE (:BenchUnwind {{id: s.id, name: s.name, kind: s.kind, file: s.file, start_line: s.start_line, end_line: s.end_line, signature_hash: s.signature_hash, language: s.language, visibility: s.visibility, parent: s.parent, docstring: s.docstring, complexity: s.complexity}})",
449                chunk.join(", ")
450            ))?;
451        }
452        let unwind_ms = t2.elapsed().as_millis();
453
454        // ===== Results =====
455        println!("\n=== Bulk Write Benchmark ({n} symbols) ===\n");
456        println!(
457            "  {:20} {:>8} {:>12} {:>10}",
458            "Method", "Time", "Throughput", "File Size"
459        );
460        println!(
461            "  {:20} {:>8} {:>12} {:>10}",
462            "------", "----", "----------", "---------"
463        );
464        println!(
465            "  {:20} {:>7}ms {:>9.0}/sec {:>9}KB",
466            "COPY FROM CSV (TSV)",
467            csv_ms,
468            n as f64 / csv_ms.max(1) as f64 * 1000.0,
469            csv_size / 1024
470        );
471        println!(
472            "  {:20} {:>7}ms {:>9.0}/sec {:>9}KB",
473            "COPY FROM Parquet",
474            pq_ms,
475            n as f64 / pq_ms.max(1) as f64 * 1000.0,
476            pq_size / 1024
477        );
478        println!(
479            "  {:20} {:>7}ms {:>9.0}/sec {:>10}",
480            "UNWIND chunks",
481            unwind_ms,
482            n as f64 / unwind_ms.max(1) as f64 * 1000.0,
483            "N/A"
484        );
485        println!(
486            "\n  CSV vs Parquet     : {:.2}x",
487            csv_ms as f64 / pq_ms.max(1) as f64
488        );
489        println!(
490            "  Parquet vs UNWIND  : {:.1}x",
491            unwind_ms as f64 / pq_ms.max(1) as f64
492        );
493
494        // ===== Data Integrity =====
495        println!("\n=== Data Integrity Check ===\n");
496        let fields = [
497            "id",
498            "name",
499            "kind",
500            "file",
501            "start_line",
502            "end_line",
503            "signature_hash",
504            "language",
505            "visibility",
506            "parent",
507            "docstring",
508            "complexity",
509        ];
510        let field_list = fields
511            .iter()
512            .map(|f| format!("s.{f}"))
513            .collect::<Vec<_>>()
514            .join(", ");
515
516        let read_all = |table: &str| -> Result<Vec<Vec<String>>> {
517            let r = conn.query(&format!(
518                "MATCH (s:{table}) RETURN {field_list} ORDER BY s.id"
519            ))?;
520            let mut out = Vec::new();
521            for row in r {
522                out.push(row.iter().map(|v| v.to_string()).collect());
523            }
524            Ok(out)
525        };
526
527        let csv_rows = read_all("BenchCSV")?;
528        let pq_rows = read_all("BenchParquet")?;
529        let uw_rows = read_all("BenchUnwind")?;
530
531        let compare = |name: &str, a: &[Vec<String>], b: &[Vec<String>]| {
532            let mut mismatches = 0usize;
533            if a.len() != b.len() {
534                println!("  {name}: ROW COUNT MISMATCH ({} vs {})", a.len(), b.len());
535                return;
536            }
537            for (i, (ar, br)) in a.iter().zip(b.iter()).enumerate() {
538                for (fi, field) in fields.iter().enumerate() {
539                    if ar.get(fi) != br.get(fi) {
540                        if mismatches < 5 {
541                            println!("  {name} MISMATCH row={i} field={field}:");
542                            println!("    left:  {:?}", ar.get(fi));
543                            println!("    right: {:?}", br.get(fi));
544                        }
545                        mismatches += 1;
546                    }
547                }
548            }
549            if mismatches == 0 {
550                println!(
551                    "  {name}: PASS -- all {n} symbols x {} fields match",
552                    fields.len()
553                );
554            } else {
555                println!("  {name}: FAIL -- {mismatches} mismatches");
556            }
557        };
558
559        compare("CSV vs Parquet", &csv_rows, &pq_rows);
560        compare("CSV vs UNWIND", &csv_rows, &uw_rows);
561        compare("Parquet vs UNWIND", &pq_rows, &uw_rows);
562
563        // Cleanup
564        let _ = conn.query("DROP TABLE BenchCSV");
565        let _ = conn.query("DROP TABLE BenchParquet");
566        let _ = conn.query("DROP TABLE BenchUnwind");
567        let _ = std::fs::remove_file(&csv_path);
568        let _ = std::fs::remove_file(&pq_path);
569
570        Ok(())
571    }
572}