ckg-storage 1.1.2

CozoDB-backed storage layer for ckg (per-repo + registry DBs).
Documentation
//! Embedding storage and HNSW nearest-neighbour search.
//!
//! `put_embeddings`: bulk-insert `<F32; 768>` vectors in chunks, then create
//! the HNSW index idempotently once the relation holds rows.
//!
//! `hnsw_search`: k-NN query returning `(id, similarity)` pairs in descending
//! similarity order.
//!
//! `iter_embeddings_capped`: brute-force fallback for DBs without an HNSW index.

use std::collections::BTreeMap;

use ckg_core::Result;
use cozo::{DataValue, ScriptMutability};

use super::lifecycle::run_idempotent;
use super::map_err;
use super::Storage;

/// Dimensionality of every stored embedding vector.
///
/// Has to agree with `ckg_embed::EMBED_DIM`, with the `<F32; 768>` column
/// type of the `Embedding` relation, and with the `dim: 768` literal in the
/// HNSW index DDL that `put_embeddings` issues (idempotently) after inserting.
pub const STORAGE_EMBED_DIM: usize = 768;

/// M5: upper bound on the `k` that `hnsw_search` will honour.
///
/// Larger requests are clamped to this value, which bounds both the result
/// allocation and the `:limit` literal handed to Cozo. Exported so callers can
/// clamp `k` themselves and tell the user when a request was reduced.
pub const HNSW_K_MAX: usize = 10_000;

impl Storage {
    /// Bulk-insert symbol embeddings into the typed `<F32; 768>` `Embedding`
    /// relation, then make sure the HNSW index `Embedding:embed_idx` exists.
    ///
    /// Every vector is validated against [`STORAGE_EMBED_DIM`] before anything
    /// is written, so a bad row aborts the call with nothing written. Rows are
    /// then sent in chunks of 500 via `:put Embedding {id => vec}`; `id` is the
    /// key, so re-inserting an existing id overwrites its vector. Each chunk is
    /// its own `run_script` call: if a later chunk fails, earlier chunks are not
    /// rolled back by this function.
    ///
    /// The index is created only after rows exist because `::hnsw create`
    /// errors on an empty relation. Creation is attempted on every non-empty
    /// call; "already exists" errors are swallowed by `run_idempotent`.
    ///
    /// STORAGE-H5: `items` is taken by value so each `Vec<f32>` is *moved* into
    /// `Array1::from_vec` rather than cloned (768 floats per row). On a
    /// 1M-symbol monorepo this halves transient memory (~3 GB → ~1.5 GB).
    ///
    /// # Errors
    ///
    /// - a row whose vector length is not `STORAGE_EMBED_DIM` (the message
    ///   names the row index and id);
    /// - any Cozo error from the `:put` scripts or from index creation.
    pub fn put_embeddings(&self, items: Vec<(String, Vec<f32>)>) -> Result<()> {
        // Rows per `:put` script; bounds the size of each `$rows` parameter.
        const CHUNK: usize = 500;
        const SCRIPT: &str = "?[id, vec] <- $rows :put Embedding {id => vec}";
        // The HNSW DDL below hard-codes `dim: 768`; refuse to compile if the
        // constant ever drifts away from that literal.
        const _: () = assert!(
            STORAGE_EMBED_DIM == 768,
            "update the `dim:` literal in the HNSW DDL together with STORAGE_EMBED_DIM"
        );

        if items.is_empty() {
            return Ok(());
        }
        // CR-I-1: defense-in-depth dim check at the storage boundary. The typed
        // `<F32; 768>` column rejects bad-dim rows but the error is opaque
        // ("invalid vector"). Surface the bad row index here so a future caller
        // bypassing `ckg_embed::Embedder` gets an actionable failure.
        for (i, (id, v)) in items.iter().enumerate() {
            if v.len() != STORAGE_EMBED_DIM {
                return Err(map_err(format!(
                    "put_embeddings row {i} (id={id}) has dim {} (expected {STORAGE_EMBED_DIM})",
                    v.len()
                )));
            }
        }

        // Consume `items` chunk by chunk so each vector moves into its
        // `Array1` without a copy (the point of taking `items` by value).
        let mut remaining = items.into_iter();
        loop {
            let rows: Vec<DataValue> = remaining
                .by_ref()
                .take(CHUNK)
                .map(|(id, v)| {
                    DataValue::List(vec![
                        DataValue::from(id.as_str()),
                        DataValue::Vec(cozo::Vector::F32(ndarray::Array1::from_vec(v))),
                    ])
                })
                .collect();
            if rows.is_empty() {
                break;
            }
            let mut params = BTreeMap::new();
            params.insert("rows".into(), DataValue::List(rows));
            self.db
                .run_script(SCRIPT, params, ScriptMutability::Mutable)
                .map_err(map_err)?;
        }

        // Rows now exist, so `::hnsw create` can succeed; repeat calls are
        // harmless because run_idempotent ignores "already exists".
        run_idempotent(
            &self.db,
            "::hnsw create Embedding:embed_idx { \
             fields: [vec], dim: 768, m: 16, ef_construction: 200, distance: Cosine \
             }",
        )?;
        Ok(())
    }

    /// HNSW nearest-neighbour query over `Embedding:embed_idx`.
    ///
    /// Returns up to `k` `(id, similarity)` pairs ordered from most to least
    /// similar (the script orders by ascending cosine distance). Similarity is
    /// `1 − dist / 2`, clamped to `[0, 1]`. Assuming Cozo's cosine distance is
    /// `1 − cos θ` (TODO confirm against the Cozo manual), that equals
    /// `(1 + cos θ) / 2`. So 1.0 means identical direction and orthogonal
    /// vectors score 0.5.
    ///
    /// - An empty `query` returns `Ok(vec![])` without touching the DB.
    /// - `k` is clamped to [`HNSW_K_MAX`]; `k == 0` returns `Ok(vec![])` (after
    ///   the dimension check).
    /// - Result rows without a string id, or without a numeric, non-NaN
    ///   distance, are skipped rather than reported with a made-up score.
    ///
    /// The HNSW index must already exist; `put_embeddings` creates it once
    /// embeddings have been inserted.
    ///
    /// # Errors
    ///
    /// - `query` is non-empty but its length differs from `STORAGE_EMBED_DIM`;
    /// - any Cozo error from the search script.
    pub fn hnsw_search(&self, query: &[f32], k: usize) -> Result<Vec<(String, f32)>> {
        // Search-time candidate-list size used for small `k` (historical value).
        const EF_MIN: usize = 200;

        if query.is_empty() {
            return Ok(Vec::new());
        }
        // CR-I-1: dim check before passing to Cozo (whose error doesn't name the dim).
        if query.len() != STORAGE_EMBED_DIM {
            return Err(map_err(format!(
                "hnsw_search query has dim {} (expected {STORAGE_EMBED_DIM})",
                query.len()
            )));
        }
        // Nothing can be returned for k == 0; skip the round trip entirely.
        if k == 0 {
            return Ok(Vec::new());
        }
        // CR-M-1 / M5: cap `k` so a caller passing `usize::MAX` can't OOM or
        // generate a pathological Cozo limit literal.
        let k = k.min(HNSW_K_MAX);
        // HNSW keeps at most `ef` candidates while searching, so `ef < k` can
        // yield fewer than `k` neighbours. Widen `ef` for large requests.
        let ef = k.max(EF_MIN);

        let q = DataValue::Vec(cozo::Vector::F32(ndarray::Array1::from_vec(query.to_vec())));
        let mut params = BTreeMap::new();
        params.insert("q".into(), q);
        // `~` prefix invokes the HNSW search system rule; `dist` is the cosine
        // distance, converted to a similarity below.
        let script = format!(
            "?[id, dist] := ~Embedding:embed_idx{{ id, dist | query: $q, k: {k}, ef: {ef} }}\n\
             :order dist\n\
             :limit {k}"
        );
        let rows = self
            .db
            .run_script(&script, params, ScriptMutability::Immutable)
            .map_err(map_err)?;

        let mut out = Vec::with_capacity(rows.rows.len());
        for row in rows.rows {
            let id = match row.first() {
                Some(DataValue::Str(s)) => s.to_string(),
                _ => continue,
            };
            let dist: f64 = match row.get(1) {
                Some(DataValue::Num(cozo::Num::Float(f))) => *f,
                Some(DataValue::Num(cozo::Num::Int(i))) => *i as f64,
                // Previously mapped to 0.0, i.e. reported as a perfect match.
                _ => continue,
            };
            // `clamp` propagates NaN, so a NaN distance would leak out as a NaN score.
            if dist.is_nan() {
                continue;
            }
            // Clamp: floating-point error can yield d slightly > 2 → sim < 0.
            let sim = (1.0 - dist / 2.0).clamp(0.0, 1.0) as f32;
            out.push((id, sim));
        }
        Ok(out)
    }

    /// Brute-force fallback used when the HNSW index isn't available (e.g.
    /// older DBs). Loads up to `max_rows` embeddings into memory.
    ///
    /// **Always pass an explicit cap**: an unbounded scan on a large monorepo
    /// materializes the entire embedding table (≈3 GB for 1M symbols × 768-dim
    /// f32) in process memory before returning.
    ///
    /// No ordering is requested, so which rows survive the cap is up to Cozo.
    /// Rows whose id is not a string, or whose value is not an F32/F64 vector,
    /// are skipped. F64 vectors are narrowed to `f32`.
    ///
    /// # Errors
    ///
    /// - `max_rows == 0`: a zero-row scan is always a caller bug (use
    ///   `hnsw_search` for k-NN semantics);
    /// - any Cozo error from the scan.
    ///
    /// The conversion of `max_rows` to `i64` saturates, so values above
    /// `i64::MAX` become `i64::MAX` rather than wrapping negative.
    pub fn iter_embeddings_capped(&self, max_rows: usize) -> Result<Vec<(String, Vec<f32>)>> {
        if max_rows == 0 {
            return Err(map_err(
                "iter_embeddings_capped: max_rows must be > 0 (use hnsw_search for k-NN queries)",
            ));
        }
        // M6: saturating cast — usize > i64::MAX is unreachable in practice
        // but mustn't wrap negative and become a Cozo error.
        let limit_i64 = max_rows.min(i64::MAX as usize) as i64;
        let mut params = BTreeMap::new();
        params.insert("limit".into(), DataValue::from(limit_i64));
        let rows = self
            .db
            .run_script(
                "?[id, vec] := *Embedding{id, vec} :limit $limit",
                params,
                ScriptMutability::Immutable,
            )
            .map_err(map_err)?;

        let mut out = Vec::with_capacity(rows.rows.len());
        for row in rows.rows {
            let mut cols = row.into_iter();
            let id = match cols.next() {
                Some(DataValue::Str(s)) => s.to_string(),
                _ => continue,
            };
            let vec = match cols.next() {
                Some(DataValue::Vec(cozo::Vector::F32(arr))) => arr.to_vec(),
                Some(DataValue::Vec(cozo::Vector::F64(arr))) => {
                    arr.iter().map(|&f| f as f32).collect()
                }
                _ => continue,
            };
            out.push((id, vec));
        }
        Ok(out)
    }
}