Skip to main content

gitcortex_store/kuzu/
mod.rs

1use std::{
2    collections::{HashMap, HashSet},
3    path::Path,
4};
5
6use gitcortex_core::{
7    error::{GitCortexError, Result},
8    graph::{Edge, GraphDiff, Node, NodeId},
9    schema::{NodeKind, SCHEMA_VERSION},
10    store::{CallersDeep, GraphStore, SubGraph, SymbolContext},
11};
12use kuzu::{Connection, Database, SystemConfig};
13
14use crate::{branch, schema as db_schema};
15
16mod bulk;
17mod conv;
18mod escape;
19mod queries;
20mod values;
21
22use conv::{edge_kind_from_str, lang_scope_clause, vis_str};
23use escape::{esc, esc_multiline};
24use queries::{collect_ids, rows_to_nodes, NODE_COLS, SYMBOL_RANK};
25use values::str_val;
26
// Batch sizes for `UNWIND`-based inserts. Nodes carry a (≤16 KB) def_body, so
// their chunk is kept small to bound query-string size; edge rows are just two
// ids plus a kind, so they batch much larger.

/// Maximum number of node struct literals sent in one `UNWIND … CREATE` query.
const NODE_INSERT_CHUNK: usize = 128;
/// Maximum number of edge rows sent in one `UNWIND … MATCH … CREATE` query.
const EDGE_INSERT_CHUNK: usize = 1000;
32
33/// Render a `Node` as a Cypher struct literal `{id:'…', kind:'…', …}` for use
34/// inside an `UNWIND [...] AS r CREATE` batch. String fields are escaped and
35/// single-quoted; bools/ints are emitted bare.
36fn node_struct_literal(node: &Node) -> String {
37    let id = esc(&node.id.as_str());
38    let kind = esc(&node.kind.to_string());
39    let name = esc(&node.name);
40    let qname = esc(&node.qualified_name);
41    let file = esc(node.file.to_string_lossy().as_ref());
42    let sl = node.span.start_line as i64;
43    let el = node.span.end_line as i64;
44    let loc = node.metadata.loc as i64;
45    let vis = esc(&vis_str(&node.metadata.visibility));
46    let m = &node.metadata;
47    let generic_bounds = esc(&m.generic_bounds.join("|"));
48    let def_sig = esc_multiline(&m.definition.signature);
49    let def_body = esc_multiline(&m.definition.body);
50    let def_doc = esc_multiline(m.definition.doc_comment.as_deref().unwrap_or(""));
51    let def_start_byte = m.definition.start_byte as i64;
52    let def_end_byte = m.definition.end_byte as i64;
53
54    format!(
55        "{{id:'{id}', kind:'{kind}', name:'{name}', qualified_name:'{qname}', file:'{file}', \
56         start_line:{sl}, end_line:{el}, loc:{loc}, visibility:'{vis}', \
57         is_async:{ia}, is_unsafe:{iu}, is_static:{ist}, is_abstract:{iab}, is_final:{ifi}, \
58         is_property:{ip}, is_generator:{ig}, is_const:{ic}, generic_bounds:'{generic_bounds}', \
59         def_signature:'{def_sig}', def_body:'{def_body}', def_doc:'{def_doc}', \
60         def_start_byte:{def_start_byte}, def_end_byte:{def_end_byte}}}",
61        ia = m.is_async,
62        iu = m.is_unsafe,
63        ist = m.is_static,
64        iab = m.is_abstract,
65        ifi = m.is_final,
66        ip = m.is_property,
67        ig = m.is_generator,
68        ic = m.is_const,
69    )
70}
71
72/// True when the branch's node table has zero rows (fresh / never indexed).
73fn node_table_is_empty(conn: &Connection, nt: &str) -> Result<bool> {
74    let mut r = conn
75        .query(&format!("MATCH (n:{nt}) RETURN count(n) AS c LIMIT 1"))
76        .map_err(|e| GitCortexError::Store(format!("count nodes: {e}")))?;
77    match r.by_ref().next() {
78        Some(row) => match &row[0] {
79            kuzu::Value::Int64(n) => Ok(*n == 0),
80            _ => Ok(false),
81        },
82        None => Ok(true),
83    }
84}
85
86/// Bulk-load a full-index diff via CSV `COPY`. Stages CSVs in a unique temp
87/// dir, loads them, then removes the dir. See [`bulk`] for the rationale.
88fn bulk_apply(conn: &Connection, nt: &str, et: &str, diff: &GraphDiff) -> Result<()> {
89    // Unique staging dir per call: pid + nanos + a process-wide atomic counter,
90    // so concurrent `apply_diff`s (e.g. parallel tests in one binary) never
91    // share a directory.
92    use std::sync::atomic::{AtomicU64, Ordering};
93    static SEQ: AtomicU64 = AtomicU64::new(0);
94    let stage = std::env::temp_dir().join(format!(
95        "gcx-bulk-{}-{}-{}",
96        std::process::id(),
97        std::time::SystemTime::now()
98            .duration_since(std::time::UNIX_EPOCH)
99            .map(|d| d.as_nanos())
100            .unwrap_or(0),
101        SEQ.fetch_add(1, Ordering::Relaxed),
102    ));
103    std::fs::create_dir_all(&stage)
104        .map_err(|e| GitCortexError::Store(format!("create staging dir: {e}")))?;
105
106    let result = bulk::bulk_load(conn, nt, et, &stage, &diff.added_nodes, &diff.added_edges);
107
108    // Best-effort cleanup regardless of load outcome.
109    let _ = std::fs::remove_dir_all(&stage);
110
111    result.map(|_| ())
112}
113
114// ── KuzuGraphStore ────────────────────────────────────────────────────────────
115
116/// Local KuzuDB-backed implementation of [`GraphStore`].
117///
118/// One database file per repo (`graph.kuzu`), with per-branch node/edge tables
119/// inside it. A fresh `Connection` is created for each operation so we avoid
120/// the self-referential lifetime that `Mutex<Connection<'db>>` would require.
121pub struct KuzuGraphStore {
122    db: Database,
123    repo_id: String,
124}
125
126impl KuzuGraphStore {
127    /// Open (or create) the graph database for the repo at `repo_root`.
128    ///
129    /// If the persisted schema version doesn't match [`SCHEMA_VERSION`], the
130    /// entire repo data directory is wiped so a fresh full index runs on next
131    /// hook invocation.
132    pub fn open(repo_root: &Path) -> Result<Self> {
133        let repo_id = branch::repo_id(repo_root);
134
135        if branch::read_schema_version(&repo_id) != SCHEMA_VERSION {
136            eprintln!(
137                "gitcortex: schema version mismatch (expected {}); wiping graph store for re-index",
138                SCHEMA_VERSION
139            );
140            branch::wipe_repo_data(&repo_id);
141            branch::write_schema_version(&repo_id, SCHEMA_VERSION)?;
142        }
143
144        let db_path = branch::db_path(&repo_id);
145        if let Some(parent) = db_path.parent() {
146            std::fs::create_dir_all(parent)?;
147        }
148
149        let db = Database::new(&db_path, SystemConfig::default())
150            .map_err(|e| GitCortexError::Store(format!("open db: {e}")))?;
151
152        Ok(Self { db, repo_id })
153    }
154
155    // ── Private helpers ───────────────────────────────────────────────────────
156
157    fn conn(&self) -> Result<Connection<'_>> {
158        Connection::new(&self.db)
159            .map_err(|e| GitCortexError::Store(format!("open connection: {e}")))
160    }
161
162    fn ensure_branch(&self, branch: &str) -> Result<()> {
163        let mut conn = self.conn()?;
164        db_schema::ensure_branch(&mut conn, branch)
165    }
166}
167
168// ── GraphStore impl ───────────────────────────────────────────────────────────
169
170impl GraphStore for KuzuGraphStore {
171    // ── Write path ────────────────────────────────────────────────────────────
172
    /// Apply `diff` to the graph stored for `branch`.
    ///
    /// An empty diff is a no-op. Otherwise the branch tables are ensured and
    /// one of two paths is taken:
    ///
    /// * **Bulk path** — when the branch's node table has no rows, the added
    ///   nodes/edges are loaded via `bulk_apply` and everything else in the
    ///   diff (`removed_*`, `deferred_*`) is ignored.
    /// * **Incremental path** — three separately committed transactions on one
    ///   connection: (1) deletes, (2) node inserts, (3) edge inserts plus
    ///   name-based resolution of the `deferred_*` references. Between (1) and
    ///   (2) existing folder nodes are looked up so re-added folders reuse
    ///   their stored id instead of being inserted again.
    ///
    /// # Errors
    ///
    /// Any failing query is returned as `GitCortexError::Store`.
    /// NOTE(review): on such an early return no explicit `ROLLBACK` is issued
    /// for the transaction in progress, and transactions committed earlier
    /// stay committed — confirm that dropping the connection discards the
    /// open transaction.
    fn apply_diff(&mut self, branch: &str, diff: &GraphDiff) -> Result<()> {
        if diff.is_empty() {
            return Ok(());
        }

        self.ensure_branch(branch)?;
        let nt = db_schema::node_table(branch);
        let et = db_schema::edge_table(branch);
        let conn = self.conn()?;

        // ── Fast path: bulk COPY load for a fresh full index ───────────────────
        // When the branch's node table is empty this is a first full index.
        // Stage the nodes/edges as CSV and `COPY` them in — ~100× faster than
        // per-row MATCH/CREATE on large repos.
        //
        // The diff's `removed_*` fields are ignored on this path: the indexer
        // emits a `removed_files` entry for every parsed file + its ancestor
        // folders (so an incremental re-parse first clears the old nodes), but
        // against an empty table those deletes are vacuous. Deferred cross-file
        // resolution is likewise skipped — on a full index every in-repo name
        // is already in `added_edges`; the only `deferred_*` left are external
        // (stdlib) names the store couldn't resolve anyway.
        let empty = node_table_is_empty(&conn, &nt)?;
        // Optional diagnostics: setting the GCX_TIMING env var (to any value)
        // logs which path was taken and the diff size to stderr.
        if std::env::var_os("GCX_TIMING").is_some() {
            eprintln!(
                "[gcx-timing] apply_diff path: table_empty={empty} nodes={} edges={}",
                diff.added_nodes.len(),
                diff.added_edges.len()
            );
        }
        if empty {
            return bulk_apply(&conn, &nt, &et, diff);
        }

        // Transaction 1: commit all deletes first.
        // KuzuDB has a quirk where DETACH DELETE + CREATE in the same transaction
        // can produce NULL for the last STRING column in newly created nodes.
        // Splitting into separate transactions avoids this.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin delete transaction: {e}")))?;

        // 1. Remove nodes for deleted/replaced files.
        //    Skip directory paths (no extension) — folder nodes are reused across
        //    incremental updates to preserve their Contains edges to sibling files.
        //    NOTE(review): the extension test also skips extension-less *files*
        //    (e.g. `Makefile`) — confirm such files are never indexed.
        for file in &diff.removed_files {
            if file.extension().is_none() {
                continue;
            }
            let file_str = esc(file.to_string_lossy().as_ref());
            conn.query(&format!(
                "MATCH (n:{nt}) WHERE n.file = '{file_str}' DETACH DELETE n"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete file nodes: {e}")))?;
        }

        // 2. Remove explicit node IDs.
        for id in &diff.removed_node_ids {
            let id_str = esc(&id.as_str());
            conn.query(&format!(
                "MATCH (n:{nt}) WHERE n.id = '{id_str}' DETACH DELETE n"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete node: {e}")))?;
        }

        // 3. Remove explicit edges.
        for (src, dst, kind) in &diff.removed_edges {
            let s = esc(&src.as_str());
            let d = esc(&dst.as_str());
            let k = esc(&kind.to_string());
            conn.query(&format!(
                "MATCH (s:{nt})-[e:{et}]->(d:{nt}) \
                 WHERE s.id = '{s}' AND d.id = '{d}' AND e.kind = '{k}' \
                 DELETE e"
            ))
            .map_err(|e| GitCortexError::Store(format!("delete edge: {e}")))?;
        }

        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit deletes: {e}")))?;

        // Build a remap table: for each Folder node in the diff, if a folder at
        // that path already exists in the DB, reuse its ID so that existing
        // Contains edges to sibling files are preserved.
        // Use the same connection (no open transaction between tx1 COMMIT and tx2 BEGIN).
        // Map: diff-local folder id → id of the folder node already stored.
        let mut id_remap: HashMap<String, String> = HashMap::new();
        for node in diff
            .added_nodes
            .iter()
            .filter(|n| n.kind == NodeKind::Folder)
        {
            let path_esc = esc(node.file.to_string_lossy().as_ref());
            let mut check = conn
                .query(&format!(
                    "MATCH (n:{nt}) WHERE n.file = '{path_esc}' AND n.kind = 'folder' \
                     RETURN n.id LIMIT 1"
                ))
                .map_err(|e| GitCortexError::Store(e.to_string()))?;
            if let Some(row) = check.by_ref().next() {
                // A value that cannot be read as a string is ignored: the
                // folder is then inserted as a new node.
                if let Ok(existing_id) = str_val(&row[0]) {
                    tracing::debug!("folder remap: {} → {}", node.file.display(), existing_id);
                    id_remap.insert(node.id.as_str().to_owned(), existing_id);
                }
            }
        }

        // Transaction 2: insert new nodes. Deduplicate by ID first so a rename
        // delta (or any other case producing the same NodeId twice) never hits a
        // PK violation. Folder nodes remapped to existing DB nodes are skipped.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin node insert transaction: {e}")))?;

        // Batch node inserts via `UNWIND [<struct>, …] CREATE`. One query per
        // chunk instead of one per node — a ~100× cut in round-trips on a full
        // index of a large repo. Chunk size is kept modest because each row
        // carries the (truncated) def_body, so a chunk can still be a few MB.
        let mut seen_node_ids: HashSet<String> = HashSet::new();
        let rows: Vec<String> = diff
            .added_nodes
            .iter()
            .filter(|n| seen_node_ids.insert(n.id.as_str().to_owned()))
            // Folder node remapped to an existing DB node — skip INSERT.
            .filter(|n| !id_remap.contains_key(&n.id.as_str()))
            .map(node_struct_literal)
            .collect();

        for chunk in rows.chunks(NODE_INSERT_CHUNK) {
            let list = chunk.join(", ");
            conn.query(&format!(
                "UNWIND [{list}] AS r \
                 CREATE (:{nt} {{\
                    id: r.id, kind: r.kind, name: r.name, \
                    qualified_name: r.qualified_name, file: r.file, \
                    start_line: r.start_line, end_line: r.end_line, loc: r.loc, \
                    visibility: r.visibility, is_async: r.is_async, is_unsafe: r.is_unsafe, \
                    is_static: r.is_static, is_abstract: r.is_abstract, is_final: r.is_final, \
                    is_property: r.is_property, is_generator: r.is_generator, is_const: r.is_const, \
                    generic_bounds: r.generic_bounds, \
                    def_signature: r.def_signature, def_body: r.def_body, def_doc: r.def_doc, \
                    def_start_byte: r.def_start_byte, def_end_byte: r.def_end_byte\
                 }})"
            ))
            .map_err(|e| GitCortexError::Store(format!("batch insert nodes: {e}")))?;
        }

        // Commit node inserts so the edge MATCH queries in transaction 3 see them.
        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit nodes: {e}")))?;

        // Transaction 3: insert edges and resolve deferred references.
        conn.query("BEGIN TRANSACTION")
            .map_err(|e| GitCortexError::Store(format!("begin edge transaction: {e}")))?;

        // 4. Insert new edges. Deduplicate by (src,dst,kind) to avoid creating
        //    parallel edges. Remap folder IDs to existing DB nodes where applicable.
        //    MATCH yields nothing for missing endpoints → skip silently.
        //    NOTE(review): deduplication uses the ids as they appear in the diff,
        //    i.e. before the folder remap below is applied.
        let mut seen_edges: HashSet<(String, String, String)> = HashSet::new();
        let edge_rows: Vec<String> = diff
            .added_edges
            .iter()
            .filter(|e| {
                seen_edges.insert((
                    e.src.as_str().to_owned(),
                    e.dst.as_str().to_owned(),
                    e.kind.to_string(),
                ))
            })
            .map(|edge| {
                let src_raw = edge.src.as_str();
                let dst_raw = edge.dst.as_str();
                let s = esc(id_remap
                    .get(&src_raw)
                    .map(String::as_str)
                    .unwrap_or(&src_raw));
                let d = esc(id_remap
                    .get(&dst_raw)
                    .map(String::as_str)
                    .unwrap_or(&dst_raw));
                let k = esc(&edge.kind.to_string());
                format!("{{s:'{s}', d:'{d}', k:'{k}'}}")
            })
            .collect();

        // Batch edge inserts via `UNWIND … MATCH … CREATE`. Edge rows are tiny
        // (two ids and a kind), so a larger chunk than nodes is fine. Endpoints
        // missing from the store yield no MATCH row and are skipped silently —
        // same semantics as the per-edge version.
        for chunk in edge_rows.chunks(EDGE_INSERT_CHUNK) {
            let list = chunk.join(", ");
            conn.query(&format!(
                "UNWIND [{list}] AS r \
                 MATCH (s:{nt} {{id: r.s}}), (d:{nt} {{id: r.d}}) \
                 CREATE (s)-[:{et} {{kind: r.k}}]->(d)"
            ))
            .map_err(|e| GitCortexError::Store(format!("batch insert edges: {e}")))?;
        }

        // 5. Resolve cross-file deferred edges against the full store.
        //    The diff-local pass couldn't find these callees/types because they
        //    live in unchanged files. We match by name here — best-effort without
        //    full type inference, filtered to the correct node kinds to reduce noise.
        //
        //    Scoping: the resolved candidate must live in a file whose extension
        //    belongs to the same language family as the caller. Without this a
        //    Rust `welcome()` would resolve to a Python `polite_greet()` simply
        //    because they share a name. The caller-file map is built once from
        //    `added_nodes` (all deferred caller ids are diff-local nodes).
        //
        //    Each loop below creates one edge per matching candidate; when the
        //    source id is not in `caller_file`, no language scope is appended.
        let caller_file: HashMap<String, String> = diff
            .added_nodes
            .iter()
            .map(|n| {
                (
                    n.id.as_str().to_owned(),
                    n.file.to_string_lossy().into_owned(),
                )
            })
            .collect();

        // `calls`: target must be a function or method.
        for (caller_id, callee_name) in &diff.deferred_calls {
            let caller_id_str = caller_id.as_str();
            let caller = esc(&caller_id_str);
            let callee = esc(callee_name);
            let scope = caller_file
                .get(&caller_id_str)
                .map(|f| lang_scope_clause(f, "callee"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (caller:{nt} {{id: '{caller}'}}), (callee:{nt}) \
                 WHERE callee.name = '{callee}' \
                 AND (callee.kind = 'function' OR callee.kind = 'method'){scope} \
                 CREATE (caller)-[:{et} {{kind: 'calls'}}]->(callee)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred call '{callee_name}': {e}")))?;
        }

        // `uses`: target must be a struct, enum, trait or type alias.
        for (fn_id, type_name) in &diff.deferred_uses {
            let fn_id_str = fn_id.as_str();
            let fn_esc = esc(&fn_id_str);
            let ty = esc(type_name);
            let scope = caller_file
                .get(&fn_id_str)
                .map(|f| lang_scope_clause(f, "ty"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (fn_node:{nt} {{id: '{fn_esc}'}}), (ty:{nt}) \
                 WHERE ty.name = '{ty}' \
                 AND (ty.kind = 'struct' OR ty.kind = 'enum' \
                      OR ty.kind = 'trait' OR ty.kind = 'type_alias'){scope} \
                 CREATE (fn_node)-[:{et} {{kind: 'uses'}}]->(ty)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred use '{type_name}': {e}")))?;
        }

        // `implements`: target must be a trait or interface.
        for (struct_id, trait_name) in &diff.deferred_implements {
            let sid = struct_id.as_str();
            let s = esc(&sid);
            let t = esc(trait_name);
            let scope = caller_file
                .get(&sid)
                .map(|f| lang_scope_clause(f, "tr"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (st:{nt} {{id: '{s}'}}), (tr:{nt}) \
                 WHERE tr.name = '{t}' AND (tr.kind = 'trait' OR tr.kind = 'interface'){scope} \
                 CREATE (st)-[:{et} {{kind: 'implements'}}]->(tr)"
            ))
            .map_err(|e| GitCortexError::Store(format!("deferred impl '{trait_name}': {e}")))?;
        }

        // `inherits`: target must be a struct, interface or trait.
        for (subtype_id, supertype_name) in &diff.deferred_inherits {
            let sid = subtype_id.as_str();
            let s = esc(&sid);
            let t = esc(supertype_name);
            let scope = caller_file
                .get(&sid)
                .map(|f| lang_scope_clause(f, "sup"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (sub:{nt} {{id: '{s}'}}), (sup:{nt}) \
                 WHERE sup.name = '{t}' \
                 AND (sup.kind = 'struct' OR sup.kind = 'interface' OR sup.kind = 'trait'){scope} \
                 CREATE (sub)-[:{et} {{kind: 'inherits'}}]->(sup)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred inherits '{supertype_name}': {e}"))
            })?;
        }

        // `throws`: matched by name (plus language scope) only — no kind filter.
        for (method_id, exception_name) in &diff.deferred_throws {
            let mid = method_id.as_str();
            let m = esc(&mid);
            let e_name = esc(exception_name);
            let scope = caller_file
                .get(&mid)
                .map(|f| lang_scope_clause(f, "ex"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (m:{nt} {{id: '{m}'}}), (ex:{nt}) \
                 WHERE ex.name = '{e_name}'{scope} \
                 CREATE (m)-[:{et} {{kind: 'throws'}}]->(ex)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred throws '{exception_name}': {e}"))
            })?;
        }

        // `annotated`: target must be an annotation, macro or function.
        for (target_id, annotation_name) in &diff.deferred_annotated {
            let tid = target_id.as_str();
            let t = esc(&tid);
            let a = esc(annotation_name);
            let scope = caller_file
                .get(&tid)
                .map(|f| lang_scope_clause(f, "ann"))
                .unwrap_or_default();
            conn.query(&format!(
                "MATCH (target:{nt} {{id: '{t}'}}), (ann:{nt}) \
                 WHERE ann.name = '{a}' \
                 AND (ann.kind = 'annotation' OR ann.kind = 'macro' OR ann.kind = 'function'){scope} \
                 CREATE (target)-[:{et} {{kind: 'annotated'}}]->(ann)"
            ))
            .map_err(|e| {
                GitCortexError::Store(format!("deferred annotated '{annotation_name}': {e}"))
            })?;
        }

        conn.query("COMMIT")
            .map_err(|e| GitCortexError::Store(format!("commit edges: {e}")))?;

        Ok(())
    }
502
503    // ── Read path ─────────────────────────────────────────────────────────────
504
505    fn lookup_symbol(&self, branch: &str, name: &str, fuzzy: bool) -> Result<Vec<Node>> {
506        self.ensure_branch(branch)?;
507        let nt = db_schema::node_table(branch);
508        let name_esc = esc(name);
509        let conn = self.conn()?;
510
511        let condition = if fuzzy {
512            format!("contains(n.name, '{name_esc}')")
513        } else {
514            format!("n.name = '{name_esc}'")
515        };
516
517        let mut result = conn
518            .query(&format!(
519                "MATCH (n:{nt}) WHERE {condition} RETURN {NODE_COLS} ORDER BY {SYMBOL_RANK}"
520            ))
521            .map_err(|e| GitCortexError::Store(e.to_string()))?;
522
523        rows_to_nodes(&mut result)
524    }
525
526    fn find_callers(&self, branch: &str, function_name: &str) -> Result<Vec<Node>> {
527        self.ensure_branch(branch)?;
528        let nt = db_schema::node_table(branch);
529        let et = db_schema::edge_table(branch);
530        let name_esc = esc(function_name);
531        let conn = self.conn()?;
532
533        let mut result = conn
534            .query(&format!(
535                "MATCH (n:{nt})-[:{et} {{kind: 'calls'}}]->(callee:{nt}) \
536                 WHERE callee.name = '{name_esc}' \
537                 RETURN DISTINCT {NODE_COLS}"
538            ))
539            .map_err(|e| GitCortexError::Store(e.to_string()))?;
540
541        rows_to_nodes(&mut result)
542    }
543
544    fn find_callers_deep(
545        &self,
546        branch: &str,
547        function_name: &str,
548        depth: u8,
549    ) -> Result<CallersDeep> {
550        let depth = depth.min(5);
551        let mut hops: Vec<Vec<Node>> = Vec::new();
552        // Track seen node IDs to avoid cycles.
553        let mut seen: HashSet<String> = HashSet::new();
554        // The frontier holds the *names* of nodes whose callers we search next.
555        let mut frontier: Vec<String> = vec![function_name.to_owned()];
556        seen.insert(function_name.to_owned());
557
558        for _ in 0..depth {
559            if frontier.is_empty() {
560                break;
561            }
562            let mut hop_nodes: Vec<Node> = Vec::new();
563            let mut next_frontier: Vec<String> = Vec::new();
564            for target in &frontier {
565                for caller in self.find_callers(branch, target)? {
566                    let id = caller.id.as_str().to_owned();
567                    if seen.insert(id) {
568                        next_frontier.push(caller.name.clone());
569                        hop_nodes.push(caller);
570                    }
571                }
572            }
573            hops.push(hop_nodes);
574            frontier = next_frontier;
575        }
576
577        let total_affected: usize = hops.iter().map(|h| h.len()).sum();
578        let risk_level = match total_affected {
579            0..=2 => "LOW",
580            3..=10 => "MEDIUM",
581            11..=30 => "HIGH",
582            _ => "CRITICAL",
583        };
584
585        Ok(CallersDeep { hops, risk_level })
586    }
587
588    fn symbol_context(&self, branch: &str, name: &str) -> Result<SymbolContext> {
589        self.ensure_branch(branch)?;
590        let nt = db_schema::node_table(branch);
591        let et = db_schema::edge_table(branch);
592        let name_esc = esc(name);
593        let conn = self.conn()?;
594
595        // Definition — best match by kind priority (type decl > fn/method >
596        // … > module/file), so `wiki Echo` resolves to `type Echo` not a
597        // same-named method.
598        let mut def_result = conn
599            .query(&format!(
600                "MATCH (n:{nt}) WHERE n.name = '{name_esc}' \
601                 RETURN {NODE_COLS} ORDER BY {SYMBOL_RANK} LIMIT 1"
602            ))
603            .map_err(|e| GitCortexError::Store(e.to_string()))?;
604        let mut defs = rows_to_nodes(&mut def_result)?;
605        if defs.is_empty() {
606            return Err(GitCortexError::Store(format!(
607                "symbol '{name}' not found on branch '{branch}'"
608            )));
609        }
610        let definition = defs.remove(0);
611
612        // Scope callers/callees/used-by to THIS specific definition (by id),
613        // not by name. Otherwise a Java `welcome` would pull in callees from
614        // a Python `welcome` that happens to share the name. `find_callers`
615        // as a standalone tool remains name-based — callers without a specific
616        // definition node have no other handle.
617        let def_id = esc(&definition.id.as_str());
618
619        let mut caller_result = conn
620            .query(&format!(
621                "MATCH (n:{nt})-[:{et} {{kind: 'calls'}}]->(callee:{nt}) \
622                 WHERE callee.id = '{def_id}' \
623                 RETURN DISTINCT {NODE_COLS}"
624            ))
625            .map_err(|e| GitCortexError::Store(e.to_string()))?;
626        let callers = rows_to_nodes(&mut caller_result)?;
627
628        let mut callee_result = conn
629            .query(&format!(
630                "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
631                 WHERE caller.id = '{def_id}' \
632                 RETURN {NODE_COLS}"
633            ))
634            .map_err(|e| GitCortexError::Store(e.to_string()))?;
635        let callees = rows_to_nodes(&mut callee_result)?;
636
637        let mut used_result = conn
638            .query(&format!(
639                "MATCH (n:{nt})-[:{et} {{kind: 'uses'}}]->(ty:{nt}) \
640                 WHERE ty.id = '{def_id}' \
641                 RETURN {NODE_COLS}"
642            ))
643            .map_err(|e| GitCortexError::Store(e.to_string()))?;
644        let used_by = rows_to_nodes(&mut used_result)?;
645
646        Ok(SymbolContext {
647            definition,
648            callers,
649            callees,
650            used_by,
651        })
652    }
653
654    fn list_definitions(&self, branch: &str, file: &Path) -> Result<Vec<Node>> {
655        self.ensure_branch(branch)?;
656        let nt = db_schema::node_table(branch);
657        let file_esc = esc(file.to_string_lossy().as_ref());
658        let conn = self.conn()?;
659
660        let mut result = conn
661            .query(&format!(
662                "MATCH (n:{nt}) WHERE n.file = '{file_esc}' \
663                 RETURN {NODE_COLS} ORDER BY n.start_line"
664            ))
665            .map_err(|e| GitCortexError::Store(e.to_string()))?;
666
667        rows_to_nodes(&mut result)
668    }
669
670    fn branch_diff(&self, from: &str, to: &str) -> Result<GraphDiff> {
671        self.ensure_branch(from)?;
672        self.ensure_branch(to)?;
673
674        let from_nt = db_schema::node_table(from);
675        let to_nt = db_schema::node_table(to);
676        let mut conn = self.conn()?;
677
678        // Collect node IDs from each branch.
679        let from_ids = collect_ids(&mut conn, &from_nt)?;
680        let to_ids = collect_ids(&mut conn, &to_nt)?;
681
682        // Nodes in `to` but not in `from` → added.
683        let added_ids: Vec<&String> = to_ids.iter().filter(|id| !from_ids.contains(*id)).collect();
684
685        // Nodes in `from` but not in `to` → removed.
686        let removed_ids: Vec<&String> =
687            from_ids.iter().filter(|id| !to_ids.contains(*id)).collect();
688
689        let mut diff = GraphDiff::default();
690
691        for id in added_ids {
692            let id_esc = esc(id);
693            let mut r = conn
694                .query(&format!(
695                    "MATCH (n:{to_nt}) WHERE n.id = '{id_esc}' RETURN {NODE_COLS}"
696                ))
697                .map_err(|e| GitCortexError::Store(e.to_string()))?;
698            diff.added_nodes.extend(rows_to_nodes(&mut r)?);
699        }
700
701        for id in removed_ids {
702            if let Ok(node_id) = NodeId::try_from(id.as_str()) {
703                diff.removed_node_ids.push(node_id);
704            }
705        }
706
707        Ok(diff)
708    }
709
710    fn list_all_nodes(&self, branch: &str) -> Result<Vec<Node>> {
711        self.ensure_branch(branch)?;
712        let nt = db_schema::node_table(branch);
713        let conn = self.conn()?;
714        let mut result = conn
715            .query(&format!("MATCH (n:{nt}) RETURN {NODE_COLS}"))
716            .map_err(|e| GitCortexError::Store(e.to_string()))?;
717        rows_to_nodes(&mut result)
718    }
719
720    fn list_all_edges(&self, branch: &str) -> Result<Vec<Edge>> {
721        self.ensure_branch(branch)?;
722        let nt = db_schema::node_table(branch);
723        let et = db_schema::edge_table(branch);
724        let conn = self.conn()?;
725        let result = conn
726            .query(&format!(
727                "MATCH (s:{nt})-[e:{et}]->(d:{nt}) RETURN s.id, d.id, e.kind"
728            ))
729            .map_err(|e| GitCortexError::Store(e.to_string()))?;
730
731        let mut out = Vec::new();
732        for row in result {
733            let src_str = str_val(&row[0])?;
734            let dst_str = str_val(&row[1])?;
735            let kind_str = str_val(&row[2])?;
736            out.push(Edge {
737                src: NodeId::try_from(src_str.as_str())
738                    .map_err(|e| GitCortexError::Store(format!("bad src id: {e}")))?,
739                dst: NodeId::try_from(dst_str.as_str())
740                    .map_err(|e| GitCortexError::Store(format!("bad dst id: {e}")))?,
741                kind: edge_kind_from_str(&kind_str),
742            });
743        }
744        Ok(out)
745    }
746
747    fn find_callees(&self, branch: &str, function_name: &str, depth: u8) -> Result<CallersDeep> {
748        let depth = depth.min(5);
749        let mut hops: Vec<Vec<Node>> = Vec::new();
750        let mut seen: HashSet<String> = HashSet::new();
751        let mut frontier: Vec<String> = vec![function_name.to_owned()];
752        seen.insert(function_name.to_owned());
753
754        for _ in 0..depth {
755            if frontier.is_empty() {
756                break;
757            }
758            let mut hop_nodes: Vec<Node> = Vec::new();
759            let mut next_frontier: Vec<String> = Vec::new();
760            for caller_name in &frontier {
761                let nt = db_schema::node_table(branch);
762                let et = db_schema::edge_table(branch);
763                let name_esc = esc(caller_name);
764                let conn = self.conn()?;
765                let mut result = conn
766                    .query(&format!(
767                        "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
768                         WHERE caller.name = '{name_esc}' \
769                         RETURN {NODE_COLS}"
770                    ))
771                    .map_err(|e| GitCortexError::Store(e.to_string()))?;
772                for node in rows_to_nodes(&mut result)? {
773                    let id = node.id.as_str().to_owned();
774                    if seen.insert(id) {
775                        next_frontier.push(node.name.clone());
776                        hop_nodes.push(node);
777                    }
778                }
779            }
780            hops.push(hop_nodes);
781            frontier = next_frontier;
782        }
783
784        let total: usize = hops.iter().map(|h| h.len()).sum();
785        let risk_level = match total {
786            0..=2 => "LOW",
787            3..=10 => "MEDIUM",
788            11..=30 => "HIGH",
789            _ => "CRITICAL",
790        };
791        Ok(CallersDeep { hops, risk_level })
792    }
793
794    fn find_implementors(&self, branch: &str, trait_or_interface_name: &str) -> Result<Vec<Node>> {
795        self.ensure_branch(branch)?;
796        let nt = db_schema::node_table(branch);
797        let et = db_schema::edge_table(branch);
798        let name_esc = esc(trait_or_interface_name);
799        let conn = self.conn()?;
800        let mut result = conn
801            .query(&format!(
802                "MATCH (n:{nt})-[e:{et}]->(trait_node:{nt}) \
803                 WHERE trait_node.name = '{name_esc}' \
804                 AND (e.kind = 'implements' OR e.kind = 'inherits') \
805                 RETURN DISTINCT {NODE_COLS} ORDER BY {SYMBOL_RANK}"
806            ))
807            .map_err(|e| GitCortexError::Store(e.to_string()))?;
808        rows_to_nodes(&mut result)
809    }
810
811    fn trace_path(&self, branch: &str, from: &str, to: &str) -> Result<Vec<Node>> {
812        self.ensure_branch(branch)?;
813        let nt = db_schema::node_table(branch);
814        let et = db_schema::edge_table(branch);
815
816        // BFS from `from` to `to` following Calls edges.
817        let from_esc = esc(from);
818        let conn = self.conn()?;
819        let mut start_result = conn
820            .query(&format!(
821                "MATCH (n:{nt}) WHERE n.name = '{from_esc}' RETURN {NODE_COLS} LIMIT 1"
822            ))
823            .map_err(|e| GitCortexError::Store(e.to_string()))?;
824        let start_nodes = rows_to_nodes(&mut start_result)?;
825        if start_nodes.is_empty() {
826            return Ok(Vec::new());
827        }
828
829        // BFS: queue of (current_name, path_so_far)
830        let mut queue: std::collections::VecDeque<(String, Vec<String>)> =
831            std::collections::VecDeque::new();
832        queue.push_back((from.to_owned(), vec![from.to_owned()]));
833        let mut visited: HashSet<String> = HashSet::new();
834        visited.insert(from.to_owned());
835
836        const MAX_HOPS: usize = 6;
837        while let Some((current, path)) = queue.pop_front() {
838            if path.len() > MAX_HOPS {
839                continue;
840            }
841            let cur_esc = esc(&current);
842            let conn2 = self.conn()?;
843            let mut callee_result = conn2
844                .query(&format!(
845                    "MATCH (caller:{nt})-[:{et} {{kind: 'calls'}}]->(n:{nt}) \
846                     WHERE caller.name = '{cur_esc}' \
847                     RETURN {NODE_COLS}"
848                ))
849                .map_err(|e| GitCortexError::Store(e.to_string()))?;
850            for node in rows_to_nodes(&mut callee_result)? {
851                let node_name = node.name.clone();
852                if node_name == to {
853                    // Found — resolve full path names to nodes
854                    let mut result_nodes = Vec::new();
855                    for name in &path {
856                        let conn3 = self.conn()?;
857                        let n_esc = esc(name);
858                        let mut r = conn3
859                            .query(&format!(
860                                "MATCH (n:{nt}) WHERE n.name = '{n_esc}' RETURN {NODE_COLS} LIMIT 1"
861                            ))
862                            .map_err(|e| GitCortexError::Store(e.to_string()))?;
863                        result_nodes.extend(rows_to_nodes(&mut r)?);
864                    }
865                    result_nodes.push(node);
866                    return Ok(result_nodes);
867                }
868                if visited.insert(node_name.clone()) {
869                    let mut new_path = path.clone();
870                    new_path.push(node_name.clone());
871                    queue.push_back((node_name, new_path));
872                }
873            }
874        }
875        Ok(Vec::new())
876    }
877
878    fn list_symbols_in_range(
879        &self,
880        branch: &str,
881        file: &Path,
882        start_line: u32,
883        end_line: u32,
884    ) -> Result<Vec<Node>> {
885        self.ensure_branch(branch)?;
886        let nt = db_schema::node_table(branch);
887        let file_esc = esc(file.to_string_lossy().as_ref());
888        let conn = self.conn()?;
889
890        let mut result = conn
891            .query(&format!(
892                "MATCH (n:{nt}) \
893                 WHERE n.file = '{file_esc}' \
894                 AND n.start_line <= {end_line} \
895                 AND n.end_line >= {start_line} \
896                 RETURN {NODE_COLS} ORDER BY n.start_line"
897            ))
898            .map_err(|e| GitCortexError::Store(e.to_string()))?;
899
900        rows_to_nodes(&mut result)
901    }
902
903    fn find_unused_symbols(&self, branch: &str, kind: Option<NodeKind>) -> Result<Vec<Node>> {
904        self.ensure_branch(branch)?;
905        let nt = db_schema::node_table(branch);
906        let et = db_schema::edge_table(branch);
907        let conn = self.conn()?;
908
909        let kind_filter = match &kind {
910            Some(k) => format!("AND n.kind = '{k}'"),
911            None => String::new(),
912        };
913
914        let mut result = conn
915            .query(&format!(
916                "MATCH (n:{nt}) \
917                 WHERE NOT EXISTS {{ MATCH (:{nt})-[:{et} {{kind: 'calls'}}]->(n) }} \
918                 AND NOT EXISTS {{ MATCH (:{nt})-[:{et} {{kind: 'uses'}}]->(n) }} \
919                 AND n.kind <> 'file' AND n.kind <> 'folder' AND n.kind <> 'module' \
920                 {kind_filter} \
921                 RETURN {NODE_COLS} ORDER BY n.file, n.start_line"
922            ))
923            .map_err(|e| GitCortexError::Store(e.to_string()))?;
924
925        rows_to_nodes(&mut result)
926    }
927
928    fn get_subgraph(
929        &self,
930        branch: &str,
931        seed_name: &str,
932        depth: u8,
933        direction: &str,
934    ) -> Result<SubGraph> {
935        self.ensure_branch(branch)?;
936        let depth = depth.min(5);
937        let nt = db_schema::node_table(branch);
938        let et = db_schema::edge_table(branch);
939
940        let seed_esc = esc(seed_name);
941        let conn = self.conn()?;
942        let mut seed_result = conn
943            .query(&format!(
944                "MATCH (n:{nt}) WHERE n.name = '{seed_esc}' RETURN {NODE_COLS} LIMIT 1"
945            ))
946            .map_err(|e| GitCortexError::Store(e.to_string()))?;
947        let seed_nodes = rows_to_nodes(&mut seed_result)?;
948        if seed_nodes.is_empty() {
949            return Ok(SubGraph {
950                nodes: Vec::new(),
951                edges: Vec::new(),
952            });
953        }
954
955        let mut all_node_ids: HashSet<String> = HashSet::new();
956        let mut all_nodes: Vec<Node> = Vec::new();
957        let mut frontier_names: Vec<String> = vec![seed_name.to_owned()];
958
959        for node in seed_nodes {
960            all_node_ids.insert(node.id.as_str().to_owned());
961            all_nodes.push(node);
962        }
963
964        for _ in 0..depth {
965            let mut next_frontier: Vec<String> = Vec::new();
966            for name in &frontier_names {
967                let name_esc = esc(name);
968                // Outbound (callees): what this node calls
969                if direction == "out" || direction == "both" {
970                    let conn2 = self.conn()?;
971                    let mut r = conn2
972                        .query(&format!(
973                            "MATCH (caller:{nt})-[:{et}]->(n:{nt}) \
974                             WHERE caller.name = '{name_esc}' \
975                             RETURN {NODE_COLS}"
976                        ))
977                        .map_err(|e| GitCortexError::Store(e.to_string()))?;
978                    for node in rows_to_nodes(&mut r)? {
979                        let id = node.id.as_str().to_owned();
980                        if all_node_ids.insert(id) {
981                            next_frontier.push(node.name.clone());
982                            all_nodes.push(node);
983                        }
984                    }
985                }
986                // Inbound (callers): what calls this node
987                if direction == "in" || direction == "both" {
988                    let conn3 = self.conn()?;
989                    let mut r = conn3
990                        .query(&format!(
991                            "MATCH (n:{nt})-[:{et}]->(target:{nt}) \
992                             WHERE target.name = '{name_esc}' \
993                             RETURN {NODE_COLS}"
994                        ))
995                        .map_err(|e| GitCortexError::Store(e.to_string()))?;
996                    for node in rows_to_nodes(&mut r)? {
997                        let id = node.id.as_str().to_owned();
998                        if all_node_ids.insert(id) {
999                            next_frontier.push(node.name.clone());
1000                            all_nodes.push(node);
1001                        }
1002                    }
1003                }
1004            }
1005            if next_frontier.is_empty() {
1006                break;
1007            }
1008            frontier_names = next_frontier;
1009        }
1010
1011        // Collect edges between the nodes in the subgraph
1012        let ids_list: Vec<String> = all_node_ids
1013            .iter()
1014            .map(|id| format!("'{}'", esc(id)))
1015            .collect();
1016        let ids_str = ids_list.join(", ");
1017        let all_edges = if ids_list.is_empty() {
1018            Vec::new()
1019        } else {
1020            let conn4 = self.conn()?;
1021            let result = conn4
1022                .query(&format!(
1023                    "MATCH (s:{nt})-[e:{et}]->(d:{nt}) \
1024                     WHERE s.id IN [{ids_str}] AND d.id IN [{ids_str}] \
1025                     RETURN s.id, d.id, e.kind"
1026                ))
1027                .map_err(|e| GitCortexError::Store(e.to_string()))?;
1028            let mut edges = Vec::new();
1029            for row in result {
1030                let src_str = str_val(&row[0])?;
1031                let dst_str = str_val(&row[1])?;
1032                let kind_str = str_val(&row[2])?;
1033                edges.push(Edge {
1034                    src: NodeId::try_from(src_str.as_str())
1035                        .map_err(|e| GitCortexError::Store(format!("bad src id: {e}")))?,
1036                    dst: NodeId::try_from(dst_str.as_str())
1037                        .map_err(|e| GitCortexError::Store(format!("bad dst id: {e}")))?,
1038                    kind: edge_kind_from_str(&kind_str),
1039                });
1040            }
1041            edges
1042        };
1043
1044        Ok(SubGraph {
1045            nodes: all_nodes,
1046            edges: all_edges,
1047        })
1048    }
1049
1050    // ── Indexing state ────────────────────────────────────────────────────────
1051
1052    fn last_indexed_sha(&self, branch_name: &str) -> Result<Option<String>> {
1053        branch::read_last_sha(&self.repo_id, branch_name)
1054    }
1055
1056    fn set_last_indexed_sha(&mut self, branch_name: &str, sha: &str) -> Result<()> {
1057        branch::write_last_sha(&self.repo_id, branch_name, sha)
1058    }
1059}