ckg-storage 1.3.1

CozoDB-backed storage layer for ckg (per-repo + registry DBs).
Documentation
//! Bulk insert operations: `put_symbols` and `put_edges`.

use std::collections::BTreeMap;

use ckg_core::{Edge, Result, Symbol};
use cozo::{DataValue, ScriptMutability};

use crate::cozo_compat::CozoErrorKind;
use super::map_err;
use super::Storage;

/// Encode one `Symbol` as a positional Cozo row (`DataValue::List`).
///
/// Column order matches the `?[...]` head used by `put_symbols`:
/// `id, qname, name, kind, file, line, col, is_public, doc, hash`.
/// `line` and `col` are widened with `as i64` because Cozo integers are `i64`.
pub(super) fn symbol_to_row(s: &Symbol) -> DataValue {
    // Every textual column goes through the same `&str` conversion.
    let text = |v: &str| DataValue::from(v);
    let line = s.line as i64;
    let col = s.col as i64;

    let mut row = Vec::with_capacity(10);
    row.push(text(s.id.as_str()));
    row.push(text(s.qname.as_str()));
    row.push(text(s.name.as_str()));
    row.push(text(s.kind.as_str()));
    row.push(text(s.file.as_str()));
    row.push(DataValue::from(line));
    row.push(DataValue::from(col));
    row.push(DataValue::Bool(s.is_public));
    row.push(text(s.doc.as_str()));
    row.push(text(s.hash.as_str()));
    DataValue::List(row)
}

/// Encode one `Edge` as a positional Cozo row: `[src, dst]`, or
/// `[src, dst, confidence]` when `with_conf` is set.
///
/// The caller chooses `with_conf` from the target relation. The relations
/// that carry confidence are `Calls`, `Imports`, `Extends`, `Implements` and
/// `Awaits`. `confidence` is widened with `as f64` for Cozo's float type.
pub(super) fn edge_to_row(e: &Edge, with_conf: bool) -> DataValue {
    // The (src, dst) key columns are common to every edge relation.
    let mut columns = vec![
        DataValue::from(e.src.as_str()),
        DataValue::from(e.dst.as_str()),
    ];
    if with_conf {
        columns.push(DataValue::from(e.confidence as f64));
    }
    DataValue::List(columns)
}

/// Maximum number of rows bound into one `$rows` / `$ids` script parameter.
/// Larger inputs are split into batches of this size, one `run_script` call
/// per batch.
const BATCH_SIZE: usize = 1000;

/// Edge relations whose rows carry a `confidence` value column after the
/// `(src, dst)` key. `put_edges` uses this to pick the `:put` schema and the
/// `with_conf` flag it passes to `edge_to_row`.
const CONFIDENCE_RELS: &[&str] = &["Calls", "Imports", "Extends", "Implements", "Awaits"];

/// Every edge relation swept by `gc_symbols_not_in`. All of them are keyed by
/// `(src, dst)`, so one `:rm` shape works whatever their value columns are.
const EDGE_RELS: &[&str] = &[
    "Calls", "Imports", "Extends", "Implements", "Defines", "Documents", "Tests", "Awaits",
];

impl Storage {
    /// Bulk insert symbols into the `Symbol` relation (keyed by `id`).
    ///
    /// Rows are bound through the `$rows` parameter instead of being pasted
    /// into the script text. User data (docs, names, paths) therefore never
    /// needs quote-escaping.
    ///
    /// # Errors
    /// Returns the first Cozo error, mapped through `map_err`. Each batch of
    /// `BATCH_SIZE` rows is a separate `run_script` call, so batches written
    /// before the failure remain written.
    pub fn put_symbols(&self, symbols: &[Symbol]) -> Result<()> {
        const SCRIPT: &str = "
?[id, qname, name, kind, file, line, col, is_public, doc, hash] <- $rows
:put Symbol {id => qname, name, kind, file, line, col, is_public, doc, hash}
";
        for chunk in symbols.chunks(BATCH_SIZE) {
            let rows: Vec<DataValue> = chunk.iter().map(symbol_to_row).collect();
            let mut params = BTreeMap::new();
            params.insert("rows".into(), DataValue::List(rows));
            self.db
                .run_script(SCRIPT, params, ScriptMutability::Mutable)
                .map_err(map_err)?;
        }
        Ok(())
    }

    /// Index GC: delete every symbol whose `file` is NOT in `live_files`,
    /// together with every edge whose `src` or `dst` is one of those symbols.
    ///
    /// Use case: during `ckg index`, the indexer walks the working tree and
    /// produces a fresh symbol set for the files it sees. Files that were
    /// indexed previously but have since been renamed or deleted leave
    /// phantom Symbol rows. Those rows pollute `dead-code`, `orphan-calls`, and
    /// blast-radius results. Without this sweep, removing a source file
    /// silently leaves its symbols behind forever.
    ///
    /// `live_files` is the set of file paths the current index pass
    /// produced symbols for. Any Symbol row whose `file` is not in this set is
    /// treated as phantom and dropped. An empty set therefore deletes every
    /// symbol. Rows whose `id` or `file` column is not a string are ignored.
    ///
    /// Cozo does not cascade deletes, so each relation in `EDGE_RELS` is swept
    /// explicitly for matching `src` and `dst`.
    ///
    /// Returns the number of phantom Symbol rows found and deleted.
    ///
    /// # Errors
    /// Propagates every Cozo error except `RelationMissing` on an edge
    /// relation. That one is tolerated so databases created by an older schema
    /// without that relation can still be swept. The sweep is several
    /// independent scripts and is not atomic. Edges are removed before their
    /// symbols, so a failed sweep can be completed by simply re-running it.
    pub fn gc_symbols_not_in(
        &self,
        live_files: &std::collections::HashSet<String>,
    ) -> Result<usize> {
        // 1. Find phantom symbol ids. Membership is tested on borrowed
        //    strings; only the ids that will be deleted are cloned.
        let rows = self
            .db
            .run_script(
                "?[id, file] := *Symbol{id, file}",
                BTreeMap::new(),
                ScriptMutability::Immutable,
            )
            .map_err(map_err)?;
        let phantom_ids: Vec<DataValue> = rows
            .rows
            .into_iter()
            .filter_map(|row| match row.as_slice() {
                [id @ DataValue::Str(_), DataValue::Str(file), ..]
                    if !live_files.contains(file.as_str()) =>
                {
                    Some(id.clone())
                }
                _ => None,
            })
            .collect();
        if phantom_ids.is_empty() {
            return Ok(0);
        }
        let count = phantom_ids.len();

        // 2. The edge delete scripts depend only on the relation name (ids are
        //    bound through `$ids`), so build them once. `src` and `dst` are
        //    separate passes to keep each rule simple. `:rm` matches on the
        //    `(src, dst)` key, so relations with a confidence column need no
        //    special shape.
        let edge_scripts: Vec<String> = EDGE_RELS
            .iter()
            .flat_map(|rel| {
                [
                    format!(
                        "?[src, dst] := *{rel}{{src, dst}}, src in $ids\n:rm {rel} {{src, dst}}\n"
                    ),
                    format!(
                        "?[src, dst] := *{rel}{{src, dst}}, dst in $ids\n:rm {rel} {{src, dst}}\n"
                    ),
                ]
            })
            .collect();

        for chunk in phantom_ids.chunks(BATCH_SIZE) {
            let mut params = BTreeMap::new();
            params.insert("ids".into(), DataValue::List(chunk.to_vec()));

            // Edges first. If any of these scripts fails, the phantom Symbol
            // rows still exist, and the next GC run rediscovers them and
            // finishes reaping their edges. Deleting the symbols first would
            // orphan those edges permanently.
            for script in &edge_scripts {
                if let Err(e) = self
                    .db
                    .run_script(script, params.clone(), ScriptMutability::Mutable)
                {
                    let mapped = map_err(e);
                    // Only a missing relation (older schema) is tolerated. I/O,
                    // corruption, lock contention etc. must surface so they
                    // don't silently leave dangling edges in the graph.
                    if !matches!(CozoErrorKind::of(&mapped), CozoErrorKind::RelationMissing) {
                        return Err(mapped);
                    }
                }
            }

            // Symbol rows last; `:rm` needs only the `id` key column.
            self.db
                .run_script(
                    "?[id] := id in $ids\n:rm Symbol {id}",
                    params,
                    ScriptMutability::Mutable,
                )
                .map_err(map_err)?;
        }
        tracing::info!(deleted = count, "index GC: reaped phantom symbols");
        Ok(count)
    }

    /// Bulk insert edges, routing each one to the Cozo relation named by
    /// `EdgeKind::as_relation()`.
    ///
    /// Edges are grouped per relation so that each relation gets a single
    /// script shape:
    /// - `{src, dst => confidence}` for relations listed in `CONFIDENCE_RELS`;
    /// - `{src, dst}` for all others.
    ///
    /// The grouping uses a `BTreeMap`, so relations are written in
    /// deterministic (name) order. The number of edge kinds is small, so the
    /// map's overhead is negligible compared with the writes.
    ///
    /// # Errors
    /// Returns the first Cozo error, mapped through `map_err`. Relations and
    /// batches written before the failure remain written.
    pub fn put_edges(&self, edges: &[Edge]) -> Result<()> {
        let mut by_rel: BTreeMap<&'static str, Vec<&Edge>> = BTreeMap::new();
        for e in edges {
            by_rel.entry(e.kind.as_relation()).or_default().push(e);
        }
        for (rel, list) in by_rel {
            let with_conf = CONFIDENCE_RELS.contains(&rel);
            let script = if with_conf {
                format!("?[src, dst, confidence] <- $rows\n:put {rel} {{src, dst => confidence}}\n")
            } else {
                format!("?[src, dst] <- $rows\n:put {rel} {{src, dst}}\n")
            };
            for chunk in list.chunks(BATCH_SIZE) {
                let rows: Vec<DataValue> =
                    chunk.iter().map(|e| edge_to_row(e, with_conf)).collect();
                let mut params = BTreeMap::new();
                params.insert("rows".into(), DataValue::List(rows));
                self.db
                    .run_script(&script, params, ScriptMutability::Mutable)
                    .map_err(map_err)?;
            }
        }
        Ok(())
    }
}