ripvec-core 1.0.3

Semantic code + document search engine. Cacheless static-embedding + cross-encoder rerank by default; optional ModernBERT/BGE transformer engines with GPU backends. Tree-sitter chunking, hybrid BM25 + PageRank, composable ranking layers.
Documentation
//! Static encoder: in-process `StaticEmbedModel` reimplementation.
//!
//! Port of `~/src/semble/src/semble/index/dense.py`. Wraps
//! [`StaticEmbedModel`] loaded with `minishlab/potion-code-16M`
//! (256-dim, L2-normalized). Implements [`VectorEncoder`] for the
//! `--model ripvec` path. CPU-only; `embed_root` streams chunks through a
//! bounded chunk → batch → encode pipeline.
//!
//! ## Why not `model2vec-rs`?
//!
//! The previous wave used the upstream `model2vec-rs` crate. Two real
//! problems pushed us to reimplement (see
//! the `crate::encoder::ripvec::static_model` module docs for the
//! full design rationale):
//!
//! 1. `model2vec_rs::StaticModel::encode_with_args` runs `pool_ids`
//!    in a serial inner loop while `tokenizers::encode_batch_fast`
//!    spawns its own rayon pool. Wrapping that path in our outer
//!    `par_chunks` produced 60% `__psynch_cvwait` in the linux-corpus
//!    profile — nested rayon scopes parking on each other. The
//!    reimplementation does ONE big tokenize plus a `par_iter` over
//!    `pool_ids` — no nested rayon, no parking.
//! 2. `model2vec-rs 0.2` pinned `ndarray 0.15`; ripvec-core uses
//!    `ndarray 0.17`. The two `Array2<f32>` types were not
//!    interchangeable, forcing a `Vec<Vec<f32>>` shim. Owning the
//!    load path eliminates the mismatch.

use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crossbeam_channel::bounded;
use hf_hub::api::sync::Api;
use rayon::prelude::*;

use crate::chunk::CodeChunk;
use crate::embed::SearchConfig;
use crate::encoder::VectorEncoder;
use crate::encoder::ripvec::chunking::{DEFAULT_DESIRED_CHUNK_CHARS, chunk_source};
use crate::encoder::ripvec::static_model::StaticEmbedModel;
use crate::languages::config_for_extension;
use crate::profile::Profiler;
use crate::walk::collect_files_with_options;

/// Number of `(chunk, text)` pairs the accumulator stage of `embed_root`
/// packs into one batch before handing it to the encode worker. The chunk
/// channel feeding the accumulator holds `PIPELINE_BATCH_SIZE * 8` pairs.
///
/// Intended to match `StaticEmbedModel`'s internal `BATCH_SIZE` so each
/// emitted batch is exactly one `encode_batch_fast` call's worth of work.
/// NOTE(review): that constant lives outside this file — keep the two in
/// sync when either changes.
const PIPELINE_BATCH_SIZE: usize = 1024;

/// Capacity of the batch channel between the accumulator and the encode
/// worker, i.e. the number of full batches allowed in flight.
/// Provides enough pipeline depth for the encoder to stay busy while
/// the chunker fills the next batch; small enough that peak memory
/// stays bounded.
const PIPELINE_RING_SIZE: usize = 4;

/// Default model repo identifier for the ripvec path.
///
/// [`StaticEncoder::from_pretrained`] accepts either a HuggingFace repo ID
/// such as this one (fetched through `hf-hub`) or a path to an existing
/// local directory holding the model files. `identity()` echoes back
/// whichever string the caller passed.
pub const DEFAULT_MODEL_REPO: &str = "minishlab/potion-code-16M";

/// Default hidden dimension for [`DEFAULT_MODEL_REPO`].
pub const DEFAULT_HIDDEN_DIM: usize = 256;

/// Maximum source file size to read, in bytes (mirrors semble's
/// `_MAX_FILE_BYTES = 1_000_000` from `index/create.py:16`). Files whose
/// metadata reports a larger size are skipped by `chunk_one_file`.
const MAX_FILE_BYTES: u64 = 1_000_000;

/// CPU-only static encoder.
///
/// Owns a loaded [`StaticEmbedModel`] plus identity metadata. Construct
/// it with [`StaticEncoder::from_pretrained`], passing either a local
/// directory containing the Model2Vec files or a HuggingFace repo ID
/// (downloaded through `hf-hub`).
pub struct StaticEncoder {
    /// The loaded embedding model; every encode call is delegated to it.
    model: StaticEmbedModel,
    /// The string the caller passed to `from_pretrained` (repo ID or local
    /// path); returned verbatim by `identity()`.
    model_repo: String,
    /// Embedding width, read once from `model.hidden_dim()` at load time.
    hidden_dim: usize,
}

impl StaticEncoder {
    /// Encode a query string into a single embedding row.
    ///
    /// Used by `RipvecIndex::search` for hybrid/semantic dispatch.
    #[must_use]
    pub fn encode_query(&self, query: &str) -> Vec<f32> {
        self.model.encode_query(query)
    }

    /// Load a model by HuggingFace repo ID or local path.
    ///
    /// Two acceptance shapes:
    ///
    /// 1. **Local path** — if `model_repo` names an existing directory,
    ///    load directly from it. Used by the parity test fixture path
    ///    (`/tmp/potion-base-32M`) and any user pre-staging files.
    /// 2. **HuggingFace repo ID** — otherwise treat as `org/repo`,
    ///    download `config.json` / `tokenizer.json` / `model.safetensors`
    ///    via `hf-hub` into `~/.cache/huggingface/hub/`, and load from
    ///    there. Matches `load_classic_cpu` / `load_modernbert_cpu`'s
    ///    behaviour so the user-facing API is consistent: bare `--model
    ///    ripvec` with no `--model-repo` flag works.
    ///
    /// # Errors
    ///
    /// Propagates the underlying I/O, download, or parse error if the
    /// files cannot be obtained or the safetensors layout is
    /// unrecognized.
    pub fn from_pretrained(model_repo: &str) -> crate::Result<Self> {
        let resolved = Self::resolve_model_dir(model_repo)?;
        let model = StaticEmbedModel::from_path(&resolved, Some(true))
            .map_err(|e| crate::Error::Other(anyhow::anyhow!("static model load failed: {e}")))?;
        let hidden_dim = model.hidden_dim();
        Ok(Self {
            model,
            model_repo: model_repo.to_string(),
            hidden_dim,
        })
    }

    /// Resolve `model_repo` to a directory containing the model files.
    ///
    /// If `model_repo` is an existing local directory, returns it as-is.
    /// Otherwise downloads via `hf-hub` and returns the cache directory.
    fn resolve_model_dir(model_repo: &str) -> crate::Result<PathBuf> {
        let local = Path::new(model_repo);
        if local.is_dir() {
            return Ok(local.to_path_buf());
        }

        // HuggingFace repo path. Download the three required files and
        // return the directory `hf-hub` cached them into. All files
        // land in the same snapshot directory.
        let api = Api::new().map_err(|e| crate::Error::Download(e.to_string()))?;
        let repo = api.model(model_repo.to_string());
        let _ = repo
            .get("config.json")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        let _ = repo
            .get("tokenizer.json")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        let weights_path = repo
            .get("model.safetensors")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        // hf-hub returns the file path; the snapshot directory is its parent.
        weights_path
            .parent()
            .map(std::path::Path::to_path_buf)
            .ok_or_else(|| {
                crate::Error::Other(anyhow::anyhow!(
                    "hf-hub returned root path for {model_repo}; cannot resolve snapshot dir"
                ))
            })
    }
}

impl VectorEncoder for StaticEncoder {
    /// Walk `root`, chunk every collected file, and embed every chunk.
    ///
    /// Returns `(chunks, embeddings)` where `embeddings[i]` belongs to
    /// `chunks[i]`. The order of chunks across files is not deterministic.
    ///
    /// The work runs as a three-stage bounded-queue pipeline:
    ///
    /// 1. **Chunk producer** — rayon `par_iter` over the file list on a
    ///    dedicated thread pool. Each file is read and chunked by
    ///    `chunk_one_file`, and emitted as `(CodeChunk, String)` pairs
    ///    into a bounded channel of capacity `PIPELINE_BATCH_SIZE * 8`.
    /// 2. **Batch accumulator** — a single scoped thread drains the
    ///    chunk channel, packs `PIPELINE_BATCH_SIZE` pairs per batch,
    ///    and forwards into a bounded channel of capacity
    ///    `PIPELINE_RING_SIZE`.
    /// 3. **Encode worker** — a single scoped thread receives batches
    ///    and calls `StaticEmbedModel::encode_batch`, whose internal
    ///    `par_iter` lights up rayon for the pool_ids kernel.
    ///
    /// Why this shape:
    ///
    /// - The previous "chunk all, then embed all" implementation held
    ///   the entire `Vec<String>` of chunk contents in memory between
    ///   phases. On the linux corpus that was ~400 MB peak. The
    ///   bounded queues cap queued work at roughly
    ///   `PIPELINE_BATCH_SIZE * 8 + PIPELINE_RING_SIZE * PIPELINE_BATCH_SIZE`
    ///   chunks regardless of corpus size.
    /// - The chunk phase overlaps with the embed phase instead of
    ///   serializing before it, so otherwise idle cores chew on
    ///   chunking while embed runs.
    /// - Mirrors `embed::embed_all_streaming`'s shape so the two
    ///   pipelines (BERT + semble) share architectural conventions.
    ///
    /// # Errors
    ///
    /// Returns an error if the dedicated chunking thread pool cannot be
    /// built.
    ///
    /// # Panics
    ///
    /// A panic in any pipeline stage is re-raised by `std::thread::scope`
    /// once all stages have wound down.
    fn embed_root(
        &self,
        root: &Path,
        cfg: &SearchConfig,
        profiler: &Profiler,
    ) -> crate::Result<(Vec<CodeChunk>, Vec<Vec<f32>>)> {
        // Phase 1: walk. The full file list is needed up front so the
        // producer can `par_iter` over it.
        let walk_options = cfg.walk_options();
        let file_paths = {
            let _guard = profiler.phase("walk");
            collect_files_with_options(root, &walk_options)
        };
        if file_paths.is_empty() {
            return Ok((Vec::new(), Vec::new()));
        }

        // Bounded channels. See module constants for the rationale on
        // PIPELINE_BATCH_SIZE and PIPELINE_RING_SIZE.
        let (chunk_tx, chunk_rx) = bounded::<(CodeChunk, String)>(PIPELINE_BATCH_SIZE * 8);
        let (batch_tx, batch_rx) = bounded::<Vec<(CodeChunk, String)>>(PIPELINE_RING_SIZE);

        // The encode stage appends (chunk, embedding) pairs here. Storing
        // them as tuples keeps the chunk[i] <-> embedding[i] pairing intact
        // by construction; order across files isn't meaningful.
        let output: Mutex<Vec<(CodeChunk, Vec<f32>)>> = Mutex::new(Vec::new());
        let model = &self.model;

        // Stage 1 runs on a DEDICATED rayon thread pool. If it used the
        // global pool, its par_iter workers would park on full
        // `chunk_tx.send()` calls, and Stage 3's `encode_batch` →
        // `pool_ids` par_iter would have no rayon workers available
        // (they're all parked) — a nested-rayon deadlock.
        //
        // The dedicated pool gets half as many threads as the current
        // rayon pool reports. These are ADDITIONAL threads: the global
        // pool is not shrunk, so its workers all stay available to the
        // encode stage. Chunking (tree-sitter + I/O bound) doesn't need
        // full parallelism to pipeline cleanly behind embed.
        let num_cores = rayon::current_num_threads().max(2);
        let chunk_threads = (num_cores / 2).max(1);
        let chunk_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(chunk_threads)
            .thread_name(|i| format!("semble-chunk-{i}"))
            .build()
            .map_err(|e| crate::Error::Other(anyhow::anyhow!("chunk thread pool build: {e}")))?;

        let _phase_guard = profiler.phase("pipeline");
        std::thread::scope(|scope| {
            // Stage 1: chunk producer on the dedicated pool.
            scope.spawn(move || {
                chunk_pool.install(|| {
                    // `try_for_each` short-circuits: once the receiver is
                    // gone (a downstream stage died), no further files are
                    // read or parsed. A plain `for_each` + `return` would
                    // only skip the rest of the current file.
                    let _ = file_paths.par_iter().try_for_each(|full| {
                        let (chunks, contents) = chunk_one_file(root, full);
                        for pair in chunks.into_iter().zip(contents) {
                            if chunk_tx.send(pair).is_err() {
                                return Err(());
                            }
                        }
                        Ok(())
                    });
                });
                // chunk_tx drops here, closing the chunk channel.
            });

            // Stage 2: batch accumulator.
            scope.spawn(move || {
                let mut pending: Vec<(CodeChunk, String)> =
                    Vec::with_capacity(PIPELINE_BATCH_SIZE);
                for pair in chunk_rx {
                    pending.push(pair);
                    if pending.len() >= PIPELINE_BATCH_SIZE {
                        let full_batch = std::mem::replace(
                            &mut pending,
                            Vec::with_capacity(PIPELINE_BATCH_SIZE),
                        );
                        if batch_tx.send(full_batch).is_err() {
                            // Encoder is gone; dropping chunk_rx on return
                            // tells the producer to stop as well.
                            return;
                        }
                    }
                }
                // Flush the final, partially filled batch.
                if !pending.is_empty() {
                    let _ = batch_tx.send(pending);
                }
                // batch_tx drops here, closing the batch channel.
            });

            // Stage 3: encode worker.
            scope.spawn(|| {
                for batch in batch_rx {
                    if batch.is_empty() {
                        continue;
                    }
                    let (chunks, texts): (Vec<CodeChunk>, Vec<String>) =
                        batch.into_iter().unzip();
                    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
                    let embeddings = model.encode_batch(&text_refs);
                    debug_assert_eq!(embeddings.len(), chunks.len());
                    output
                        .lock()
                        .expect("output mutex poisoned")
                        .extend(chunks.into_iter().zip(embeddings));
                }
            });
        });

        // All stages have joined; split the pairs into the two parallel
        // vectors the trait returns.
        let collected = output.into_inner().expect("output mutex poisoned");
        let (chunks_out, embs_out): (Vec<CodeChunk>, Vec<Vec<f32>>) =
            collected.into_iter().unzip();
        Ok((chunks_out, embs_out))
    }

    /// Embedding width of the loaded model, cached at construction.
    fn hidden_dim(&self) -> usize {
        self.hidden_dim
    }

    /// The repo ID or local path this encoder was created from.
    fn identity(&self) -> &str {
        &self.model_repo
    }
}

/// Read and chunk a single file.
///
/// Returns two parallel vectors, `(chunks, contents)`, where
/// `contents[i]` is the text of `chunks[i]`. Both are empty when the
/// file's metadata cannot be read, the file is larger than
/// `MAX_FILE_BYTES`, it cannot be read as a UTF-8 string, or chunking
/// yields nothing but whitespace.
///
/// Each chunk's `file_path` is `full` relative to `root` (or `full`
/// itself when it is not under `root`). `name` and `kind` are left
/// empty, and `content` / `enriched_content` both hold the raw chunk
/// text.
fn chunk_one_file(root: &Path, full: &Path) -> (Vec<CodeChunk>, Vec<String>) {
    // Guard: the size must be known and within the limit before reading.
    let size_ok = matches!(
        std::fs::metadata(full),
        Ok(meta) if meta.len() <= MAX_FILE_BYTES
    );
    if !size_ok {
        return (Vec::new(), Vec::new());
    }
    let Ok(source) = std::fs::read_to_string(full) else {
        return (Vec::new(), Vec::new());
    };

    // The language config is looked up by extension; a missing or
    // non-UTF-8 extension is looked up as the empty string.
    let ext = full
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("");
    let lang_cfg = config_for_extension(ext);
    let language = lang_cfg.as_ref().map(|c| &c.language);

    let relative = full.strip_prefix(root).unwrap_or(full);
    let rel_path = relative.display().to_string();

    let boundaries = chunk_source(&source, language, DEFAULT_DESIRED_CHUNK_CHARS);
    let expected = boundaries.len();
    let mut chunks = Vec::with_capacity(expected);
    let mut contents = Vec::with_capacity(expected);
    for boundary in boundaries {
        let text = boundary.content(&source).to_string();
        // Whitespace-only chunks are dropped.
        if !text.trim().is_empty() {
            chunks.push(CodeChunk {
                file_path: rel_path.clone(),
                name: String::new(),
                kind: String::new(),
                start_line: boundary.start_line,
                end_line: boundary.end_line,
                content: text.clone(),
                enriched_content: text.clone(),
            });
            contents.push(text);
        }
    }
    (chunks, contents)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::VectorEncoder;

    /// Compile-time check that `StaticEncoder` is a `VectorEncoder` and is
    /// both `Send` and `Sync`
    /// (`test:static-encoder-implements-vector-encoder`).
    #[test]
    fn static_encoder_implements_vector_encoder() {
        fn requires_encoder_bounds<T>()
        where
            T: VectorEncoder + Send + Sync,
        {
        }
        requires_encoder_bounds::<StaticEncoder>();
    }

    /// Loads the model named by `RIPVEC_SEMBLE_MODEL_PATH` and checks its
    /// hidden dimension, its identity string, and that a query embedding
    /// has unit L2 norm.
    ///
    /// Ignored by default because it needs model files; when run without
    /// the environment variable set it prints a notice and returns early.
    ///
    /// Corresponds to acceptance `test:static-encoder-hidden-dim-256` and
    /// `test:static-encoder-loads-potion-code-16m` and
    /// `test:static-encoder-output-is-l2-normalized`.
    #[test]
    #[ignore = "requires local model files at RIPVEC_SEMBLE_MODEL_PATH"]
    fn static_encoder_loads_potion_code_16m() {
        let Ok(model_path) = std::env::var("RIPVEC_SEMBLE_MODEL_PATH") else {
            eprintln!("RIPVEC_SEMBLE_MODEL_PATH not set; skipping");
            return;
        };
        let encoder =
            StaticEncoder::from_pretrained(&model_path).expect("model load should succeed");
        assert_eq!(encoder.hidden_dim(), DEFAULT_HIDDEN_DIM);
        // identity() echoes the string handed to from_pretrained — here
        // the path under test.
        assert_eq!(encoder.identity(), model_path);

        // Check L2 normalization through the public encode_query path.
        let embedding = encoder.encode_query("hello world");
        let squared_sum: f32 = embedding.iter().map(|x| x * x).sum();
        let norm = squared_sum.sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-3,
            "expected L2-normalized output; got norm={norm}"
        );
    }
}