Skip to main content

trusty_embedder/
lib.rs

1//! Shared text-embedding abstraction for trusty-* projects.
2//!
3//! Why: trusty-memory and trusty-search both shipped near-identical
4//! `Embedder` traits and `FastEmbedder` implementations, with subtle
5//! drift (cache vs no-cache, sync vs async warmup, `dim()` vs `dimension()`).
6//! Centralising fixes one bug in one place and lets future consumers pick up
7//! the embedder for free.
8//!
9//! What: an async `Embedder` trait with `embed_batch` as the single primitive
10//! (single-text embed is a free helper), plus a production `FastEmbedder`
11//! (fastembed-rs, all-MiniLM-L6-v2, 384-d) with LRU caching and ORT warmup,
12//! and a `MockEmbedder` test double behind the `test-support` feature.
13//!
//! Test: `cargo test -p trusty-embedder` covers the mock embedder (shape and
//! empty input). The ONNX-backed tests (output dimension, cache hits) are
//! `#[ignore]` because they download the real model; run them with
//! `cargo test -- --ignored`.
17
18use std::num::NonZeroUsize;
19use std::sync::Arc;
20
21use anyhow::{Context, Result};
22use async_trait::async_trait;
23use fastembed::{EmbeddingModel, TextEmbedding, TextInitOptions};
24use lru::LruCache;
25use parking_lot::Mutex;
26
/// Output dimension of the all-MiniLM-L6-v2 model.
///
/// Note: `FastEmbedder` first tries the INT8-quantised variant
/// (`AllMiniLML6V2Q`), which produces identical 384-dim vectors but runs
/// ~3-4× faster on CPU ONNX and ships as a ~22MB file (vs 86MB for the f32
/// model). If the quantised model fails to initialise it falls back to the
/// f32 `AllMiniLML6V2` model; `FastEmbedder` reports this same dimension in
/// either case.
pub const EMBED_DIM: usize = 384;

/// Default LRU cache capacity, counted in cached entries (one entry per
/// distinct input string). Picked to be large enough to keep the
/// hot working set of repeat queries in memory but small enough that the
/// cache itself fits well inside L2/L3 on a typical developer machine.
pub const DEFAULT_CACHE_CAPACITY: usize = 256;
38
/// Identifier for the execution provider an embedder is actually using.
///
/// Why: callers want to log which backend is active (CPU vs CoreML/Metal vs
/// CUDA) so operators can verify the daemon is GPU-accelerated without a
/// debug log dive.
/// What: a stable, human-friendly tag returned by `FastEmbedder::provider()`.
/// Test: `FastEmbedder::new()` on Apple Silicon should yield `CoreML`; on
/// other platforms it yields `Cpu` (or `Cuda` when the `cuda` feature is on).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionProvider {
    /// Only the CPU execution provider was registered.
    Cpu,
    /// The CoreML execution provider was registered ahead of the CPU
    /// provider (Apple Silicon builds).
    CoreML,
    /// The CUDA execution provider was registered ahead of the CPU provider
    /// (builds with the `cuda` feature).
    Cuda,
}
53
54impl ExecutionProvider {
55    pub fn as_str(&self) -> &'static str {
56        match self {
57            ExecutionProvider::Cpu => "CPU",
58            ExecutionProvider::CoreML => "CoreML",
59            ExecutionProvider::Cuda => "CUDA",
60        }
61    }
62}
63
64impl std::fmt::Display for ExecutionProvider {
65    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
66        f.write_str(self.as_str())
67    }
68}
69
/// Abstraction over embedding backends.
///
/// Why: Decouple consumers from any one model so we can swap in remote APIs,
/// quantised models, or deterministic mocks without changing call sites.
/// What: a single primitive — `embed_batch` — plus a dimension accessor.
/// Single-text callers should use the [`embed_one`] convenience helper.
/// Test: covered by `FastEmbedder` and `MockEmbedder` tests below.
#[async_trait]
pub trait Embedder: Send + Sync {
    /// Embed a batch of texts. Returns one `Vec<f32>` per input, each of
    /// length `self.dimension()`. An empty input batch returns an empty Vec.
    ///
    /// The implementations in this file return the vectors in input order.
    ///
    /// # Errors
    /// Returns `Err` when the backend fails to produce embeddings; the exact
    /// conditions are implementation-specific.
    async fn embed_batch(&self, texts: &[String]) -> Result<Vec<Vec<f32>>>;

    /// Output dimension of the produced embeddings.
    fn dimension(&self) -> usize;
}
86
87/// Convenience helper: embed a single text via `embed_batch` and return the
88/// lone vector.
89///
90/// Why: Most call sites only need one embedding at a time and writing
91/// `.embed_batch(&[text]).await?.into_iter().next()` everywhere is noise.
92/// What: builds a 1-element batch, calls `embed_batch`, returns the first
93/// vector (or errors if the embedder produced nothing).
94/// Test: covered indirectly by `mock_embedder_round_trip`.
95pub async fn embed_one(embedder: &dyn Embedder, text: &str) -> Result<Vec<f32>> {
96    let mut v = embedder.embed_batch(&[text.to_string()]).await?;
97    v.pop()
98        .context("embedder returned no embedding for non-empty input")
99}
100
/// Local embedder backed by fastembed-rs (ONNX runtime, all-MiniLM-L6-v2).
/// Runs on CPU unless an accelerated execution provider (CoreML / CUDA) was
/// registered at construction; see [`FastEmbedder::provider`].
///
/// Why: Default to local-only embeddings so consumers have zero external
/// network dependency and predictable latency. The LRU cache keeps the hot
/// path free of redundant ONNX work for repeat strings (queries, common
/// chunks).
/// What: wraps a single `TextEmbedding` behind a `parking_lot::Mutex` (the
/// underlying `embed` requires `&mut self`) and an `LruCache<String, Vec<f32>>`.
/// Initialisation warms the ORT graph with a small batch so the first user
/// query doesn't pay the one-shot compile cost.
/// Test: `fastembed_returns_correct_dim` and
/// `fastembed_cache_hit_is_idempotent` (marked `#[ignore]` — they download a
/// real model).
pub struct FastEmbedder {
    /// The fastembed model. Held in an `Arc` so a handle can be moved into
    /// `spawn_blocking` closures; the mutex serialises `embed` calls.
    model: Arc<Mutex<TextEmbedding>>,
    /// Embeddings keyed by the exact input string.
    cache: Arc<Mutex<LruCache<String, Vec<f32>>>>,
    /// Value reported by `dimension()`; set to `EMBED_DIM` at construction.
    dim: usize,
    /// Execution-provider tag recorded at construction.
    provider: ExecutionProvider,
}
119
120impl FastEmbedder {
121    /// Construct a new `FastEmbedder` with the default cache size.
122    pub async fn new() -> Result<Self> {
123        Self::with_cache_size(DEFAULT_CACHE_CAPACITY).await
124    }
125
126    /// Identifier for the execution provider this embedder is actually using.
127    ///
128    /// Why: callers (e.g. `trusty-search` startup logs) want to surface
129    /// whether the daemon is running on CPU or GPU/ANE without poking at
130    /// internals.
131    /// What: returns `ExecutionProvider::CoreML` on Apple Silicon (when EP
132    /// registration succeeded), otherwise `Cpu` (or `Cuda` if/when wired).
133    /// Test: covered by the public-surface compile check.
134    pub fn provider(&self) -> ExecutionProvider {
135        self.provider
136    }
137
138    /// Build `TextInitOptions` for the given model, attempting to register
139    /// the CoreML execution provider at runtime when on Apple Silicon.
140    ///
141    /// Why: We want zero-friction GPU/ANE acceleration on Apple Silicon
142    /// without forcing users to pass `--features coreml`. fastembed-rs accepts
143    /// a `Vec<ExecutionProviderDispatch>` via `with_execution_providers`, and
144    /// our `ort` dep (pinned to the exact `=2.0.0-rc.12` fastembed uses) has
145    /// the `coreml` feature on by default on macOS, so we can always try to
146    /// build and register CoreML at runtime. On non-Apple platforms, or if
147    /// CoreML registration fails for any reason, we transparently fall back
148    /// to the default CPU provider.
149    /// What: returns `(TextInitOptions, ExecutionProvider)` where the tag
150    /// reflects which backend was actually wired in.
151    /// Test: on an M-series Mac the tag is `CoreML`; on Intel/Linux/Windows
152    /// (or if CoreML build fails) the tag is `Cpu`.
153    fn init_options(model: EmbeddingModel) -> (TextInitOptions, ExecutionProvider) {
154        use ort::execution_providers::ExecutionProviderDispatch;
155
156        let opts = TextInitOptions::new(model);
157
158        // Always register an explicit CPU EP with the memory arena DISABLED.
159        //
160        // Why: ORT's default CPU memory arena pre-allocates a large contiguous
161        // slab sized to the peak tensor shape on first inference. For repos
162        // with 16k+ files this arena grows to 19-53 GB before any RSS soft cap
163        // can react (issue bobmatnyc/trusty-search#89). Disabling the arena
164        // forces per-inference allocations that are freed after each call,
165        // capping steady-state RSS at ~hundreds of MB instead of tens of GB.
166        let cpu_no_arena: ExecutionProviderDispatch =
167            ort::ep::CPU::default().with_arena_allocator(false).build();
168
169        // ──────────────────────────────────────────────────────────────────
170        // CUDA (Linux/Windows, NVIDIA GPU)
171        //
172        // Why: when the operator opts in with `--features cuda` and runs on a
173        // host with a CUDA-capable GPU, we should auto-prefer the CUDA EP so
174        // embedding throughput jumps from CPU-bound (~5h for a 40k-file repo)
175        // to GPU-bound (target <30 min). This mirrors the always-on CoreML
176        // pattern on Apple Silicon but is gated on the build-time `cuda`
177        // feature because the `ort/cuda` feature requires a CUDA toolkit at
178        // compile time. If the binary was built without `cuda`, this branch
179        // is compiled out entirely (no runtime cost, no link-time CUDA dep).
180        //
181        // Operator override: setting `TRUSTY_DEVICE=cpu` forces CPU even on a
182        // GPU-enabled binary. Useful for A/B benchmarking or for running on a
183        // host whose GPU is reserved for another workload.
184        // Test: on a g4dn.xlarge with `--features cuda` the provider tag
185        // resolves to `Cuda`; setting `TRUSTY_DEVICE=cpu` reverts to `Cpu`.
186        #[cfg(feature = "cuda")]
187        {
188            let force_cpu = std::env::var("TRUSTY_DEVICE")
189                .map(|v| v.eq_ignore_ascii_case("cpu"))
190                .unwrap_or(false);
191            if !force_cpu {
192                let cuda: ExecutionProviderDispatch = ort::ep::CUDA::default().build();
193                let providers: Vec<ExecutionProviderDispatch> = vec![cuda, cpu_no_arena];
194                tracing::info!(
195                    "trusty-embedder: registering CUDA + CPU(no-arena) execution providers \
196                     (will fall back to CPU at session-init if no CUDA device is available)"
197                );
198                return (
199                    opts.with_execution_providers(providers),
200                    ExecutionProvider::Cuda,
201                );
202            }
203            tracing::info!(
204                "trusty-embedder: TRUSTY_DEVICE=cpu set — skipping CUDA EP registration"
205            );
206        }
207
208        #[cfg(all(target_arch = "aarch64", target_os = "macos"))]
209        {
210            let coreml: ExecutionProviderDispatch = ort::ep::CoreML::default().build();
211            // CoreML first (GPU/ANE), CPU-no-arena as fallback. The CPU EP
212            // still applies its session-level DisableCpuMemArena flag even
213            // when CoreML handles most ops, which is what prevents the spike.
214            let providers: Vec<ExecutionProviderDispatch> = vec![coreml, cpu_no_arena];
215            tracing::info!(
216                "trusty-embedder: registering CoreML + CPU(no-arena) execution providers (Apple Silicon)"
217            );
218            return (
219                opts.with_execution_providers(providers),
220                ExecutionProvider::CoreML,
221            );
222        }
223
224        #[allow(unreachable_code)]
225        {
226            tracing::info!("trusty-embedder: registering CPU(no-arena) execution provider");
227            let providers: Vec<ExecutionProviderDispatch> = vec![cpu_no_arena];
228            (
229                opts.with_execution_providers(providers),
230                ExecutionProvider::Cpu,
231            )
232        }
233    }
234
235    /// Construct with an explicit LRU capacity.
236    pub async fn with_cache_size(capacity: usize) -> Result<Self> {
237        let capacity =
238            NonZeroUsize::new(capacity.max(1)).expect("capacity.max(1) is always non-zero");
239
240        // fastembed's `try_new` downloads + builds an ONNX session — blocking
241        // work that must run off the async reactor.
242        let (model, provider) =
243            tokio::task::spawn_blocking(|| -> Result<(TextEmbedding, ExecutionProvider)> {
244                // Honour the explicit `TRUSTY_DEVICE=gpu` requirement: when the
245                // operator asks for GPU, init_options will have selected an
246                // accelerated EP. If that EP fails to initialise (no GPU, no
247                // CUDA driver, etc.) AND the user did NOT explicitly require
248                // GPU, we transparently fall back to CPU. With `gpu` we
249                // surface the failure so the operator notices instead of
250                // silently running CPU-bound on a "GPU node".
251                let require_gpu = std::env::var("TRUSTY_DEVICE")
252                    .map(|v| v.eq_ignore_ascii_case("gpu"))
253                    .unwrap_or(false);
254
255                let (q_opts, q_provider) = Self::init_options(EmbeddingModel::AllMiniLML6V2Q);
256                let (m, provider) = match TextEmbedding::try_new(q_opts) {
257                    Ok(m) => (m, q_provider),
258                    Err(q_err) => {
259                        // Hardware-accelerated EP build failed — most often
260                        // "no CUDA device" or "CoreML EP not available". On a
261                        // best-effort tier (default), retry once with CPU only
262                        // so the daemon still starts. On `TRUSTY_DEVICE=gpu`
263                        // we propagate the original error.
264                        if q_provider != ExecutionProvider::Cpu && !require_gpu {
265                            tracing::warn!(
266                                "{} EP init failed ({q_err:#}); retrying with CPU-only \
267                                 execution provider",
268                                q_provider
269                            );
270                            // SAFETY: see TRUSTY_DEVICE comment in
271                            // init_options — the env mutation happens before
272                            // any worker thread reads it.
273                            unsafe { std::env::set_var("TRUSTY_DEVICE", "cpu") };
274                            let (cpu_opts, cpu_provider) =
275                                Self::init_options(EmbeddingModel::AllMiniLML6V2Q);
276                            match TextEmbedding::try_new(cpu_opts) {
277                                Ok(m) => (m, cpu_provider),
278                                Err(cpu_err) => {
279                                    tracing::warn!(
280                                        "AllMiniLML6V2Q init failed on CPU ({cpu_err:#}), \
281                                         falling back to AllMiniLML6V2"
282                                    );
283                                    let (fb_opts, fb_provider) =
284                                        Self::init_options(EmbeddingModel::AllMiniLML6V2);
285                                    let m = TextEmbedding::try_new(fb_opts).context(
286                                        "failed to initialise fastembed (tried CUDA→CPU on AllMiniLML6V2Q, then AllMiniLML6V2)",
287                                    )?;
288                                    (m, fb_provider)
289                                }
290                            }
291                        } else if require_gpu {
292                            return Err(anyhow::anyhow!(
293                                "TRUSTY_DEVICE=gpu requested but accelerated execution provider \
294                                 failed to initialise: {q_err:#}"
295                            ));
296                        } else {
297                            tracing::warn!(
298                                "AllMiniLML6V2Q init failed ({q_err:#}), falling back to AllMiniLML6V2"
299                            );
300                            let (fb_opts, fb_provider) =
301                                Self::init_options(EmbeddingModel::AllMiniLML6V2);
302                            let m = TextEmbedding::try_new(fb_opts).context(
303                                "failed to initialise fastembed (tried AllMiniLML6V2Q and AllMiniLML6V2)",
304                            )?;
305                            (m, fb_provider)
306                        }
307                    }
308                };
309                let mut m = m;
310
311                // Warm the graph so the first real user query is hot.
312                let warmup: Vec<&str> = vec![
313                    "hello world",
314                    "the quick brown fox",
315                    "memory palace warmup",
316                    "embedding model ready",
317                    "trusty common warmup",
318                ];
319                let _ = m
320                    .embed(warmup, None)
321                    .context("fastembed warmup batch failed")?;
322                Ok((m, provider))
323            })
324            .await
325            .context("spawn_blocking joined with error during embedder init")??;
326
327        tracing::info!(
328            "trusty-embedder: FastEmbedder ready (provider={}, dim={})",
329            provider,
330            EMBED_DIM
331        );
332
333        Ok(Self {
334            model: Arc::new(Mutex::new(model)),
335            cache: Arc::new(Mutex::new(LruCache::new(capacity))),
336            dim: EMBED_DIM,
337            provider,
338        })
339    }
340}
341
342#[async_trait]
343impl Embedder for FastEmbedder {
344    async fn embed_batch(&self, texts: &[String]) -> Result<Vec<Vec<f32>>> {
345        if texts.is_empty() {
346            return Ok(Vec::new());
347        }
348
349        // Split into cached hits vs misses.
350        let mut results: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
351        let mut to_compute: Vec<(usize, String)> = Vec::new();
352        {
353            let mut cache = self.cache.lock();
354            for (i, t) in texts.iter().enumerate() {
355                if let Some(v) = cache.get(t) {
356                    results[i] = Some(v.clone());
357                } else {
358                    to_compute.push((i, t.clone()));
359                }
360            }
361        }
362
363        if !to_compute.is_empty() {
364            let model = Arc::clone(&self.model);
365            let owned: Vec<String> = to_compute.iter().map(|(_, s)| s.clone()).collect();
366            let computed = tokio::task::spawn_blocking(move || -> Result<Vec<Vec<f32>>> {
367                let mut guard = model.lock();
368                guard
369                    .embed(owned, None)
370                    .context("fastembed embed call failed")
371            })
372            .await
373            .context("spawn_blocking joined with error during embed")??;
374
375            if computed.len() != to_compute.len() {
376                anyhow::bail!(
377                    "fastembed returned {} embeddings, expected {}",
378                    computed.len(),
379                    to_compute.len()
380                );
381            }
382
383            let mut cache = self.cache.lock();
384            for ((idx, key), vector) in to_compute.into_iter().zip(computed.into_iter()) {
385                cache.put(key, vector.clone());
386                results[idx] = Some(vector);
387            }
388        }
389
390        results
391            .into_iter()
392            .map(|opt| opt.context("missing embedding slot after batch"))
393            .collect()
394    }
395
396    fn dimension(&self) -> usize {
397        self.dim
398    }
399}
400
/// Deterministic test double — hashes input bytes into a fixed-dim vector.
///
/// Why: ONNX model downloads dominate test runtime and can race on cold
/// caches when multiple tests construct embedders in parallel. The mock
/// gives integration tests a "rank by similarity" surface without any I/O.
/// What: a tiny per-byte hash spread across `dim` slots, with the first byte
/// additionally added to slot 0 so short strings still differ. An empty
/// string has no bytes and therefore maps to the all-zero vector.
/// Test: `mock_embedder_round_trip` checks the output shape and that two
/// different inputs produce different vectors.
#[cfg(any(test, feature = "test-support"))]
pub struct MockEmbedder {
    /// Length of every vector this mock produces; also returned by
    /// `dimension()`.
    dim: usize,
}
413
#[cfg(any(test, feature = "test-support"))]
impl MockEmbedder {
    /// Create a mock that produces vectors of length `dim`.
    ///
    /// `dim == 0` is accepted and yields empty vectors.
    pub fn new(dim: usize) -> Self {
        Self { dim }
    }

    /// Deterministically map `text` to a `dim`-length vector.
    ///
    /// Each byte `b` at index `i` adds `b / 255` to slot `(i + b) % dim`; the
    /// first byte is additionally added to slot 0. An empty `text` yields the
    /// all-zero vector, and `dim == 0` yields an empty vector.
    fn hash_to_vec(&self, text: &str) -> Vec<f32> {
        let mut v = vec![0.0_f32; self.dim];
        // Guard: with zero slots both `% self.dim` and `v[0]` below would panic.
        if self.dim == 0 {
            return v;
        }
        for (i, b) in text.bytes().enumerate() {
            let slot = (i + usize::from(b)) % self.dim;
            v[slot] += f32::from(b) / 255.0;
        }
        if let Some(first) = text.bytes().next() {
            v[0] += f32::from(first) / 255.0;
        }
        v
    }
}
432
#[cfg(any(test, feature = "test-support"))]
#[async_trait]
impl Embedder for MockEmbedder {
    /// Hash every input into its vector, preserving input order. Never fails.
    async fn embed_batch(&self, texts: &[String]) -> Result<Vec<Vec<f32>>> {
        let mut vectors = Vec::with_capacity(texts.len());
        for text in texts {
            vectors.push(self.hash_to_vec(text));
        }
        Ok(vectors)
    }

    /// The dimension this mock was constructed with.
    fn dimension(&self) -> usize {
        let Self { dim } = self;
        *dim
    }
}
444
#[cfg(test)]
mod tests {
    use super::*;

    /// Mock embedder: reported dimension, output shape, determinism, and
    /// sensitivity to the input text.
    #[tokio::test]
    async fn mock_embedder_round_trip() {
        let e = MockEmbedder::new(EMBED_DIM);
        assert_eq!(e.dimension(), EMBED_DIM);

        let v = embed_one(&e, "hello").await.unwrap();
        assert_eq!(v.len(), EMBED_DIM);

        // Determinism: the same text must always hash to the same vector.
        let again = embed_one(&e, "hello").await.unwrap();
        assert_eq!(v, again);

        let batch = e
            .embed_batch(&["a".to_string(), "b".to_string()])
            .await
            .unwrap();
        assert_eq!(batch.len(), 2);
        assert!(batch.iter().all(|vector| vector.len() == EMBED_DIM));
        assert_ne!(batch[0], batch[1]);
    }

    /// An empty batch yields an empty result rather than an error.
    #[tokio::test]
    async fn mock_embedder_empty_input_returns_empty() {
        let e = MockEmbedder::new(EMBED_DIM);
        let v = e.embed_batch(&[]).await.unwrap();
        assert!(v.is_empty());
    }

    // ONNX-backed test: downloads ~23MB on first run. Marked ignored so default
    // `cargo test` stays offline; run with `cargo test -- --ignored` when needed.
    #[tokio::test]
    #[ignore]
    async fn fastembed_returns_correct_dim() {
        let e = FastEmbedder::new().await.unwrap();
        assert_eq!(e.dimension(), EMBED_DIM);
        let v = embed_one(&e, "fn authenticate(user: &str) -> bool")
            .await
            .unwrap();
        assert_eq!(v.len(), EMBED_DIM);
        assert!(v.iter().any(|x| *x != 0.0));
    }

    // ONNX-backed test (see above): the second call is served from the LRU
    // cache and must return exactly what the first call computed.
    #[tokio::test]
    #[ignore]
    async fn fastembed_cache_hit_is_idempotent() {
        let e = FastEmbedder::new().await.unwrap();
        let v1 = embed_one(&e, "cached").await.unwrap();
        let v2 = embed_one(&e, "cached").await.unwrap();
        assert_eq!(v1, v2);
    }
}