ripvec_core/encoder/ripvec/dense.rs
1//! Static encoder: in-process `StaticEmbedModel` reimplementation.
2//!
3//! Port of `~/src/semble/src/semble/index/dense.py`. Wraps
4//! [`StaticEmbedModel`] loaded with `minishlab/potion-code-16M`
5//! (256-dim, L2-normalized). Implements [`VectorEncoder`] for the
//! `--model ripvec` path. CPU-only; chunking and encoding overlap
//! through a bounded-channel pipeline (see `embed_root`).
7//!
8//! ## Why not `model2vec-rs`?
9//!
10//! The previous wave used the upstream `model2vec-rs` crate. Two real
11//! problems pushed us to reimplement (see
//! `crates/ripvec-core/src/encoder/ripvec/static_model.rs` for the
13//! full design rationale):
14//!
15//! 1. `model2vec_rs::StaticModel::encode_with_args` runs `pool_ids`
16//! in a serial inner loop while `tokenizers::encode_batch_fast`
17//! spawns its own rayon pool. Wrapping that path in our outer
18//! `par_chunks` produced 60% `__psynch_cvwait` in the linux-corpus
19//! profile — nested rayon scopes parking on each other. The
20//! reimplementation does ONE big tokenize plus a `par_iter` over
21//! `pool_ids` — no nested rayon, no parking.
22//! 2. `model2vec-rs 0.2` pinned `ndarray 0.15`; ripvec-core uses
23//! `ndarray 0.17`. The two `Array2<f32>` types were not
24//! interchangeable, forcing a `Vec<Vec<f32>>` shim. Owning the
25//! load path eliminates the mismatch.
26
27use std::path::{Path, PathBuf};
28use std::sync::Mutex;
29
30use crossbeam_channel::bounded;
31use hf_hub::api::sync::Api;
32use rayon::prelude::*;
33
34use crate::chunk::CodeChunk;
35use crate::embed::SearchConfig;
36use crate::encoder::VectorEncoder;
37use crate::encoder::ripvec::chunking::{DEFAULT_DESIRED_CHUNK_CHARS, chunk_source};
38use crate::encoder::ripvec::static_model::StaticEmbedModel;
39use crate::languages::config_for_extension;
40use crate::profile::Profiler;
41use crate::walk::collect_files_with_options;
42
/// Number of `(chunk, text)` pairs packed into each batch that the
/// streaming pipeline hands to the encode stage.
///
/// Intended to equal `StaticEmbedModel`'s internal `BATCH_SIZE`, so that
/// one emitted batch is one `encode_batch_fast` call's worth of work.
const PIPELINE_BATCH_SIZE: usize = 1 << 10; // 1024
47
/// Capacity of the batch channel between the accumulator and the encoder,
/// i.e. how many complete batches may be queued at once.
///
/// Deep enough that the encoder always has a batch waiting while the
/// chunker fills the next one, shallow enough to keep peak memory bounded.
const PIPELINE_RING_SIZE: usize = 4;
53
/// Default model identifier for the ripvec path: the HuggingFace repo
/// `minishlab/potion-code-16M`.
///
/// [`StaticEncoder::from_pretrained`] accepts this string directly. When
/// the string does not name an existing local directory it is treated as
/// an HF repo ID and the model files are fetched through `hf-hub`. The
/// string given to `from_pretrained` is also what `identity()` reports.
pub const DEFAULT_MODEL_REPO: &str = "minishlab/potion-code-16M";
58
/// Embedding width (components per output row) expected from
/// [`DEFAULT_MODEL_REPO`].
pub const DEFAULT_HIDDEN_DIM: usize = 256;
61
/// Size cap, in bytes, for a source file to be chunked; anything larger
/// is skipped without being read. Same limit as semble's
/// `_MAX_FILE_BYTES = 1_000_000` (`index/create.py:16`).
const MAX_FILE_BYTES: u64 = 1_000_000;
65
66/// CPU-only static encoder.
67///
68/// Owns a loaded [`StaticEmbedModel`] plus identity metadata. The
69/// embedder is constructed by `main.rs::load_pipeline` via
70/// [`StaticEncoder::from_pretrained`], passing either a local path
71/// containing the Model2Vec files or (planned) an HF repo ID.
72pub struct StaticEncoder {
73 model: StaticEmbedModel,
74 model_repo: String,
75 hidden_dim: usize,
76}
77
78impl StaticEncoder {
79 /// Encode a query string into a single embedding row.
80 ///
81 /// Used by `RipvecIndex::search` for hybrid/semantic dispatch.
82 #[must_use]
83 pub fn encode_query(&self, query: &str) -> Vec<f32> {
84 self.model.encode_query(query)
85 }
86
87 /// Load a model by HuggingFace repo ID or local path.
88 ///
89 /// Two acceptance shapes:
90 ///
91 /// 1. **Local path** — if `model_repo` names an existing directory,
92 /// load directly from it. Used by the parity test fixture path
93 /// (`/tmp/potion-base-32M`) and any user pre-staging files.
94 /// 2. **HuggingFace repo ID** — otherwise treat as `org/repo`,
95 /// download `config.json` / `tokenizer.json` / `model.safetensors`
96 /// via `hf-hub` into `~/.cache/huggingface/hub/`, and load from
97 /// there. Matches `load_classic_cpu` / `load_modernbert_cpu`'s
98 /// behaviour so the user-facing API is consistent: bare `--model
99 /// ripvec` with no `--model-repo` flag works.
100 ///
101 /// # Errors
102 ///
103 /// Propagates the underlying I/O, download, or parse error if the
104 /// files cannot be obtained or the safetensors layout is
105 /// unrecognized.
106 pub fn from_pretrained(model_repo: &str) -> crate::Result<Self> {
107 let resolved = Self::resolve_model_dir(model_repo)?;
108 let model = StaticEmbedModel::from_path(&resolved, Some(true))
109 .map_err(|e| crate::Error::Other(anyhow::anyhow!("static model load failed: {e}")))?;
110 let hidden_dim = model.hidden_dim();
111 Ok(Self {
112 model,
113 model_repo: model_repo.to_string(),
114 hidden_dim,
115 })
116 }
117
118 /// Resolve `model_repo` to a directory containing the model files.
119 ///
120 /// If `model_repo` is an existing local directory, returns it as-is.
121 /// Otherwise downloads via `hf-hub` and returns the cache directory.
122 fn resolve_model_dir(model_repo: &str) -> crate::Result<PathBuf> {
123 let local = Path::new(model_repo);
124 if local.is_dir() {
125 return Ok(local.to_path_buf());
126 }
127
128 // HuggingFace repo path. Download the three required files and
129 // return the directory `hf-hub` cached them into. All files
130 // land in the same snapshot directory.
131 let api = Api::new().map_err(|e| crate::Error::Download(e.to_string()))?;
132 let repo = api.model(model_repo.to_string());
133 let _ = repo
134 .get("config.json")
135 .map_err(|e| crate::Error::Download(e.to_string()))?;
136 let _ = repo
137 .get("tokenizer.json")
138 .map_err(|e| crate::Error::Download(e.to_string()))?;
139 let weights_path = repo
140 .get("model.safetensors")
141 .map_err(|e| crate::Error::Download(e.to_string()))?;
142 // hf-hub returns the file path; the snapshot directory is its parent.
143 weights_path
144 .parent()
145 .map(std::path::Path::to_path_buf)
146 .ok_or_else(|| {
147 crate::Error::Other(anyhow::anyhow!(
148 "hf-hub returned root path for {model_repo}; cannot resolve snapshot dir"
149 ))
150 })
151 }
152}
153
154impl VectorEncoder for StaticEncoder {
155 /// Three-stage bounded-queue pipeline:
156 ///
157 /// 1. **Chunk producer** — rayon `par_iter` over the file list. Each
158 /// file is read, parsed by tree-sitter (or line-merged on
159 /// fallback), and emitted as `(CodeChunk, String)` pairs into a
160 /// bounded channel of capacity `PIPELINE_BATCH_SIZE * 8`.
161 /// 2. **Batch accumulator** — a single scoped thread drains the
162 /// chunk channel, packs `PIPELINE_BATCH_SIZE` pairs per batch,
163 /// and forwards into a bounded channel of capacity
164 /// `PIPELINE_RING_SIZE`.
165 /// 3. **Encode worker** — a single scoped thread receives batches
166 /// and calls `StaticEmbedModel::encode_batch`, whose internal
167 /// `par_iter` lights up rayon for the pool_ids kernel.
168 ///
169 /// Why this shape:
170 ///
171 /// - The previous "chunk all, then embed all" implementation held
172 /// the entire `Vec<String>` of chunk contents in memory between
173 /// phases. On the linux corpus that was ~400 MB peak. The
174 /// bounded queues cap in-flight memory at
175 /// `PIPELINE_BATCH_SIZE * 8 + PIPELINE_RING_SIZE * PIPELINE_BATCH_SIZE`
176 /// chunks regardless of corpus size — under 15 MB.
177 /// - The chunk phase (13s on linux) is hidden inside the embed
178 /// phase (70s) instead of serializing before it. Pre-pipeline
179 /// profile showed user-time at 394s on 82s wall = 4.8x
180 /// parallelism on 12 cores; pipeline lets idle cores chew on
181 /// chunking while embed runs.
182 /// - Mirrors `embed::embed_all_streaming`'s shape so the two
183 /// pipelines (BERT + semble) share architectural conventions.
184 fn embed_root(
185 &self,
186 root: &Path,
187 cfg: &SearchConfig,
188 profiler: &Profiler,
189 ) -> crate::Result<(Vec<CodeChunk>, Vec<Vec<f32>>)> {
190 // Phase 1: walk (still serial-to-pipeline because we need the
191 // full file list to par_iter over; the walk itself is rayon).
192 let walk_options = cfg.walk_options();
193 let file_paths = {
194 let _guard = profiler.phase("walk");
195 collect_files_with_options(root, &walk_options)
196 };
197 if file_paths.is_empty() {
198 return Ok((Vec::new(), Vec::new()));
199 }
200
201 // Bounded channels. See module constants for the rationale on
202 // PIPELINE_BATCH_SIZE and PIPELINE_RING_SIZE.
203 let (chunk_tx, chunk_rx) = bounded::<(CodeChunk, String)>(PIPELINE_BATCH_SIZE * 8);
204 let (batch_tx, batch_rx) = bounded::<Vec<(CodeChunk, String)>>(PIPELINE_RING_SIZE);
205
206 // The encoder stage writes ordered output behind a Mutex. Order
207 // across files isn't meaningful (RipvecIndex doesn't rely on
208 // chunk order), only the chunk[i] <-> embedding[i] pairing
209 // matters — which we preserve trivially by pushing in lockstep.
210 let output: Mutex<Vec<(CodeChunk, Vec<f32>)>> = Mutex::new(Vec::new());
211 let model = &self.model;
212
213 // Stage 1 runs on a DEDICATED rayon thread pool. If we used
214 // the global pool, Stage 1's par_iter workers would park on
215 // full `chunk_tx.send()` calls, and Stage 3's
216 // `encode_batch` → `pool_ids` par_iter would have no rayon
217 // workers available (they're all parked). That's a classic
218 // nested-rayon deadlock — observed in profiling as PID stuck
219 // at 0% CPU with 16 parked threads.
220 //
221 // Half the cores for chunking, half remain in the global pool
222 // for the encode worker's pool_ids. The chunk phase (tree-
223 // sitter + I/O bound) doesn't need full parallelism to
224 // pipeline cleanly behind embed.
225 let num_cores = rayon::current_num_threads().max(2);
226 let chunk_threads = (num_cores / 2).max(1);
227 let chunk_pool = rayon::ThreadPoolBuilder::new()
228 .num_threads(chunk_threads)
229 .thread_name(|i| format!("semble-chunk-{i}"))
230 .build()
231 .map_err(|e| crate::Error::Other(anyhow::anyhow!("chunk thread pool build: {e}")))?;
232
233 let _phase_guard = profiler.phase("pipeline");
234 std::thread::scope(|scope| {
235 // Stage 1: chunk producer on the dedicated pool.
236 let chunk_tx_owned = chunk_tx;
237 scope.spawn(move || {
238 chunk_pool.install(|| {
239 file_paths.par_iter().for_each(|full| {
240 let (chunks, contents) = chunk_one_file(root, full);
241 for (chunk, content) in chunks.into_iter().zip(contents) {
242 if chunk_tx_owned.send((chunk, content)).is_err() {
243 return;
244 }
245 }
246 });
247 });
248 // chunk_tx_owned drops here, closing the channel.
249 });
250
251 // Stage 2: batch accumulator.
252 let batch_tx_owned = batch_tx;
253 scope.spawn(move || {
254 let mut buf: Vec<(CodeChunk, String)> = Vec::with_capacity(PIPELINE_BATCH_SIZE);
255 for pair in chunk_rx {
256 buf.push(pair);
257 if buf.len() >= PIPELINE_BATCH_SIZE {
258 let batch =
259 std::mem::replace(&mut buf, Vec::with_capacity(PIPELINE_BATCH_SIZE));
260 if batch_tx_owned.send(batch).is_err() {
261 return;
262 }
263 }
264 }
265 if !buf.is_empty() {
266 let _ = batch_tx_owned.send(buf);
267 }
268 // batch_tx_owned drops here, closing the channel.
269 });
270
271 // Stage 3: encode worker.
272 scope.spawn(|| {
273 for batch in batch_rx {
274 if batch.is_empty() {
275 continue;
276 }
277 let mut chunks = Vec::with_capacity(batch.len());
278 let mut texts: Vec<String> = Vec::with_capacity(batch.len());
279 for (chunk, text) in batch {
280 chunks.push(chunk);
281 texts.push(text);
282 }
283 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
284 let embeddings = model.encode_batch(&text_refs);
285 debug_assert_eq!(embeddings.len(), chunks.len());
286 let mut out = output.lock().expect("output mutex poisoned");
287 for (chunk, emb) in chunks.into_iter().zip(embeddings) {
288 out.push((chunk, emb));
289 }
290 }
291 });
292 });
293
294 let collected = output.into_inner().expect("output mutex poisoned");
295 let mut chunks_out = Vec::with_capacity(collected.len());
296 let mut embs_out = Vec::with_capacity(collected.len());
297 for (chunk, emb) in collected {
298 chunks_out.push(chunk);
299 embs_out.push(emb);
300 }
301 Ok((chunks_out, embs_out))
302 }
303
304 fn hidden_dim(&self) -> usize {
305 self.hidden_dim
306 }
307
308 fn identity(&self) -> &str {
309 &self.model_repo
310 }
311}
312
313/// Chunk one file. Returns `(file_chunks, file_contents)` — empty
314/// when the file is too large, can't be read, or has no chunks.
315fn chunk_one_file(root: &Path, full: &Path) -> (Vec<CodeChunk>, Vec<String>) {
316 match std::fs::metadata(full) {
317 Ok(meta) if meta.len() > MAX_FILE_BYTES => return (Vec::new(), Vec::new()),
318 Err(_) => return (Vec::new(), Vec::new()),
319 _ => {}
320 }
321 let Ok(source) = std::fs::read_to_string(full) else {
322 return (Vec::new(), Vec::new());
323 };
324
325 let ext = full
326 .extension()
327 .and_then(|e| e.to_str())
328 .unwrap_or_default();
329 let lang_cfg = config_for_extension(ext);
330 let language = lang_cfg.as_ref().map(|c| &c.language);
331
332 let rel_path = full
333 .strip_prefix(root)
334 .unwrap_or(full)
335 .display()
336 .to_string();
337
338 let boundaries = chunk_source(&source, language, DEFAULT_DESIRED_CHUNK_CHARS);
339 let mut chunks = Vec::with_capacity(boundaries.len());
340 let mut contents = Vec::with_capacity(boundaries.len());
341 for b in boundaries {
342 let text = b.content(&source).to_string();
343 if text.trim().is_empty() {
344 continue;
345 }
346 contents.push(text.clone());
347 chunks.push(CodeChunk {
348 file_path: rel_path.clone(),
349 name: String::new(),
350 kind: String::new(),
351 start_line: b.start_line,
352 end_line: b.end_line,
353 content: text.clone(),
354 enriched_content: text,
355 });
356 }
357 (chunks, contents)
358}
359
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoder::VectorEncoder;

    /// Compile-time proof that `StaticEncoder` is a `VectorEncoder` that
    /// is also `Send + Sync`
    /// (`test:static-encoder-implements-vector-encoder`).
    #[test]
    fn static_encoder_implements_vector_encoder() {
        fn require_bounds<T>()
        where
            T: VectorEncoder + Send + Sync,
        {
        }
        require_bounds::<StaticEncoder>();
    }

    /// Loads the model named by `RIPVEC_SEMBLE_MODEL_PATH` and checks the
    /// reported hidden dimension, the identity string, and that a query
    /// embedding has unit L2 norm.
    ///
    /// Ignored by default because it needs model files on disk. When the
    /// environment variable is unset the test prints a notice and returns.
    ///
    /// Covers acceptance `test:static-encoder-hidden-dim-256`,
    /// `test:static-encoder-loads-potion-code-16m` and
    /// `test:static-encoder-output-is-l2-normalized`.
    #[test]
    #[ignore = "requires local model files at RIPVEC_SEMBLE_MODEL_PATH"]
    fn static_encoder_loads_potion_code_16m() {
        let path = match std::env::var("RIPVEC_SEMBLE_MODEL_PATH") {
            Ok(value) => value,
            Err(_) => {
                eprintln!("RIPVEC_SEMBLE_MODEL_PATH not set; skipping");
                return;
            }
        };

        let enc = StaticEncoder::from_pretrained(&path).expect("model load should succeed");
        assert_eq!(enc.hidden_dim(), DEFAULT_HIDDEN_DIM);
        // `identity()` echoes whatever the caller passed in — here the
        // local path under test.
        assert_eq!(enc.identity(), path);

        // The public query path must produce an L2-normalized row.
        let row = enc.encode_query("hello world");
        let sum_of_squares: f32 = row.iter().map(|component| component * component).sum();
        let norm = sum_of_squares.sqrt();
        assert!(
            (norm - 1.0).abs() < 1e-3,
            "expected L2-normalized output; got norm={norm}"
        );
    }
}
400}