ripvec-core 2.0.0

Semantic code + document search engine. Cacheless static-embedding + cross-encoder rerank by default; optional ModernBERT/BGE transformer engines with GPU backends. Tree-sitter chunking, hybrid BM25 + PageRank, composable ranking layers.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! Static encoder: in-process `StaticEmbedModel` reimplementation.
//!
//! Port of `~/src/semble/src/semble/index/dense.py`. Wraps
//! [`StaticEmbedModel`] loaded with `minishlab/potion-base-32M`
//! (256-dim, L2-normalized). Implements [`VectorEncoder`] for the
//! `--model ripvec` path. CPU-only; no batching ring buffer.
//!
//! Default was bumped to `potion-base-32M` in v1.3.0 after the
//! gutenberg + python-repos matrix showed 32M winning prose by
//! 0.058 NDCG@10 while losing code by only 0.004 — a clear
//! single-default win once the i64 mapping bug and the reranker
//! pooler / sigmoid / truncation bugs were fixed. The code-tuned
//! `potion-code-16M` is still available via `--model-repo`.
//!
//! ## Why not `model2vec-rs`?
//!
//! The previous wave used the upstream `model2vec-rs` crate. Two real
//! problems pushed us to reimplement (see
//! `crates/ripvec-core/src/encoder/ripvec/static_model.rs`, the
//! module imported below as `crate::encoder::ripvec::static_model`,
//! for the full design rationale):
//!
//! 1. `model2vec_rs::StaticModel::encode_with_args` runs `pool_ids`
//!    in a serial inner loop while `tokenizers::encode_batch_fast`
//!    spawns its own rayon pool. Wrapping that path in our outer
//!    `par_chunks` produced 60% `__psynch_cvwait` in the linux-corpus
//!    profile — nested rayon scopes parking on each other. The
//!    reimplementation does ONE big tokenize plus a `par_iter` over
//!    `pool_ids` — no nested rayon, no parking.
//! 2. `model2vec-rs 0.2` pinned `ndarray 0.15`; ripvec-core uses
//!    `ndarray 0.17`. The two `Array2<f32>` types were not
//!    interchangeable, forcing a `Vec<Vec<f32>>` shim. Owning the
//!    load path eliminates the mismatch.

use std::path::{Path, PathBuf};
use std::sync::Mutex;

use crossbeam_channel::bounded;
use hf_hub::api::sync::Api;
use rayon::prelude::*;

use crate::chunk::CodeChunk;
use crate::embed::SearchConfig;
use crate::encoder::VectorEncoder;
use crate::encoder::ripvec::chunking::{DEFAULT_DESIRED_CHUNK_CHARS, chunk_source};
use crate::encoder::ripvec::static_model::StaticEmbedModel;
use crate::languages::config_for_extension;
use crate::profile::Profiler;
use crate::walk::collect_files_with_options;

/// Encode batch size used by the streaming pipeline: the number of
/// `(chunk, text)` pairs the accumulator stage packs into one batch
/// before handing it to the encode worker. The chunk channel is also
/// sized from it (`PIPELINE_BATCH_SIZE * 8`).
///
/// Intended to match `StaticEmbedModel`'s internal `BATCH_SIZE` so
/// each emitted batch is exactly one `encode_batch_fast` call's worth
/// of work.
// NOTE(review): that `BATCH_SIZE` lives in another module and is not
// checked here — keep the two values in sync by hand.
const PIPELINE_BATCH_SIZE: usize = 1024;

/// Number of full batches allowed in-flight from chunker to encoder,
/// i.e. the capacity of the bounded batch channel between the
/// accumulator stage and the encode worker.
///
/// Provides enough pipeline depth for the encoder to stay busy while
/// the chunker fills the next batch; small enough that peak memory
/// stays bounded.
const PIPELINE_RING_SIZE: usize = 4;

/// Default model repo identifier for the ripvec path.
///
/// This is a HuggingFace repo ID. When it is handed to
/// [`StaticEncoder::from_pretrained`] and no local directory of that
/// name exists, the model files are fetched through `hf-hub`. Whatever
/// string is passed to `from_pretrained` (this default or a
/// `--model-repo` override) is what `identity()` later reports.
pub const DEFAULT_MODEL_REPO: &str = "minishlab/potion-base-32M";

/// Default hidden dimension for [`DEFAULT_MODEL_REPO`].
///
/// The encoder itself does not read this constant:
/// [`StaticEncoder::from_pretrained`] takes the width from the loaded
/// model. In the visible code it is only compared against that
/// runtime value in a test.
// NOTE(review): confirm 256 is still the correct width for
// `potion-base-32M` — check the `config.json` of the model you ship.
pub const DEFAULT_HIDDEN_DIM: usize = 256;

/// Maximum source file size to read, in bytes (mirrors semble's
/// `_MAX_FILE_BYTES = 1_000_000` from `index/create.py:16`).
///
/// `chunk_one_file` skips any file whose metadata length is strictly
/// greater than this; a file of exactly this size is still read.
const MAX_FILE_BYTES: u64 = 1_000_000;

/// CPU-only static encoder.
///
/// Owns a loaded [`StaticEmbedModel`] plus identity metadata. The
/// embedder is constructed by `main.rs::load_pipeline` via
/// [`StaticEncoder::from_pretrained`], passing either a local
/// directory containing the Model2Vec files (`config.json`,
/// `tokenizer.json`, `model.safetensors`) or a HuggingFace repo ID,
/// which is downloaded through `hf-hub`.
pub struct StaticEncoder {
    // The loaded static embedding model; all encoding is delegated to it.
    model: StaticEmbedModel,
    // The string the caller passed to `from_pretrained`, stored verbatim
    // (local path or repo ID). Returned by `identity()`.
    model_repo: String,
    // Embedding width, cached from `model.hidden_dim()` at load time.
    hidden_dim: usize,
}

impl StaticEncoder {
    /// Encode a query string into a single embedding row.
    ///
    /// Used by `RipvecIndex::search` for hybrid/semantic dispatch.
    #[must_use]
    pub fn encode_query(&self, query: &str) -> Vec<f32> {
        self.model.encode_query(query)
    }

    /// Load a model by HuggingFace repo ID or local path.
    ///
    /// Two acceptance shapes:
    ///
    /// 1. **Local path** — if `model_repo` names an existing directory,
    ///    load directly from it. Used by the parity test fixture path
    ///    (`/tmp/potion-base-32M`) and any user pre-staging files.
    /// 2. **HuggingFace repo ID** — otherwise treat as `org/repo`,
    ///    download `config.json` / `tokenizer.json` / `model.safetensors`
    ///    via `hf-hub` into `~/.cache/huggingface/hub/`, and load from
    ///    there. Matches `load_classic_cpu` / `load_modernbert_cpu`'s
    ///    behaviour so the user-facing API is consistent: bare `--model
    ///    ripvec` with no `--model-repo` flag works.
    ///
    /// # Errors
    ///
    /// Propagates the underlying I/O, download, or parse error if the
    /// files cannot be obtained or the safetensors layout is
    /// unrecognized.
    pub fn from_pretrained(model_repo: &str) -> crate::Result<Self> {
        let resolved = Self::resolve_model_dir(model_repo)?;
        let model = StaticEmbedModel::from_path(&resolved, Some(true))
            .map_err(|e| crate::Error::Other(anyhow::anyhow!("static model load failed: {e}")))?;
        let hidden_dim = model.hidden_dim();
        Ok(Self {
            model,
            model_repo: model_repo.to_string(),
            hidden_dim,
        })
    }

    /// Resolve `model_repo` to a directory containing the model files.
    ///
    /// If `model_repo` is an existing local directory, returns it as-is.
    /// Otherwise downloads via `hf-hub` and returns the cache directory.
    fn resolve_model_dir(model_repo: &str) -> crate::Result<PathBuf> {
        let local = Path::new(model_repo);
        if local.is_dir() {
            return Ok(local.to_path_buf());
        }

        // HuggingFace repo path. Download the three required files and
        // return the directory `hf-hub` cached them into. All files
        // land in the same snapshot directory.
        let api = Api::new().map_err(|e| crate::Error::Download(e.to_string()))?;
        let repo = api.model(model_repo.to_string());
        let _ = repo
            .get("config.json")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        let _ = repo
            .get("tokenizer.json")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        let weights_path = repo
            .get("model.safetensors")
            .map_err(|e| crate::Error::Download(e.to_string()))?;
        // hf-hub returns the file path; the snapshot directory is its parent.
        weights_path
            .parent()
            .map(std::path::Path::to_path_buf)
            .ok_or_else(|| {
                crate::Error::Other(anyhow::anyhow!(
                    "hf-hub returned root path for {model_repo}; cannot resolve snapshot dir"
                ))
            })
    }
}

impl VectorEncoder for StaticEncoder {
    /// Walk `root`, chunk every file, and embed every chunk.
    ///
    /// Three-stage bounded-queue pipeline:
    ///
    /// 1. **Chunk producer** — rayon `par_iter` over the file list. Each
    ///    file is read, parsed by tree-sitter (or line-merged on
    ///    fallback), and emitted as `(CodeChunk, String)` pairs into a
    ///    bounded channel of capacity `PIPELINE_BATCH_SIZE * 8`.
    /// 2. **Batch accumulator** — a single scoped thread drains the
    ///    chunk channel, packs `PIPELINE_BATCH_SIZE` pairs per batch,
    ///    and forwards into a bounded channel of capacity
    ///    `PIPELINE_RING_SIZE`.
    /// 3. **Encode worker** — a single scoped thread receives batches
    ///    and calls `StaticEmbedModel::encode_batch`, whose internal
    ///    `par_iter` lights up rayon for the pool_ids kernel.
    ///
    /// Why this shape:
    ///
    /// - The previous "chunk all, then embed all" implementation held
    ///   the entire `Vec<String>` of chunk contents in memory between
    ///   phases. On the linux corpus that was ~400 MB peak. The
    ///   bounded queues cap in-flight memory at
    ///   `PIPELINE_BATCH_SIZE * 8 + PIPELINE_RING_SIZE * PIPELINE_BATCH_SIZE`
    ///   chunks regardless of corpus size — under 15 MB.
    /// - The chunk phase (13s on linux) is hidden inside the embed
    ///   phase (70s) instead of serializing before it. Pre-pipeline
    ///   profile showed user-time at 394s on 82s wall = 4.8x
    ///   parallelism on 12 cores; pipeline lets idle cores chew on
    ///   chunking while embed runs.
    /// - Mirrors `embed::embed_all_streaming`'s shape so the two
    ///   pipelines (BERT + semble) share architectural conventions.
    ///
    /// # Returns
    ///
    /// `(chunks, embeddings)` where `embeddings[i]` belongs to
    /// `chunks[i]`. Both are empty when the walk finds no files. Chunks
    /// appear in the order they arrived through the pipeline, which is
    /// not the walk order.
    ///
    /// # Errors
    ///
    /// Returns `crate::Error::Other` if the dedicated chunking thread
    /// pool cannot be built. Per-file problems do not surface here:
    /// `chunk_one_file` yields zero chunks for files it cannot use.
    ///
    /// # Panics
    ///
    /// Panics if the output mutex is poisoned.
    fn embed_root(
        &self,
        root: &Path,
        cfg: &SearchConfig,
        profiler: &Profiler,
    ) -> crate::Result<(Vec<CodeChunk>, Vec<Vec<f32>>)> {
        // Phase 1: walk (still serial-to-pipeline because we need the
        // full file list to par_iter over; the walk itself is rayon).
        let walk_options = cfg.walk_options();
        let file_paths = {
            // The guard is scoped to this block so the "walk" phase ends
            // before the pipeline phase starts.
            let _guard = profiler.phase("walk");
            collect_files_with_options(root, &walk_options)
        };
        if file_paths.is_empty() {
            // Nothing to chunk: skip building the pool and threads.
            return Ok((Vec::new(), Vec::new()));
        }

        // Bounded channels. See module constants for the rationale on
        // PIPELINE_BATCH_SIZE and PIPELINE_RING_SIZE.
        let (chunk_tx, chunk_rx) = bounded::<(CodeChunk, String)>(PIPELINE_BATCH_SIZE * 8);
        let (batch_tx, batch_rx) = bounded::<Vec<(CodeChunk, String)>>(PIPELINE_RING_SIZE);

        // The encoder stage writes ordered output behind a Mutex. Order
        // across files isn't meaningful (RipvecIndex doesn't rely on
        // chunk order), only the chunk[i] <-> embedding[i] pairing
        // matters — which we preserve trivially by pushing in lockstep.
        // Only Stage 3 ever locks it; it is unwrapped after the scope ends.
        let output: Mutex<Vec<(CodeChunk, Vec<f32>)>> = Mutex::new(Vec::new());
        let model = &self.model;

        // Stage 1 runs on a DEDICATED rayon thread pool. If we used
        // the global pool, Stage 1's par_iter workers would park on
        // full `chunk_tx.send()` calls, and Stage 3's
        // `encode_batch` → `pool_ids` par_iter would have no rayon
        // workers available (they're all parked). That's a classic
        // nested-rayon deadlock — observed in profiling as PID stuck
        // at 0% CPU with 16 parked threads.
        //
        // Half the cores for chunking, half remain in the global pool
        // for the encode worker's pool_ids. The chunk phase (tree-
        // sitter + I/O bound) doesn't need full parallelism to
        // pipeline cleanly behind embed.
        // NOTE(review): `current_num_threads()` reports the size of the
        // current rayon pool (presumably the global one here), not the
        // physical core count — confirm that is the intended basis.
        let num_cores = rayon::current_num_threads().max(2);
        let chunk_threads = (num_cores / 2).max(1);
        let chunk_pool = rayon::ThreadPoolBuilder::new()
            .num_threads(chunk_threads)
            .thread_name(|i| format!("semble-chunk-{i}"))
            .build()
            .map_err(|e| crate::Error::Other(anyhow::anyhow!("chunk thread pool build: {e}")))?;

        // This guard lives until the function returns, so the
        // "pipeline" phase also covers the final unzip below.
        let _phase_guard = profiler.phase("pipeline");
        // `thread::scope` returns only after all three stages have
        // finished, which is what makes the borrows of `output`,
        // `model` and `root` sound.
        std::thread::scope(|scope| {
            // Stage 1: chunk producer on the dedicated pool.
            // The sender is moved into the thread so that it is dropped
            // (closing the chunk channel) as soon as the producer is done.
            let chunk_tx_owned = chunk_tx;
            scope.spawn(move || {
                chunk_pool.install(|| {
                    file_paths.par_iter().for_each(|full| {
                        let (chunks, contents) = chunk_one_file(root, full);
                        for (chunk, content) in chunks.into_iter().zip(contents) {
                            // A send error means the receiving side is
                            // gone. `return` leaves only this file's
                            // closure; the remaining files hit the same
                            // error on their first send.
                            if chunk_tx_owned.send((chunk, content)).is_err() {
                                return;
                            }
                        }
                    });
                });
                // chunk_tx_owned drops here, closing the channel.
            });

            // Stage 2: batch accumulator.
            let batch_tx_owned = batch_tx;
            scope.spawn(move || {
                let mut buf: Vec<(CodeChunk, String)> = Vec::with_capacity(PIPELINE_BATCH_SIZE);
                // The loop ends once the chunk channel is closed and drained.
                for pair in chunk_rx {
                    buf.push(pair);
                    if buf.len() >= PIPELINE_BATCH_SIZE {
                        // Hand off the full batch and start a fresh,
                        // pre-sized buffer in its place.
                        let batch =
                            std::mem::replace(&mut buf, Vec::with_capacity(PIPELINE_BATCH_SIZE));
                        if batch_tx_owned.send(batch).is_err() {
                            // Encode worker is gone; stop accumulating.
                            return;
                        }
                    }
                }
                // Flush the final, partially filled batch. A failed send
                // is ignored: there is nobody left to receive it.
                if !buf.is_empty() {
                    let _ = batch_tx_owned.send(buf);
                }
                // batch_tx_owned drops here, closing the channel.
            });

            // Stage 3: encode worker.
            // Not a `move` closure: it borrows `output` and `model` from
            // the enclosing frame and consumes `batch_rx` via the loop.
            scope.spawn(|| {
                for batch in batch_rx {
                    if batch.is_empty() {
                        continue;
                    }
                    // Split the pairs into parallel vectors so the texts
                    // can be passed to the model as a slice of `&str`.
                    let mut chunks = Vec::with_capacity(batch.len());
                    let mut texts: Vec<String> = Vec::with_capacity(batch.len());
                    for (chunk, text) in batch {
                        chunks.push(chunk);
                        texts.push(text);
                    }
                    let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
                    let embeddings = model.encode_batch(&text_refs);
                    // One embedding per input text is expected. This is
                    // checked in debug builds only; `zip` below would
                    // silently truncate on a mismatch in release.
                    debug_assert_eq!(embeddings.len(), chunks.len());
                    // Lock once per batch; the guard drops at the end of
                    // this loop iteration.
                    let mut out = output.lock().expect("output mutex poisoned");
                    for (chunk, emb) in chunks.into_iter().zip(embeddings) {
                        out.push((chunk, emb));
                    }
                }
            });
        });

        // All stages have finished; unwrap the mutex and split the
        // pairs into the two parallel vectors the trait returns.
        let collected = output.into_inner().expect("output mutex poisoned");
        let mut chunks_out = Vec::with_capacity(collected.len());
        let mut embs_out = Vec::with_capacity(collected.len());
        for (chunk, emb) in collected {
            chunks_out.push(chunk);
            embs_out.push(emb);
        }
        Ok((chunks_out, embs_out))
    }

    /// Embedding width, as cached from the loaded model at construction.
    fn hidden_dim(&self) -> usize {
        self.hidden_dim
    }

    /// The model identifier exactly as it was passed to
    /// `from_pretrained` (a repo ID or a local path).
    fn identity(&self) -> &str {
        &self.model_repo
    }
}

/// Read and chunk a single file.
///
/// Returns two parallel vectors: `chunks[i]` describes a chunk and
/// `contents[i]` is that chunk's text. Both are empty when the file's
/// metadata cannot be read, the file is larger than `MAX_FILE_BYTES`,
/// it cannot be read as a UTF-8 string, or every chunk is whitespace.
///
/// `root` is only used to relativize `full` for `CodeChunk::file_path`;
/// if `full` is not under `root`, the full path is recorded instead.
fn chunk_one_file(root: &Path, full: &Path) -> (Vec<CodeChunk>, Vec<String>) {
    // Size gate first, so oversized files are never read into memory.
    let size_ok = matches!(
        std::fs::metadata(full),
        Ok(meta) if meta.len() <= MAX_FILE_BYTES
    );
    if !size_ok {
        return (Vec::new(), Vec::new());
    }
    let source = match std::fs::read_to_string(full) {
        Ok(text) => text,
        Err(_) => return (Vec::new(), Vec::new()),
    };

    // Path relative to the walk root, falling back to the path as given.
    let rel_path = full
        .strip_prefix(root)
        .unwrap_or(full)
        .display()
        .to_string();

    // The extension selects the language config; with no match,
    // `language` is `None` and `chunk_source` decides how to split.
    let ext = full
        .extension()
        .and_then(std::ffi::OsStr::to_str)
        .unwrap_or("");
    let lang_cfg = config_for_extension(ext);
    let language = lang_cfg.as_ref().map(|cfg| &cfg.language);

    let pieces = chunk_source(&source, language, DEFAULT_DESIRED_CHUNK_CHARS);
    let mut chunks = Vec::with_capacity(pieces.len());
    let mut contents = Vec::with_capacity(pieces.len());
    for piece in pieces {
        let body = piece.content(&source).to_string();
        // Whitespace-only chunks are dropped.
        if !body.trim().is_empty() {
            chunks.push(CodeChunk {
                file_path: rel_path.clone(),
                name: String::new(),
                kind: String::new(),
                start_line: piece.start_line,
                end_line: piece.end_line,
                content: body.clone(),
                enriched_content: body.clone(),
            });
            contents.push(body);
        }
    }
    (chunks, contents)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::VectorEncoder;

    /// Compile-time check that `StaticEncoder` satisfies
    /// `VectorEncoder + Send + Sync`
    /// (`test:static-encoder-implements-vector-encoder`). There is
    /// nothing to run: the test passes if it compiles.
    #[test]
    fn static_encoder_implements_vector_encoder() {
        fn requires_encoder_bounds<T>()
        where
            T: VectorEncoder + Send + Sync,
        {
        }
        requires_encoder_bounds::<StaticEncoder>();
    }

    /// Loads a model from the directory named by
    /// `RIPVEC_SEMBLE_MODEL_PATH` and checks its hidden dimension,
    /// its identity, and that query embeddings are L2-normalized.
    ///
    /// Ignored by default because it needs model files on disk. Even
    /// when run explicitly, it returns early (passing) if the
    /// environment variable is unset.
    ///
    /// Corresponds to acceptance `test:static-encoder-hidden-dim-256`,
    /// `test:static-encoder-loads-potion-code-16m` and
    /// `test:static-encoder-output-is-l2-normalized`.
    #[test]
    #[ignore = "requires local model files at RIPVEC_SEMBLE_MODEL_PATH"]
    fn static_encoder_loads_potion_code_16m() {
        let path = match std::env::var("RIPVEC_SEMBLE_MODEL_PATH") {
            Ok(value) => value,
            Err(_) => {
                eprintln!("RIPVEC_SEMBLE_MODEL_PATH not set; skipping");
                return;
            }
        };

        let encoder = StaticEncoder::from_pretrained(&path).expect("model load should succeed");
        assert_eq!(encoder.hidden_dim(), DEFAULT_HIDDEN_DIM);
        // The encoder reports back exactly what the caller passed in,
        // which here is the local path under test.
        assert_eq!(encoder.identity(), path);

        // L2 normalization, checked through the public query path.
        let embedding = encoder.encode_query("hello world");
        let squared_sum: f32 = embedding.iter().map(|v| v * v).sum();
        let norm = squared_sum.sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-3,
            "expected L2-normalized output; got norm={norm}"
        );
    }
}