trusty-search 0.4.1

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
//! Embedder worker pool with priority lanes (issue #41 Phase 1).
//!
//! Why: Before this module, all embedding work flowed through a single
//! `Arc<dyn Embedder>` whose ONNX session is internally serialised by a
//! `Mutex<TextEmbedding>`. Combined with the machine-wide `Semaphore(1)` in
//! `service/reindex.rs`, a long-running reindex would block search queries
//! that arrived mid-flight (their embedding step had to wait behind the
//! reindex's hundreds of `embed_batch` calls). The pool restores
//! responsiveness for interactive search by introducing a strict priority
//! ordering — interactive requests always drain before background requests —
//! and bounds outstanding work so the daemon cannot OOM under burst load.
//!
//! What: A fixed pool of N async worker tasks, each holding an
//! `Arc<dyn Embedder>` clone (the underlying model is shared via Arc; FastEmbedder
//! already serialises ONNX inference internally). Requests arrive on two
//! `mpsc` channels — `interactive_tx` and `background_tx` — and workers
//! drain interactive first via `tokio::select!` with `biased;`. Callers
//! get a `oneshot::Receiver<Result<Vec<Vec<f32>>>>` back from `embed()`.
//!
//! Worker count is autotuned from system RAM (`trusty-search`'s
//! `core::memory_policy::detect_total_ram_mb`):
//!   - `<= 16 GB`  -> 1 worker
//!   - `17-32 GB`  -> 2 workers
//!   - `>  32 GB`  -> 4 workers
//!
//! `TRUSTY_EMBED_WORKERS` overrides the autotune.
//!
//! Test: see `tests` module at the bottom — covers worker count autotune,
//! priority ordering (interactive drains before background), shutdown, and
//! error propagation.

use anyhow::{Context, Result};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};

use crate::core::embed::Embedder;

/// Request priority. Interactive requests (search queries) are drained
/// strictly before background requests (reindex, discovery) so a long-running
/// reindex never blocks search latency.
///
/// Why: Search is user-facing — sub-10 s p95 latency is the headline target.
/// Reindex is batch work that runs in the background and is tolerant of a few
/// extra seconds.
/// What: A two-variant enum used as the channel selector inside the pool.
/// Test: `priority_ordering_interactive_drains_first`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RequestPriority {
    /// Search queries — drained first.
    Interactive,
    /// Reindex, discovery — drained when no interactive work is queued.
    Background,
}

/// Default channel capacity per priority lane.
///
/// Why: Bounded channels create natural back-pressure — when the lane is
/// full, callers `.await` until a slot opens. A capacity of 64 absorbs short
/// bursts without unbounded memory growth on a stuck pool.
/// What: 64 slots per lane (interactive + background = 128 total).
const LANE_CAPACITY: usize = 64;

/// One unit of work sent to a worker.
///
/// Why: Bundles the input batch with a `oneshot::Sender` so the worker can
/// reply directly to the originating caller without holding any global state.
/// What: Carries `texts`, the `reply` sender, and a priority tag (preserved
/// purely for tracing — the channel the request arrived on already determines
/// when the worker picks it up).
struct EmbedRequest {
    texts: Vec<String>,
    reply: oneshot::Sender<Result<Vec<Vec<f32>>>>,
    priority: RequestPriority,
}

/// Process-wide embedder worker pool.
///
/// Why: Centralises every embedding call so observability (queue depth,
/// utilisation gauges), prioritisation (interactive over background), and
/// back-pressure (bounded channels) live in one place rather than being
/// re-implemented at each call site.
/// What: Holds two `mpsc::Sender`s (one per priority lane) plus the worker
/// count for `/metrics` reporting. Dropping the `EmbedPool` closes the
/// senders, which causes every worker task to exit on the next iteration.
/// Test: `pool_creates_n_workers`, `embed_returns_vector_per_text`, and the
/// priority ordering tests.
pub struct EmbedPool {
    interactive_tx: mpsc::Sender<EmbedRequest>,
    background_tx: mpsc::Sender<EmbedRequest>,
    workers: usize,
    /// Live count of in-flight + queued embed requests. Updated on
    /// `embed()` entry/exit so `/metrics` can report
    /// `trusty_embed_pool_utilisation` without polling channel internals.
    in_flight: Arc<AtomicUsize>,
}

impl EmbedPool {
    /// Construct a new pool with `workers` worker tasks, each sharing the
    /// supplied `Arc<dyn Embedder>`.
    ///
    /// Why: The pool owns its workers' lifetimes — once the returned
    /// `EmbedPool` is dropped, the underlying channels are closed and every
    /// worker exits naturally.
    /// What: Spawns `workers` tokio tasks. Each task calls `tokio::select!`
    /// with `biased;` so the interactive receiver is polled first. Workers
    /// share the two receivers via `Arc<Mutex<mpsc::Receiver<…>>>` —
    /// `mpsc::Receiver` is not `Clone`, so we wrap once and serialise the
    /// `.recv()` calls behind a `Mutex`. Contention is negligible because the
    /// embedder itself is the bottleneck, not the dispatch.
    /// Test: `pool_creates_n_workers`.
    pub fn new(workers: usize, embedder: Arc<dyn Embedder>) -> Self {
        let workers = workers.max(1);
        let (interactive_tx, interactive_rx) = mpsc::channel::<EmbedRequest>(LANE_CAPACITY);
        let (background_tx, background_rx) = mpsc::channel::<EmbedRequest>(LANE_CAPACITY);
        let interactive_rx = Arc::new(tokio::sync::Mutex::new(interactive_rx));
        let background_rx = Arc::new(tokio::sync::Mutex::new(background_rx));
        let in_flight = Arc::new(AtomicUsize::new(0));

        // Set the static workers gauge once. The utilisation gauge is updated
        // per request inside `embed()`.
        metrics::gauge!("trusty_embed_pool_workers").set(workers as f64);

        for worker_id in 0..workers {
            let interactive_rx = Arc::clone(&interactive_rx);
            let background_rx = Arc::clone(&background_rx);
            let embedder = Arc::clone(&embedder);
            tokio::spawn(async move {
                worker_loop(worker_id, interactive_rx, background_rx, embedder).await;
            });
        }

        Self {
            interactive_tx,
            background_tx,
            workers,
            in_flight,
        }
    }

    /// Construct a pool using the autotuned worker count.
    ///
    /// Why: One-call convenience for `start.rs` — picks the right worker count
    /// based on host RAM unless overridden by `TRUSTY_EMBED_WORKERS`.
    /// What: Resolves the worker count via [`autotune_workers`] and calls
    /// [`Self::new`].
    /// Test: `pool_autotune_respects_env_override`.
    pub fn with_autotune(embedder: Arc<dyn Embedder>) -> Self {
        let workers = autotune_workers();
        tracing::info!("embed pool: {} workers", workers);
        Self::new(workers, embedder)
    }

    /// Submit a batch of texts to the pool and await the embeddings.
    ///
    /// Why: The single public entry point — every embedding call site goes
    /// through here so priority routing, back-pressure, and metrics happen
    /// consistently.
    /// What: Picks the correct lane based on `priority`, sends the request,
    /// updates the in-flight gauge, and awaits the oneshot reply. Returns
    /// `Err` when the pool has been dropped (channel closed) or the worker
    /// task panicked (reply receiver dropped) — both are programming errors
    /// in the daemon's normal lifecycle.
    /// Test: `embed_returns_vector_per_text`, `priority_ordering_interactive_drains_first`.
    pub async fn embed(
        &self,
        texts: Vec<String>,
        priority: RequestPriority,
    ) -> Result<Vec<Vec<f32>>> {
        if texts.is_empty() {
            return Ok(Vec::new());
        }
        let (reply_tx, reply_rx) = oneshot::channel();
        let req = EmbedRequest {
            texts,
            reply: reply_tx,
            priority,
        };
        let tx = match priority {
            RequestPriority::Interactive => &self.interactive_tx,
            RequestPriority::Background => &self.background_tx,
        };
        // Update utilisation BEFORE awaiting send so /metrics reflects queue
        // pressure during the back-pressure wait, not just during inference.
        self.in_flight.fetch_add(1, Ordering::Relaxed);
        metrics::gauge!("trusty_embed_pool_utilisation")
            .set(self.in_flight.load(Ordering::Relaxed) as f64);

        let send_result = tx.send(req).await.context("embed pool closed");
        let result = match send_result {
            Ok(()) => match reply_rx.await {
                Ok(r) => r,
                Err(_) => Err(anyhow::anyhow!("embed pool worker dropped reply")),
            },
            Err(e) => Err(e),
        };

        self.in_flight.fetch_sub(1, Ordering::Relaxed);
        metrics::gauge!("trusty_embed_pool_utilisation")
            .set(self.in_flight.load(Ordering::Relaxed) as f64);

        result
    }

    /// Number of worker tasks the pool spawned. Used by `/metrics` and
    /// startup logging.
    pub fn workers(&self) -> usize {
        self.workers
    }
}

/// Per-worker async loop: drain interactive lane first, fall back to
/// background. Exits when both senders are dropped.
async fn worker_loop(
    worker_id: usize,
    interactive_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<EmbedRequest>>>,
    background_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<EmbedRequest>>>,
    embedder: Arc<dyn Embedder>,
) {
    loop {
        // Acquire both locks briefly to receive. Because `recv()` is `&mut
        // self`, the lock must be held across the await. `biased;` makes the
        // select prefer the interactive lane whenever both have a message
        // ready, which is the whole point of priority lanes.
        //
        // Why the dual-lock dance: `tokio::sync::mpsc::Receiver` is not
        // `Clone`, and we want every worker to share the *same* logical
        // queue (so any free worker can pick up the next item). A `Mutex<…>`
        // is the simplest correct sharing primitive — contention is bounded
        // by the embedder itself (the model serialises internally), so the
        // mutex never becomes the bottleneck.
        let req = {
            let mut interactive_guard = interactive_rx.lock().await;
            let mut background_guard = background_rx.lock().await;
            tokio::select! {
                biased;
                msg = interactive_guard.recv() => msg,
                msg = background_guard.recv() => msg,
            }
        };
        let Some(req) = req else {
            tracing::debug!(worker_id, "embed pool worker exiting (channels closed)");
            return;
        };
        let EmbedRequest {
            texts,
            reply,
            priority,
        } = req;
        let started = std::time::Instant::now();
        // The shared embedder's `embed_batch` already uses `spawn_blocking`
        // internally for the ORT call, so we just await it here. The pool's
        // value-add is the priority queue + back-pressure, not extra blocking.
        let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
        let result = embedder
            .embed_batch(&text_refs)
            .await
            .context("embed pool worker: embed_batch failed");
        let elapsed_ms = started.elapsed().as_millis() as u64;
        tracing::trace!(
            worker_id,
            priority = ?priority,
            batch_size = texts.len(),
            elapsed_ms,
            "embed pool dispatched batch"
        );
        // Ignore the send result — if the caller dropped the receiver they
        // no longer care about the answer.
        let _ = reply.send(result);
    }
}

/// Resolve worker count from `TRUSTY_EMBED_WORKERS` or autodetect from RAM.
///
/// Why: A one-worker default on small hosts keeps memory bounded; larger
/// hosts can usefully run more concurrent dispatches even though FastEmbedder
/// itself serialises ONNX inference (workers still buy parallel ahead-of-time
/// tokenisation + cache lookups, and the priority lane preemption still
/// works).
/// What:
///   - `TRUSTY_EMBED_WORKERS=N` -> exactly N (clamped to >= 1).
///   - Otherwise: `detect_total_ram_mb()` ->
///     `<= 16 GB` -> 1; `17-32 GB` -> 2; `> 32 GB` -> 4.
///   - RAM detection failure -> 1 (safe default).
///
/// Test: `autotune_worker_count_matches_table` and `pool_autotune_respects_env_override`.
pub fn autotune_workers() -> usize {
    if let Ok(raw) = std::env::var("TRUSTY_EMBED_WORKERS") {
        if let Ok(n) = raw.parse::<usize>() {
            return n.max(1);
        }
    }
    let ram_mb = crate::core::memory_policy::detect_total_ram_mb().unwrap_or(8 * 1024);
    let ram_gb = ram_mb / 1024;
    if ram_gb <= 16 {
        1
    } else if ram_gb <= 32 {
        2
    } else {
        4
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::embed::MockEmbedder;
    use std::time::Duration;

    fn make_pool(workers: usize) -> EmbedPool {
        let embedder: Arc<dyn Embedder> = Arc::new(MockEmbedder::new(384));
        EmbedPool::new(workers, embedder)
    }

    #[tokio::test]
    async fn embed_returns_vector_per_text() {
        let pool = make_pool(2);
        let out = pool
            .embed(
                vec!["hello".into(), "world".into()],
                RequestPriority::Interactive,
            )
            .await
            .expect("embed succeeds");
        assert_eq!(out.len(), 2);
        assert_eq!(out[0].len(), 384);
    }

    #[tokio::test]
    async fn embed_handles_empty_input() {
        let pool = make_pool(1);
        let out = pool
            .embed(vec![], RequestPriority::Background)
            .await
            .expect("empty embed is a no-op");
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn pool_creates_n_workers() {
        let pool = make_pool(3);
        assert_eq!(pool.workers(), 3);
    }

    // Serialise the two autotune tests via `#[serial_test::serial(env_workers)]`
    // because both touch the `TRUSTY_EMBED_WORKERS` env var and cargo runs
    // tests in parallel by default — without serialisation the override test
    // can race the autotune test and corrupt its observation.
    #[tokio::test]
    #[serial_test::serial(env_workers)]
    async fn autotune_worker_count_matches_table() {
        std::env::remove_var("TRUSTY_EMBED_WORKERS");
        let n = autotune_workers();
        assert!(
            n == 1 || n == 2 || n == 4,
            "autotune returned unexpected count: {n}"
        );
    }

    #[tokio::test]
    #[serial_test::serial(env_workers)]
    async fn pool_autotune_respects_env_override() {
        std::env::set_var("TRUSTY_EMBED_WORKERS", "7");
        let n = autotune_workers();
        std::env::remove_var("TRUSTY_EMBED_WORKERS");
        assert_eq!(n, 7);
    }

    #[tokio::test]
    async fn priority_ordering_interactive_drains_first() {
        // One worker so ordering is deterministic. Submit one background
        // request first, then an interactive one before the worker has had a
        // chance to pull from the channel. The interactive should complete
        // first because the worker's biased select prefers interactive.
        //
        // Note: with one worker there's no actual preemption — the worker
        // will process whatever it picked up first. To make this test
        // deterministic we submit both, then race their completions.
        let pool = make_pool(1);

        // Fire interactive first to give it the queue head. The test
        // assertion is that the interactive completes successfully — the
        // bias only matters when both lanes have queued work simultaneously,
        // which is impossible to reliably trigger from a unit test.
        let interactive = pool
            .embed(vec!["i".into()], RequestPriority::Interactive)
            .await
            .expect("interactive embed succeeds");
        let background = pool
            .embed(vec!["b".into()], RequestPriority::Background)
            .await
            .expect("background embed succeeds");
        assert_eq!(interactive.len(), 1);
        assert_eq!(background.len(), 1);
    }

    #[tokio::test]
    async fn dropping_pool_shuts_workers_down() {
        // Build a pool, drop it, and assert that the channel-closed branch in
        // `embed` is unreachable (since we no longer hold the pool). This is
        // really a compile-time / runtime-stability check: after the pool is
        // dropped, the workers exit on their next iteration.
        let pool = make_pool(1);
        drop(pool);
        // Give workers a tick to observe the closed channels.
        tokio::time::sleep(Duration::from_millis(50)).await;
        // No assertion: success is "no panic, no hang".
    }
}