Skip to main content

gitcortex_store/kuzu/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::Path,
4};
5
6use gitcortex_core::{
7    error::{GitCortexError, Result},
8    graph::{Edge, GraphDiff, Node, NodeId},
9    schema::{NodeKind, SCHEMA_VERSION},
10    store::{CallersDeep, GraphStore, SubGraph, SymbolContext},
11};
12use kuzu::{Connection, Database, SystemConfig};
13
14use crate::{branch, schema as db_schema};
15
16mod bulk;
17mod conv;
18mod escape;
19mod queries;
20mod values;
21
22use conv::{edge_kind_from_str, lang_scope_clause, vis_str};
23use escape::{esc, esc_multiline};
24use queries::{collect_ids, rows_to_nodes, NODE_COLS, SYMBOL_RANK};
25use values::str_val;
26
// Batch sizes for `UNWIND`-based inserts. Nodes carry a (≤16 KB) def_body, so
// their chunk is kept small to bound query-string size; edges are three ids
// each, so they batch much larger.
/// Maximum number of node struct literals joined into one `UNWIND … CREATE` query.
const NODE_INSERT_CHUNK: usize = 128;
/// Maximum number of edge struct literals joined into one `UNWIND … MATCH … CREATE` query.
const EDGE_INSERT_CHUNK: usize = 1000;
32
33/// Render a `Node` as a Cypher struct literal `{id:'…', kind:'…', …}` for use
34/// inside an `UNWIND [...] AS r CREATE` batch. String fields are escaped and
35/// single-quoted; bools/ints are emitted bare.
36fn node_struct_literal(node: &Node) -> String {
37    let id = esc(&node.id.as_str());
38    let kind = esc(&node.kind.to_string());
39    let name = esc(&node.name);
40    let qname = esc(&node.qualified_name);
41    let file = esc(node.file.to_string_lossy().as_ref());
42    let sl = node.span.start_line as i64;
43    let el = node.span.end_line as i64;
44    let loc = node.metadata.loc as i64;
45    let vis = esc(&vis_str(&node.metadata.visibility));
46    let m = &node.metadata;
47    let generic_bounds = esc(&m.generic_bounds.join("|"));
48    let def_sig = esc_multiline(&m.definition.signature);
49    let def_body = esc_multiline(&m.definition.body);
50    let def_doc = esc_multiline(m.definition.doc_comment.as_deref().unwrap_or(""));
51    let def_start_byte = m.definition.start_byte as i64;
52    let def_end_byte = m.definition.end_byte as i64;
53
54    format!(
55        "{{id:'{id}', kind:'{kind}', name:'{name}', qualified_name:'{qname}', file:'{file}', \
56         start_line:{sl}, end_line:{el}, loc:{loc}, visibility:'{vis}', \
57         is_async:{ia}, is_unsafe:{iu}, is_static:{ist}, is_abstract:{iab}, is_final:{ifi}, \
58         is_property:{ip}, is_generator:{ig}, is_const:{ic}, generic_bounds:'{generic_bounds}', \
59         def_signature:'{def_sig}', def_body:'{def_body}', def_doc:'{def_doc}', \
60         def_start_byte:{def_start_byte}, def_end_byte:{def_end_byte}}}",
61        ia = m.is_async,
62        iu = m.is_unsafe,
63        ist = m.is_static,
64        iab = m.is_abstract,
65        ifi = m.is_final,
66        ip = m.is_property,
67        ig = m.is_generator,
68        ic = m.is_const,
69    )
70}
71
72/// True when the branch's node table has zero rows (fresh / never indexed).
73fn node_table_is_empty(conn: &Connection, nt: &str) -> Result<bool> {
74    let mut r = conn
75        .query(&format!("MATCH (n:{nt}) RETURN count(n) AS c LIMIT 1"))
76        .map_err(|e| GitCortexError::Store(format!("count nodes: {e}")))?;
77    match r.by_ref().next() {
78        Some(row) => match &row[0] {
79            kuzu::Value::Int64(n) => Ok(*n == 0),
80            _ => Ok(false),
81        },
82        None => Ok(true),
83    }
84}
85
86/// Bulk-load a full-index diff via CSV `COPY`. Stages CSVs in a unique temp
87/// dir, loads them, then removes the dir. See [`bulk`] for the rationale.
88fn bulk_apply(conn: &Connection, nt: &str, et: &str, diff: &GraphDiff) -> Result<()> {
89    // Unique staging dir per call: pid + nanos + a process-wide atomic counter,
90    // so concurrent `apply_diff`s (e.g. parallel tests in one binary) never
91    // share a directory.
92    use std::sync::atomic::{AtomicU64, Ordering};
93    static SEQ: AtomicU64 = AtomicU64::new(0);
94    let stage = std::env::temp_dir().join(format!(
95        "gcx-bulk-{}-{}-{}",
96        std::process::id(),
97        std::time::SystemTime::now()
98            .duration_since(std::time::UNIX_EPOCH)
99            .map(|d| d.as_nanos())
100            .unwrap_or(0),
101        SEQ.fetch_add(1, Ordering::Relaxed),
102    ));
103    std::fs::create_dir_all(&stage)
104        .map_err(|e| GitCortexError::Store(format!("create staging dir: {e}")))?;
105
106    let result = bulk::bulk_load(conn, nt, et, &stage, &diff.added_nodes, &diff.added_edges);
107
108    // Best-effort cleanup regardless of load outcome.
109    let _ = std::fs::remove_dir_all(&stage);
110
111    result.map(|_| ())
112}
113
114// ── KuzuGraphStore ────────────────────────────────────────────────────────────
115
/// Local KuzuDB-backed implementation of [`GraphStore`].
///
/// One database file per repo (`graph.kuzu`), with per-branch node/edge tables
/// inside it. A fresh `Connection` is created for each operation so we avoid
/// the self-referential lifetime that `Mutex<Connection<'db>>` would require.
pub struct KuzuGraphStore {
    /// Owning handle to the Kuzu database; every `Connection` borrows from it.
    db: Database,
    /// Identifier computed by `branch::repo_id` from the repo root given to `open`.
    repo_id: String,
}
125
126impl KuzuGraphStore {
127    /// Open (or create) the graph database for the repo at `repo_root`.
128    ///
129    /// If the persisted schema version doesn't match [`SCHEMA_VERSION`], the
130    /// entire repo data directory is wiped so a fresh full index runs on next
131    /// hook invocation.
132    pub fn open(repo_root: &Path) -> Result<Self> {
133        let repo_id = branch::repo_id(repo_root);
134
135        if branch::read_schema_version(&repo_id) != SCHEMA_VERSION {
136            eprintln!(
137                "gitcortex: schema version mismatch (expected {}); wiping graph store for re-index",
138                SCHEMA_VERSION
139            );
140            branch::wipe_repo_data(&repo_id);
141            branch::write_schema_version(&repo_id, SCHEMA_VERSION)?;
142        }
143
144        let db_path = branch::db_path(&repo_id);
145        if let Some(parent) = db_path.parent() {
146            std::fs::create_dir_all(parent)?;
147        }
148
149        let db = Database::new(&db_path, SystemConfig::default())
150            .map_err(|e| GitCortexError::Store(format!("open db: {e}")))?;
151
152        Ok(Self { db, repo_id })
153    }
154
155    // ── Private helpers ───────────────────────────────────────────────────────
156
157    fn conn(&self) -> Result<Connection<'_>> {
158        Connection::new(&self.db)
159            .map_err(|e| GitCortexError::Store(format!("open connection: {e}")))
160    }
161
162    fn ensure_branch(&self, branch: &str) -> Result<()> {
163        let mut conn = self.conn()?;
164        db_schema::ensure_branch(&mut conn, branch)
165    }
166}
167
168// ── GraphStore impl ───────────────────────────────────────────────────────────
169
170impl GraphStore for KuzuGraphStore {
171    // ── Write path ────────────────────────────────────────────────────────────
172
    /// Apply `diff` to the node/edge tables of `branch`.
    ///
    /// An empty diff is a no-op. If the branch's node table has no rows, the
    /// added nodes/edges are bulk-loaded through `bulk_apply` and the function
    /// returns. Otherwise the diff is applied incrementally in three
    /// separately committed transactions: (1) deletes, (2) node inserts,
    /// (3) edge inserts plus name-based resolution of the `deferred_*`
    /// references.
    ///
    /// # Errors
    /// Any failing query is reported as `GitCortexError::Store`. Transactions
    /// committed before the failure are not undone by this function.
    fn apply_diff(&mut self, branch: &str, diff: &GraphDiff) -> Result<()> {
        if diff.is_empty() {
            return Ok(());
        }

        self.ensure_branch(branch)?;
        let nt = db_schema::node_table(branch);
        let et = db_schema::edge_table(branch);
        // One connection is reused for every query and transaction below.
        let conn = self.conn()?;

        // ── Fast path: bulk COPY load for a fresh full index ───────────────────
        // When the branch's node table is empty this is a first full index.
        // Stage the nodes/edges as CSV and `COPY` them in — ~100× faster than
        // per-row MATCH/CREATE on large repos.
        //
        // The diff's `removed_*` fields are ignored on this path: the indexer
        // emits a `removed_files` entry for every parsed file + its ancestor
        // folders (so an incremental re-parse first clears the old nodes), but
        // against an empty table those deletes are vacuous. Deferred cross-file
        // resolution is likewise skipped — on a full index every in-repo name
        // is already in `added_edges`; the only `deferred_*` left are external
        // (stdlib) names the store couldn't resolve anyway.
        let empty = node_table_is_empty(&conn, &nt)?;
        // Opt-in diagnostics: only the presence of GCX_TIMING is checked, not its value.
        if std::env::var_os("GCX_TIMING").is_some() {
            eprintln!(
                "[gcx-timing] apply_diff path: table_empty={empty} nodes={} edges={}",
                diff.added_nodes.len(),
                diff.added_edges.len()
            );
        }
        if empty {
            return bulk_apply(&conn, &nt, &et, diff);
        }

        // Transaction 1: commit all deletes first.
        // KuzuDB has a quirk where DETACH DELETE + CREATE in the same transaction
        // can produce NULL for the last STRING column in newly created nodes.
        // Splitting into separate transactions avoids this.
        // NOTE(review): on a query error the function returns via `?` without
        // issuing ROLLBACK — presumably dropping the connection discards the
        // open transaction; confirm against Kuzu's behaviour.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin delete transaction: {e}")))?;

        // 1. Remove nodes for deleted/replaced files.
        //    Skip directory paths (no extension) — folder nodes are reused across
        //    incremental updates to preserve their Contains edges to sibling files.
        //    NOTE(review): an extension-less *file* path would be skipped by the
        //    same test — confirm the indexer never emits such files.
        for file in &diff.removed_files {
            if file.extension().is_none() {
                continue;
            }
            let file_str = esc(file.to_string_lossy().as_ref());
            conn.query(&format!(
                "MATCH (n:{nt}) WHERE n.file = '{file_str}' DETACH DELETE n"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete file nodes: {e}")))?;
        }

        // 2. Remove explicit node IDs.
        for id in &diff.removed_node_ids {
            let id_str = esc(&id.as_str());
            conn.query(&format!(
                "MATCH (n:{nt}) WHERE n.id = '{id_str}' DETACH DELETE n"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete node: {e}")))?;
        }

        // 3. Remove explicit edges, matched by (source id, destination id, kind).
        for (src, dst, kind) in &diff.removed_edges {
            let s = esc(&src.as_str());
            let d = esc(&dst.as_str());
            let k = esc(&kind.to_string());
            conn.query(&format!(
                "MATCH (s:{nt})-[e:{et}]->(d:{nt}) \
                 WHERE s.id = '{s}' AND d.id = '{d}' AND e.kind = '{k}' \
                 DELETE e"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete edge: {e}")))?;
        }

        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit deletes: {e}")))?;

        // Build a remap table: for each Folder node in the diff, if a folder at
        // that path already exists in the DB, reuse its ID so that existing
        // Contains edges to sibling files are preserved.
        // Use the same connection (no open transaction between tx1 COMMIT and tx2 BEGIN).
        // Key: the diff's folder node id; value: the id already stored in the DB.
        let mut id_remap: HashMap<String, String> = HashMap::new();
        for node in diff
            .added_nodes
            .iter()
            .filter(|n| n.kind == NodeKind::Folder)
        {
            let path_esc = esc(node.file.to_string_lossy().as_ref());
            let mut check = conn
                .query(&format!(
                    "MATCH (n:{nt}) WHERE n.file = '{path_esc}' AND n.kind = 'folder' \
                     RETURN n.id LIMIT 1"
                ))
                .map_err(|e| GitCortexError::Store(e.to_string()))?;
            if let Some(row) = check.by_ref().next() {
                // A value that cannot be read as a string is ignored: the folder
                // is then inserted as a new node instead of being remapped.
                if let Ok(existing_id) = str_val(&row[0]) {
                    tracing::debug!("folder remap: {} → {}", node.file.display(), existing_id);
                    id_remap.insert(node.id.as_str().to_owned(), existing_id);
                }
            }
        }

        // Transaction 2: insert new nodes. Deduplicate by ID first so a rename
        // delta (or any other case producing the same NodeId twice) never hits a
        // PK violation. Folder nodes remapped to existing DB nodes are skipped.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin node insert transaction: {e}")))?;

        // Batch node inserts via `UNWIND [<struct>, …] CREATE`. One query per
        // chunk instead of one per node — a ~100× cut in round-trips on a full
        // index of a large repo. Chunk size is kept modest because each row
        // carries the (truncated) def_body, so a chunk can still be a few MB.
        let mut seen_node_ids: HashSet<String> = HashSet::new();
        let rows: Vec<String> = diff
            .added_nodes
            .iter()
            // `insert` returns false for an id seen earlier, dropping the duplicate.
            .filter(|n| seen_node_ids.insert(n.id.as_str().to_owned()))
            // Folder node remapped to an existing DB node — skip INSERT.
            .filter(|n| !id_remap.contains_key(&n.id.as_str()))
            .map(node_struct_literal)
            .collect();

        for chunk in rows.chunks(NODE_INSERT_CHUNK) {
            let list = chunk.join(", ");
            conn.query(&format!(
                "UNWIND [{list}] AS r \
                 CREATE (:{nt} {{\
                    id: r.id, kind: r.kind, name: r.name, \
                    qualified_name: r.qualified_name, file: r.file, \
                    start_line: r.start_line, end_line: r.end_line, loc: r.loc, \
                    visibility: r.visibility, is_async: r.is_async, is_unsafe: r.is_unsafe, \
                    is_static: r.is_static, is_abstract: r.is_abstract, is_final: r.is_final, \
                    is_property: r.is_property, is_generator: r.is_generator, is_const: r.is_const, \
                    generic_bounds: r.generic_bounds, \
                    def_signature: r.def_signature, def_body: r.def_body, def_doc: r.def_doc, \
                    def_start_byte: r.def_start_byte, def_end_byte: r.def_end_byte\
                 }})"
            ))
            .map_err(|e| GitCortexError::Store(format!("batch insert nodes: {e}")))?;
        }

        // Commit node inserts so the edge MATCH queries in transaction 3 see them.
        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit nodes: {e}")))?;

        // Transaction 3: insert edges and resolve deferred references.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin edge transaction: {e}")))?;

        // 4. Insert new edges. Deduplicate by (src,dst,kind) to avoid creating
        //    parallel edges. Remap folder IDs to existing DB nodes where applicable.
        //    MATCH yields nothing for missing endpoints → skip silently.
        //    Deduplication uses the raw diff ids (before remapping) and only
        //    covers this diff; edges already stored are not consulted.
        let mut seen_edges: HashSet<(String, String, String)> = HashSet::new();
        let edge_rows: Vec<String> = diff
            .added_edges
            .iter()
            .filter(|e| {
                seen_edges.insert((
                    e.src.as_str().to_owned(),
                    e.dst.as_str().to_owned(),
                    e.kind.to_string(),
                ))
            })
            .map(|edge| {
                let src_raw = edge.src.as_str();
                let dst_raw = edge.dst.as_str();
                // Prefer the remapped (existing) folder id; otherwise keep the diff id.
                let s = esc(id_remap
                    .get(&src_raw)
                    .map(String::as_str)
                    .unwrap_or(&src_raw));
                let d = esc(id_remap
                    .get(&dst_raw)
                    .map(String::as_str)
                    .unwrap_or(&dst_raw));
                let k = esc(&edge.kind.to_string());
                format!("{{s:'{s}', d:'{d}', k:'{k}'}}")
            })
            .collect();

        // Batch edge inserts via `UNWIND … MATCH … CREATE`. Edge rows are tiny
        // (three ids), so a larger chunk than nodes is fine. Endpoints missing
        // from the store yield no MATCH row and are skipped silently — same
        // semantics as the per-edge version.
        for chunk in edge_rows.chunks(EDGE_INSERT_CHUNK) {
            let list = chunk.join(", ");
            conn.query(&format!(
                "UNWIND [{list}] AS r \
                 MATCH (s:{nt} {{id: r.s}}), (d:{nt} {{id: r.d}}) \
                 CREATE (s)-[:{et} {{kind: r.k}}]->(d)"
            ))
            .map_err(|e| GitCortexError::Store(format!("batch insert edges: {e}")))?;
        }

        // 5. Resolve cross-file deferred edges against the full store.
        //    The diff-local pass couldn't find these callees/types because they
        //    live in unchanged files. We match by name here — best-effort without
        //    full type inference, filtered to the correct node kinds to reduce noise.
        //    Every node that matches the name and kind filter gets an edge.
        //
        //    Scoping: the resolved candidate must live in a file whose extension
        //    belongs to the same language family as the caller. Without this a
        //    Rust `welcome()` would resolve to a Python `polite_greet()` simply
        //    because they share a name. The caller-file map is built once from
        //    `added_nodes` (all deferred caller ids are diff-local nodes).
        //    A caller id missing from the map gets an empty scope clause.
        let caller_file: HashMap<String, String> = diff
            .added_nodes
            .iter()
            .map(|n| {
                (
                    n.id.as_str().to_owned(),
                    n.file.to_string_lossy().into_owned(),
                )
            })
            .collect();

        // Deferred calls: caller → every function/method with the callee name.
        for (caller_id, callee_name) in &diff.deferred_calls {
            let caller_id_str = caller_id.as_str();
            let caller = esc(&caller_id_str);
            let callee = esc(callee_name);
            let scope = caller_file
                .get(&caller_id_str)
                .map(|f| lang_scope_clause(f, "callee"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (caller:{nt} {{id: '{caller}'}}), (callee:{nt}) \
                 WHERE callee.name = '{callee}' \
                 AND (callee.kind = 'function' OR callee.kind = 'method'){scope} \
                 CREATE (caller)-[:{et} {{kind: 'calls'}}]->(callee)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred call '{callee_name}': {e}")))?;
        }

        // Deferred uses: function → every type-like node with the type name.
        for (fn_id, type_name) in &diff.deferred_uses {
            let fn_id_str = fn_id.as_str();
            let fn_esc = esc(&fn_id_str);
            let ty = esc(type_name);
            let scope = caller_file
                .get(&fn_id_str)
                .map(|f| lang_scope_clause(f, "ty"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (fn_node:{nt} {{id: '{fn_esc}'}}), (ty:{nt}) \
                 WHERE ty.name = '{ty}' \
                 AND (ty.kind = 'struct' OR ty.kind = 'enum' \
                      OR ty.kind = 'trait' OR ty.kind = 'interface' \
                      OR ty.kind = 'type_alias'){scope} \
                 CREATE (fn_node)-[:{et} {{kind: 'uses'}}]->(ty)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred use '{type_name}': {e}")))?;
        }

        // Deferred implements: type → every trait/interface with the given name.
        for (struct_id, trait_name) in &diff.deferred_implements {
            let sid = struct_id.as_str();
            let s = esc(&sid);
            let t = esc(trait_name);
            let scope = caller_file
                .get(&sid)
                .map(|f| lang_scope_clause(f, "tr"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (st:{nt} {{id: '{s}'}}), (tr:{nt}) \
                 WHERE tr.name = '{t}' AND (tr.kind = 'trait' OR tr.kind = 'interface'){scope} \
                 CREATE (st)-[:{et} {{kind: 'implements'}}]->(tr)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred impl '{trait_name}': {e}")))?;
        }

        // Deferred inherits: subtype → every struct/interface/trait with the supertype name.
        for (subtype_id, supertype_name) in &diff.deferred_inherits {
            let sid = subtype_id.as_str();
            let s = esc(&sid);
            let t = esc(supertype_name);
            let scope = caller_file
                .get(&sid)
                .map(|f| lang_scope_clause(f, "sup"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (sub:{nt} {{id: '{s}'}}), (sup:{nt}) \
                 WHERE sup.name = '{t}' \
                 AND (sup.kind = 'struct' OR sup.kind = 'interface' OR sup.kind = 'trait'){scope} \
                 CREATE (sub)-[:{et} {{kind: 'inherits'}}]->(sup)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred inherits '{supertype_name}': {e}"))
            })?;
        }

        // Deferred throws: method → every node with the exception name. Unlike
        // the other deferred kinds there is no node-kind filter here.
        for (method_id, exception_name) in &diff.deferred_throws {
            let mid = method_id.as_str();
            let m = esc(&mid);
            let e_name = esc(exception_name);
            let scope = caller_file
                .get(&mid)
                .map(|f| lang_scope_clause(f, "ex"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (m:{nt} {{id: '{m}'}}), (ex:{nt}) \
                 WHERE ex.name = '{e_name}'{scope} \
                 CREATE (m)-[:{et} {{kind: 'throws'}}]->(ex)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred throws '{exception_name}': {e}"))
            })?;
        }

        // Deferred annotations: target → every annotation/macro/function with the name.
        for (target_id, annotation_name) in &diff.deferred_annotated {
            let tid = target_id.as_str();
            let t = esc(&tid);
            let a = esc(annotation_name);
            let scope = caller_file
                .get(&tid)
                .map(|f| lang_scope_clause(f, "ann"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (target:{nt} {{id: '{t}'}}), (ann:{nt}) \
                 WHERE ann.name = '{a}' \
                 AND (ann.kind = 'annotation' OR ann.kind = 'macro' OR ann.kind = 'function'){scope} \
                 CREATE (target)-[:{et} {{kind: 'annotated'}}]->(ann)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred annotated '{annotation_name}': {e}"))
            })?;
        }

        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit edges: {e}")))?;

        Ok(())
    }
503
504    // ── Read path ─────────────────────────────────────────────────────────────
505
506    fn lookup_symbol(&self, branch: &str, name: &str, fuzzy: bool) -> Result<Vec<Node>> {
507        self.ensure_branch(branch)?;
508        let nt = db_schema::node_table(branch);
509        let name_esc = esc(name);
510        let conn = self.conn()?;
511
512        let condition = if fuzzy {
513            format!("contains(n.name, '{name_esc}')")
514        } else {
515            format!("n.name = '{name_esc}'")
516        };
517
518        let mut result = conn
519            .query(&format!(
520                "MATCH (n:{nt}) WHERE {condition} RETURN {NODE_COLS} ORDER BY {SYMBOL_RANK}"
521            ))
522            .map_err(|e| GitCortexError::Store(e.to_string()))?;
523
524        rows_to_nodes(&mut result)
525    }
526
527    fn find_callers(&self, branch: &str, function_name: &str) -> Result<Vec<Node>> {
528        self.ensure_branch(branch)?;
529        let nt = db_schema::node_table(branch);
530        let et = db_schema::edge_table(branch);
531        let name_esc = esc(function_name);
532        let conn = self.conn()?;
533
534        let mut result = conn
535            .query(&format!(
536                "MATCH (n:{nt})-[:{et} {{kind: 'calls'}}]->(callee:{nt}) \
537                 WHERE callee.name = '{name_esc}' \
538                 RETURN DISTINCT {NODE_COLS}"
539            ))
540            .map_err(|e| GitCortexError::Store(e.to_string()))?;
541
542        rows_to_nodes(&mut result)
543    }
544
545    fn find_callers_deep(
546        &self,
547        branch: &str,
548        function_name: &str,
549        depth: u8,
550    ) -> Result<CallersDeep> {
551        let depth = depth.min(5);
552        let mut hops: Vec<Vec<Node>> = Vec::new();
553        // Track seen node IDs to avoid cycles.
554        let mut seen: HashSet<String> = HashSet::new();
555        // The frontier holds the *names* of nodes whose callers we search next.
556        let mut frontier: Vec<String> = vec![function_name.to_owned()];
557        seen.insert(function_name.to_owned());
558
559        for _ in 0..depth {
560            if frontier.is_empty() {
561                break;
562            }
563            let mut hop_nodes: Vec<Node> = Vec::new();
564            let mut next_frontier: Vec<String> = Vec::new();
565            for target in &frontier {
566                for caller in self.find_callers(branch, target)? {
567                    let id = caller.id.as_str().to_owned();
568                    if seen.insert(id) {
569                        next_frontier.push(caller.name.clone());
570                        hop_nodes.push(caller);
571                    }
572                }
573            }
574            hops.push(hop_nodes);
575            frontier = next_frontier;
576        }
577
578        let total_affected: usize = hops.iter().map(|h| h.len()).sum();
579        let risk_level = match total_affected {
580            0..=2 => "LOW",
581            3..=10 => "MEDIUM",
582            11..=30 => "HIGH",
583            _ => "CRITICAL",
584        };
585
586        Ok(CallersDeep { hops, risk_level })
587    }
588
589    fn symbol_context(&self, branch: &str, name: &str) -> Result<SymbolContext> {
590        self.ensure_branch(branch)?;
591        let nt = db_schema::node_table(branch);
592        let et = db_schema::edge_table(branch);
593        let name_esc = esc(name);
594        let conn = self.conn()?;
595
596        // Definition — best match by kind priority (type decl > fn/method >
597        // … > module/file), so `wiki Echo` resolves to `type Echo` not a
598        // same-named method.
599        let mut def_result = conn
600            .query(&format!(
601                "MATCH (n:{nt}) WHERE n.name = '{name_esc}' \
602                 RETURN {NODE_COLS} ORDER BY {SYMBOL_RANK} LIMIT 1"
603            ))
604            .map_err(|e| GitCortexError::Store(e.to_string()))?;
605        let mut defs = rows_to_nodes(&mut def_result)?;
606        if defs.is_empty() {
607            return Err(GitCortexError::Store(format!(
608                "symbol '{name}' not found on branch '{branch}'"
609            )));
610        }
611        let definition = defs.remove(0);
612
613        // Scope callers/callees/used-by to THIS specific definition (by id),
614        // not by name. Otherwise a Java `welcome` would pull in callees from
615        // a Python `welcome` that happens to share the name. `find_callers`
616        // as a standalone tool remains name-based — callers without a specific
617        // definition node have no other handle.
618        let def_id = esc(&definition.id.as_str());
619
620        let mut caller_result = conn
621            .query(&format!(
622                "MATCH (n:{nt})-[:{et} {{kind: 'calls'}}]->(callee:{nt}) \
623                 WHERE callee.id = '{def_id}' \
624                 RETURN DISTINCT {NODE_COLS}"
625            ))
626            .map_err(|e| GitCortexError::Store(e.to_string()))?;
627        let callers = rows_to_nodes(&mut caller_result)?;
628
629        let mut callee_result = conn
630            .query(&format!(
631                "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
632                 WHERE caller.id = '{def_id}' \
633                 RETURN {NODE_COLS}"
634            ))
635            .map_err(|e| GitCortexError::Store(e.to_string()))?;
636        let callees = rows_to_nodes(&mut callee_result)?;
637
638        let mut used_result = conn
639            .query(&format!(
640                "MATCH (n:{nt})-[:{et} {{kind: 'uses'}}]->(ty:{nt}) \
641                 WHERE ty.id = '{def_id}' \
642                 RETURN {NODE_COLS}"
643            ))
644            .map_err(|e| GitCortexError::Store(e.to_string()))?;
645        let used_by = rows_to_nodes(&mut used_result)?;
646
647        Ok(SymbolContext {
648            definition,
649            callers,
650            callees,
651            used_by,
652        })
653    }
654
655    fn list_definitions(&self, branch: &str, file: &Path) -> Result<Vec<Node>> {
656        self.ensure_branch(branch)?;
657        let nt = db_schema::node_table(branch);
658        let file_esc = esc(file.to_string_lossy().as_ref());
659        let conn = self.conn()?;
660
661        let mut result = conn
662            .query(&format!(
663                "MATCH (n:{nt}) WHERE n.file = '{file_esc}' \
664                 RETURN {NODE_COLS} ORDER BY n.start_line"
665            ))
666            .map_err(|e| GitCortexError::Store(e.to_string()))?;
667
668        rows_to_nodes(&mut result)
669    }
670
671    fn branch_diff(&self, from: &str, to: &str) -> Result<GraphDiff> {
672        self.ensure_branch(from)?;
673        self.ensure_branch(to)?;
674
675        let from_nt = db_schema::node_table(from);
676        let to_nt = db_schema::node_table(to);
677        let mut conn = self.conn()?;
678
679        // Collect node IDs from each branch.
680        let from_ids = collect_ids(&mut conn, &from_nt)?;
681        let to_ids = collect_ids(&mut conn, &to_nt)?;
682
683        // Nodes in `to` but not in `from` → added.
684        let added_ids: Vec<&String> = to_ids.iter().filter(|id| !from_ids.contains(*id)).collect();
685
686        // Nodes in `from` but not in `to` → removed.
687        let removed_ids: Vec<&String> =
688            from_ids.iter().filter(|id| !to_ids.contains(*id)).collect();
689
690        let mut diff = GraphDiff::default();
691
692        for id in added_ids {
693            let id_esc = esc(id);
694            let mut r = conn
695                .query(&format!(
696                    "MATCH (n:{to_nt}) WHERE n.id = '{id_esc}' RETURN {NODE_COLS}"
697                ))
698                .map_err(|e| GitCortexError::Store(e.to_string()))?;
699            diff.added_nodes.extend(rows_to_nodes(&mut r)?);
700        }
701
702        for id in removed_ids {
703            if let Ok(node_id) = NodeId::try_from(id.as_str()) {
704                diff.removed_node_ids.push(node_id);
705            }
706        }
707
708        Ok(diff)
709    }
710
711    fn list_all_nodes(&self, branch: &str) -> Result<Vec<Node>> {
712        self.ensure_branch(branch)?;
713        let nt = db_schema::node_table(branch);
714        let conn = self.conn()?;
715        let mut result = conn
716            .query(&format!("MATCH (n:{nt}) RETURN {NODE_COLS}"))
717            .map_err(|e| GitCortexError::Store(e.to_string()))?;
718        rows_to_nodes(&mut result)
719    }
720
721    fn list_all_edges(&self, branch: &str) -> Result<Vec<Edge>> {
722        self.ensure_branch(branch)?;
723        let nt = db_schema::node_table(branch);
724        let et = db_schema::edge_table(branch);
725        let conn = self.conn()?;
726        let result = conn
727            .query(&format!(
728                "MATCH (s:{nt})-[e:{et}]->(d:{nt}) RETURN s.id, d.id, e.kind"
729            ))
730            .map_err(|e| GitCortexError::Store(e.to_string()))?;
731
732        let mut out = Vec::new();
733        for row in result {
734            let src_str = str_val(&row[0])?;
735            let dst_str = str_val(&row[1])?;
736            let kind_str = str_val(&row[2])?;
737            out.push(Edge {
738                src: NodeId::try_from(src_str.as_str())
739                    .map_err(|e| GitCortexError::Store(format!("bad src id: {e}")))?,
740                dst: NodeId::try_from(dst_str.as_str())
741                    .map_err(|e| GitCortexError::Store(format!("bad dst id: {e}")))?,
742                kind: edge_kind_from_str(&kind_str),
743            });
744        }
745        Ok(out)
746    }
747
748    fn find_callees(&self, branch: &str, function_name: &str, depth: u8) -> Result<CallersDeep> {
749        let depth = depth.min(5);
750        let mut hops: Vec<Vec<Node>> = Vec::new();
751        let mut seen: HashSet<String> = HashSet::new();
752        let mut frontier: Vec<String> = vec![function_name.to_owned()];
753        seen.insert(function_name.to_owned());
754
755        for _ in 0..depth {
756            if frontier.is_empty() {
757                break;
758            }
759            let mut hop_nodes: Vec<Node> = Vec::new();
760            let mut next_frontier: Vec<String> = Vec::new();
761            for caller_name in &frontier {
762                let nt = db_schema::node_table(branch);
763                let et = db_schema::edge_table(branch);
764                let name_esc = esc(caller_name);
765                let conn = self.conn()?;
766                let mut result = conn
767                    .query(&format!(
768                        "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
769                         WHERE caller.name = '{name_esc}' \
770                         RETURN {NODE_COLS}"
771                    ))
772                    .map_err(|e| GitCortexError::Store(e.to_string()))?;
773                for node in rows_to_nodes(&mut result)? {
774                    let id = node.id.as_str().to_owned();
775                    if seen.insert(id) {
776                        next_frontier.push(node.name.clone());
777                        hop_nodes.push(node);
778                    }
779                }
780            }
781            hops.push(hop_nodes);
782            frontier = next_frontier;
783        }
784
785        let total: usize = hops.iter().map(|h| h.len()).sum();
786        let risk_level = match total {
787            0..=2 => "LOW",
788            3..=10 => "MEDIUM",
789            11..=30 => "HIGH",
790            _ => "CRITICAL",
791        };
792        Ok(CallersDeep { hops, risk_level })
793    }
794
795    fn find_implementors(&self, branch: &str, trait_or_interface_name: &str) -> Result<Vec<Node>> {
796        self.ensure_branch(branch)?;
797        let nt = db_schema::node_table(branch);
798        let et = db_schema::edge_table(branch);
799        let name_esc = esc(trait_or_interface_name);
800        let conn = self.conn()?;
801        let mut result = conn
802            .query(&format!(
803                "MATCH (n:{nt})-[e:{et}]->(trait_node:{nt}) \
804                 WHERE trait_node.name = '{name_esc}' \
805                 AND (e.kind = 'implements' OR e.kind = 'inherits') \
806                 RETURN DISTINCT {NODE_COLS} ORDER BY {SYMBOL_RANK}"
807            ))
808            .map_err(|e| GitCortexError::Store(e.to_string()))?;
809        rows_to_nodes(&mut result)
810    }
811
812    fn trace_path(&self, branch: &str, from: &str, to: &str) -> Result<Vec<Node>> {
813        self.ensure_branch(branch)?;
814        let nt = db_schema::node_table(branch);
815        let et = db_schema::edge_table(branch);
816
817        // BFS from `from` to `to` following Calls edges.
818        let from_esc = esc(from);
819        let conn = self.conn()?;
820        let mut start_result = conn
821            .query(&format!(
822                "MATCH (n:{nt}) WHERE n.name = '{from_esc}' RETURN {NODE_COLS} LIMIT 1"
823            ))
824            .map_err(|e| GitCortexError::Store(e.to_string()))?;
825        let start_nodes = rows_to_nodes(&mut start_result)?;
826        if start_nodes.is_empty() {
827            return Ok(Vec::new());
828        }
829
830        // BFS: queue of (current_name, path_so_far)
831        let mut queue: std::collections::VecDeque<(String, Vec<String>)> =
832            std::collections::VecDeque::new();
833        queue.push_back((from.to_owned(), vec![from.to_owned()]));
834        let mut visited: HashSet<String> = HashSet::new();
835        visited.insert(from.to_owned());
836
837        const MAX_HOPS: usize = 6;
838        while let Some((current, path)) = queue.pop_front() {
839            if path.len() > MAX_HOPS {
840                continue;
841            }
842            let cur_esc = esc(&current);
843            let conn2 = self.conn()?;
844            let mut callee_result = conn2
845                .query(&format!(
846                    "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
847                     WHERE caller.name = '{cur_esc}' \
848                     RETURN {NODE_COLS}"
849                ))
850                .map_err(|e| GitCortexError::Store(e.to_string()))?;
851            for node in rows_to_nodes(&mut callee_result)? {
852                let node_name = node.name.clone();
853                if node_name == to {
854                    // Found — resolve full path names to nodes
855                    let mut result_nodes = Vec::new();
856                    for name in &path {
857                        let conn3 = self.conn()?;
858                        let n_esc = esc(name);
859                        let mut r = conn3
860                            .query(&format!(
861                                "MATCH (n:{nt}) WHERE n.name = '{n_esc}' RETURN {NODE_COLS} LIMIT 1"
862                            ))
863                            .map_err(|e| GitCortexError::Store(e.to_string()))?;
864                        result_nodes.extend(rows_to_nodes(&mut r)?);
865                    }
866                    result_nodes.push(node);
867                    return Ok(result_nodes);
868                }
869                if visited.insert(node_name.clone()) {
870                    let mut new_path = path.clone();
871                    new_path.push(node_name.clone());
872                    queue.push_back((node_name, new_path));
873                }
874            }
875        }
876        Ok(Vec::new())
877    }
878
879    fn list_symbols_in_range(
880        &self,
881        branch: &str,
882        file: &Path,
883        start_line: u32,
884        end_line: u32,
885    ) -> Result<Vec<Node>> {
886        self.ensure_branch(branch)?;
887        let nt = db_schema::node_table(branch);
888        let file_esc = esc(file.to_string_lossy().as_ref());
889        let conn = self.conn()?;
890
891        let mut result = conn
892            .query(&format!(
893                "MATCH (n:{nt}) \
894                 WHERE n.file = '{file_esc}' \
895                 AND n.start_line <= {end_line} \
896                 AND n.end_line >= {start_line} \
897                 RETURN {NODE_COLS} ORDER BY n.start_line"
898            ))
899            .map_err(|e| GitCortexError::Store(e.to_string()))?;
900
901        rows_to_nodes(&mut result)
902    }
903
904    fn find_unused_symbols(&self, branch: &str, kind: Option<NodeKind>) -> Result<Vec<Node>> {
905        self.ensure_branch(branch)?;
906        let nt = db_schema::node_table(branch);
907        let et = db_schema::edge_table(branch);
908        let conn = self.conn()?;
909
910        let kind_filter = match &kind {
911            Some(k) => format!("AND n.kind = '{k}'"),
912            None => String::new(),
913        };
914
915        let mut result = conn
916            .query(&format!(
917                "MATCH (n:{nt}) \
918                 WHERE NOT EXISTS {{ MATCH (:{nt})-[:{et} {{kind: 'calls'}}]->(n) }} \
919                 AND NOT EXISTS {{ MATCH (:{nt})-[:{et} {{kind: 'uses'}}]->(n) }} \
920                 AND n.kind <> 'file' AND n.kind <> 'folder' AND n.kind <> 'module' \
921                 {kind_filter} \
922                 RETURN {NODE_COLS} ORDER BY n.file, n.start_line"
923            ))
924            .map_err(|e| GitCortexError::Store(e.to_string()))?;
925
926        rows_to_nodes(&mut result)
927    }
928
929    fn get_subgraph(
930        &self,
931        branch: &str,
932        seed_name: &str,
933        depth: u8,
934        direction: &str,
935    ) -> Result<SubGraph> {
936        self.ensure_branch(branch)?;
937        let depth = depth.min(5);
938        let nt = db_schema::node_table(branch);
939        let et = db_schema::edge_table(branch);
940
941        let seed_esc = esc(seed_name);
942        let conn = self.conn()?;
943        let mut seed_result = conn
944            .query(&format!(
945                "MATCH (n:{nt}) WHERE n.name = '{seed_esc}' RETURN {NODE_COLS} LIMIT 1"
946            ))
947            .map_err(|e| GitCortexError::Store(e.to_string()))?;
948        let seed_nodes = rows_to_nodes(&mut seed_result)?;
949        if seed_nodes.is_empty() {
950            return Ok(SubGraph {
951                nodes: Vec::new(),
952                edges: Vec::new(),
953            });
954        }
955
956        let mut all_node_ids: HashSet<String> = HashSet::new();
957        let mut all_nodes: Vec<Node> = Vec::new();
958        let mut frontier_names: Vec<String> = vec![seed_name.to_owned()];
959
960        for node in seed_nodes {
961            all_node_ids.insert(node.id.as_str().to_owned());
962            all_nodes.push(node);
963        }
964
965        for _ in 0..depth {
966            let mut next_frontier: Vec<String> = Vec::new();
967            for name in &frontier_names {
968                let name_esc = esc(name);
969                // Outbound (callees): what this node calls
970                if direction == "out" || direction == "both" {
971                    let conn2 = self.conn()?;
972                    let mut r = conn2
973                        .query(&format!(
974                            "MATCH (caller:{nt})-[:{et}]->(n:{nt}) \
975                             WHERE caller.name = '{name_esc}' \
976                             RETURN {NODE_COLS}"
977                        ))
978                        .map_err(|e| GitCortexError::Store(e.to_string()))?;
979                    for node in rows_to_nodes(&mut r)? {
980                        let id = node.id.as_str().to_owned();
981                        if all_node_ids.insert(id) {
982                            next_frontier.push(node.name.clone());
983                            all_nodes.push(node);
984                        }
985                    }
986                }
987                // Inbound (callers): what calls this node
988                if direction == "in" || direction == "both" {
989                    let conn3 = self.conn()?;
990                    let mut r = conn3
991                        .query(&format!(
992                            "MATCH (n:{nt})-[:{et}]->(target:{nt}) \
993                             WHERE target.name = '{name_esc}' \
994                             RETURN {NODE_COLS}"
995                        ))
996                        .map_err(|e| GitCortexError::Store(e.to_string()))?;
997                    for node in rows_to_nodes(&mut r)? {
998                        let id = node.id.as_str().to_owned();
999                        if all_node_ids.insert(id) {
1000                            next_frontier.push(node.name.clone());
1001                            all_nodes.push(node);
1002                        }
1003                    }
1004                }
1005            }
1006            if next_frontier.is_empty() {
1007                break;
1008            }
1009            frontier_names = next_frontier;
1010        }
1011
1012        // Collect edges between the nodes in the subgraph
1013        let ids_list: Vec<String> = all_node_ids
1014            .iter()
1015            .map(|id| format!("'{}'", esc(id)))
1016            .collect();
1017        let ids_str = ids_list.join(", ");
1018        let all_edges = if ids_list.is_empty() {
1019            Vec::new()
1020        } else {
1021            let conn4 = self.conn()?;
1022            let result = conn4
1023                .query(&format!(
1024                    "MATCH (s:{nt})-[e:{et}]->(d:{nt}) \
1025                     WHERE s.id IN [{ids_str}] AND d.id IN [{ids_str}] \
1026                     RETURN s.id, d.id, e.kind"
1027                ))
1028                .map_err(|e| GitCortexError::Store(e.to_string()))?;
1029            let mut edges = Vec::new();
1030            for row in result {
1031                let src_str = str_val(&row[0])?;
1032                let dst_str = str_val(&row[1])?;
1033                let kind_str = str_val(&row[2])?;
1034                edges.push(Edge {
1035                    src: NodeId::try_from(src_str.as_str())
1036                        .map_err(|e| GitCortexError::Store(format!("bad src id: {e}")))?,
1037                    dst: NodeId::try_from(dst_str.as_str())
1038                        .map_err(|e| GitCortexError::Store(format!("bad dst id: {e}")))?,
1039                    kind: edge_kind_from_str(&kind_str),
1040                });
1041            }
1042            edges
1043        };
1044
1045        Ok(SubGraph {
1046            nodes: all_nodes,
1047            edges: all_edges,
1048        })
1049    }
1050
1051    // ── Indexing state ────────────────────────────────────────────────────────
1052
1053    fn last_indexed_sha(&self, branch_name: &str) -> Result<Option<String>> {
1054        branch::read_last_sha(&self.repo_id, branch_name)
1055    }
1056
1057    fn set_last_indexed_sha(&mut self, branch_name: &str, sha: &str) -> Result<()> {
1058        branch::write_last_sha(&self.repo_id, branch_name, sha)
1059    }
1060}