Skip to main content

sqlite_graphrag/
embedder.rs

1//! Embedding generation for the GraphRAG memory.
2//!
3//! v1.0.76: the default build is **LLM-only** — the binary does NOT bundle
4//! fastembed / ort / ndarray / tokenizers. All embeddings are produced
5//! by a headless invocation of `claude code` or `codex` (OAuth, no MCP,
6//! no hooks) and stored as a BLOB in `memory_embeddings(memory_id, embedding,
7//! source)`. Vector similarity is computed in pure Rust at query time.
8//!
9//! # Workload classification (G42/S3, BLOCK 1 — MANDATORY)
10//!
11//! LLM embedding is **I/O-bound + subprocess-bound**: each call waits
12//! 5-60s on a network round-trip through a headless `claude -p` /
13//! `codex exec` subprocess while the local CPU stays idle. Concurrency
14//! therefore uses **tokio** (async I/O concurrency) and NEVER rayon
15//! (reserved for CPU-bound work).
16//!
17//! # Permit formula (G42/S3, BLOCK 2)
18//!
19//! ```text
20//! permits = clamp(--llm-parallelism, 1, 32)
21//!           .min(available_parallelism())
22//!           .min(available_ram_mb * 0.5 / LLM_WORKER_RSS_MB)
23//! ```
24//!
25//! `LLM_WORKER_RSS_MB = 350` (`crate::constants`): `claude -p` and
26//! `codex exec` are node processes with a typical Maximum RSS of
27//! 200-400 MB (measured via `/usr/bin/time -l` on macOS /
28//! `/usr/bin/time -v` on Linux), so the RAM bound is pertinent.
29//!
30//! # Locking contract (G42/A3 fix)
31//!
32//! The process-wide `Mutex<LlmEmbedding>` protects ONLY the cheap clone
33//! of the client configuration (flavour + binary path + model + shared
34//! schema tempfiles). It is NEVER held across network I/O — the
35//! v1.0.76-v1.0.78 `flush_group` held it for the whole sequential
36//! embedding loop, which is why `--llm-parallelism 8` measured an
37//! effective parallelism of 1.
38
39use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
49/// Process-wide LLM-embedding client behind a .
50///
51/// The lock guards configuration cloning only (see module docs); the
52/// actual LLM I/O happens on clones, outside the lock.
53///
54/// ADR-0042 / GAP-002: process-wide Claude-backed LLM-embedding client
55/// behind a `Mutex`. Distinct from `EMBEDDER` so the Claude path of
56/// `embed_via_backend` no longer re-probes PATH via `detect_available`
57/// (the v1.0.82 bug where requesting Claude could resolve to Codex).
58static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
59static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
60static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();
61
62/// v1.0.93: check whether the OpenRouter client has been initialised.
63pub fn is_openrouter_initialized() -> bool {
64    OPENROUTER_CLIENT.get().is_some()
65}
66static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
67
68/// Process-wide multi-thread tokio runtime for embedding I/O.
69///
70/// G42/A2 fix: v1.0.76-v1.0.78 built a current-thread runtime PER CALL.
71/// One runtime per process amortises the setup and hosts the bounded
72/// fan-out of `embed_texts_parallel`.
73static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
74
/// Calibration base: chunk (long-text) batch size per LLM call at the
/// calibration dimensionality (G42/S2). Use [`chunk_embed_batch_size`]
/// for the dim-adaptive value (G44).
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Calibration base: entity-name (short-text) batch size per LLM call at
/// the calibration dimensionality (G42/S2). Use [`entity_embed_batch_size`]
/// for the dim-adaptive value (G44).
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Dimensionality the batch bases above were calibrated against (G44).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// G44: scales a calibration-base batch size to the active dimensionality,
/// keeping the float budget per LLM call constant (~512 floats for chunks,
/// ~1600 for entity names — the budgets empirically validated at dim 64).
/// Fixed batches of 8 at 384 dims asked for ~3072 floats per response:
/// claude returned partial coverage (3 of 8 items, caught by the G42/C5
/// check) and codex timed out at 300s.
///
/// The result is `base * EMBED_BATCH_CALIBRATION_DIM / dim`, rounded down
/// and clamped to `1..=base`. The function is total:
/// - `base == 0` is treated as 1 (`clamp` panics when the upper bound is
///   below the lower one);
/// - `dim == 0` is treated as 1 (no division by zero);
/// - the product is computed in `u128`, so very large `base` values cannot
///   overflow `usize`.
fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
    let base = base.max(1);
    let dim = dim.max(1);
    // Widened arithmetic: usize * 64 always fits in u128.
    let scaled = (base as u128) * (EMBED_BATCH_CALIBRATION_DIM as u128) / (dim as u128);
    // After clamping to at most `base`, the value fits back into usize.
    scaled.clamp(1, base as u128) as usize
}
99
100/// Dim-adaptive batch size for chunk (long-text) embedding calls (G44).
101pub fn chunk_embed_batch_size() -> usize {
102    let dim = crate::constants::embedding_dim();
103    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
104    tracing::debug!(
105        dim,
106        base = CHUNK_EMBED_BATCH_SIZE,
107        batch,
108        "adaptive chunk batch size (G44)"
109    );
110    batch
111}
112
113/// Dim-adaptive batch size for entity-name (short-text) embedding calls (G44).
114pub fn entity_embed_batch_size() -> usize {
115    let dim = crate::constants::embedding_dim();
116    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
117    tracing::debug!(
118        dim,
119        base = ENTITY_EMBED_BATCH_SIZE,
120        batch,
121        "adaptive entity batch size (G44)"
122    );
123    batch
124}
125
126/// Returns the process-wide multi-thread runtime, building it on first use.
127pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
128    if let Some(rt) = RUNTIME.get() {
129        return Ok(rt);
130    }
131    let rt = tokio::runtime::Builder::new_multi_thread()
132        .worker_threads(2)
133        .enable_all()
134        .build()
135        .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
136    let _ = RUNTIME.set(rt);
137    Ok(RUNTIME.get().expect("RUNTIME initialised above"))
138}
139
140/// Initialises the LLM-embedding client on first use and returns it.
141pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
142    if let Some(e) = EMBEDDER.get() {
143        return Ok(e);
144    }
145    let backend = LlmEmbedding::detect_available()?;
146    let _ = EMBEDDER.set(Mutex::new(backend));
147    Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
148}
149
150/// ADR-0042 / GAP-002: returns the process-wide Claude embedder, lazily
151/// initialising it on first use. Binary and model overrides come from
152/// the explicit arguments; `None` falls back to PATH/env defaults via
153/// the builder.
154pub fn get_claude_embedder(
155    claude_binary: Option<&Path>,
156    claude_model: Option<&str>,
157) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
158    if let Some(e) = CLAUDE_EMBEDDER.get() {
159        return Ok(e);
160    }
161    let mut builder = LlmEmbedding::with_claude_builder();
162    if let Some(b) = claude_binary {
163        builder = builder.override_binary(b.to_path_buf());
164    }
165    if let Some(m) = claude_model {
166        builder = builder.override_model(m.to_string());
167    }
168    let backend = builder.build()?;
169    let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
170    Ok(CLAUDE_EMBEDDER
171        .get()
172        .expect("CLAUDE_EMBEDDER initialised above"))
173}
174
175/// GAP-OPENCODE-001 / v1.0.90: returns the process-wide OpenCode embedder,
176/// lazily initialising it on first use. Binary and model overrides come
177/// from the explicit arguments; `None` falls back to PATH/env defaults via
178/// the builder.
179pub fn get_opencode_embedder(
180    opencode_binary: Option<&Path>,
181    opencode_model: Option<&str>,
182) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
183    if let Some(e) = OPENCODE_EMBEDDER.get() {
184        return Ok(e);
185    }
186    let mut builder = LlmEmbedding::with_opencode_builder();
187    if let Some(b) = opencode_binary {
188        builder = builder.override_binary(b.to_path_buf());
189    }
190    if let Some(m) = opencode_model {
191        builder = builder.override_model(m.to_string());
192    }
193    let backend = builder.build()?;
194    let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
195    Ok(OPENCODE_EMBEDDER
196        .get()
197        .expect("OPENCODE_EMBEDDER initialised above"))
198}
199
200pub fn get_openrouter_embedder(
201    api_key: secrecy::SecretBox<String>,
202    model: &str,
203    dim: usize,
204) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
205    if let Some(c) = OPENROUTER_CLIENT.get() {
206        return Ok(c);
207    }
208    let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
209    let _ = OPENROUTER_CLIENT.set(client);
210    Ok(OPENROUTER_CLIENT
211        .get()
212        .expect("OPENROUTER_CLIENT initialised above"))
213}
214
215/// ADR-0042 / GAP-002: route a single passage through the Claude
216/// embedder. Used by the Claude arm of `embed_via_backend` so the
217/// fallback chain stops treating Claude as a synonym for codex.
218pub fn embed_via_claude_local(
219    _models_dir: &Path,
220    text: &str,
221    claude_binary: Option<&Path>,
222    claude_model: Option<&str>,
223) -> Result<Vec<f32>, AppError> {
224    let _slot_guard = acquire_llm_slot_for_embedding()?;
225    let embedder = get_claude_embedder(claude_binary, claude_model)?;
226    embed_passage(embedder, text)
227}
228
229/// BUG-003 / v1.0.85: split of  that also
230/// reports the resolved []. Always  because
231/// this path constructs a Claude-flavoured embedder via
232///  (no PATH probe, no silent substitution).
233pub fn embed_via_claude_local_resolved(
234    _models_dir: &Path,
235    text: &str,
236    claude_binary: Option<&Path>,
237    claude_model: Option<&str>,
238) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
239    let _slot_guard = acquire_llm_slot_for_embedding()?;
240    let embedder = get_claude_embedder(claude_binary, claude_model)?;
241    let v = embed_passage(embedder, text)?;
242    Ok((v, LlmBackendKind::Claude))
243}
244
245/// GAP-OPENCODE-001 / v1.0.90: route a single passage through the OpenCode
246/// embedder, reporting the resolved [`LlmBackendKind::Opencode`]. Constructs
247/// an OpenCode-flavoured embedder via `with_opencode_builder` (no PATH probe,
248/// no silent substitution).
249pub fn embed_via_opencode_local_resolved(
250    _models_dir: &Path,
251    text: &str,
252    opencode_binary: Option<&Path>,
253    opencode_model: Option<&str>,
254) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
255    let _slot_guard = acquire_llm_slot_for_embedding()?;
256    let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
257    let v = embed_passage(embedder, text)?;
258    Ok((v, LlmBackendKind::Opencode))
259}
260/// Clones the embedding-client configuration. The lock is held only for
261/// the duration of the clone — NEVER across I/O (G42/A3).
262fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
263    embedder.lock().clone()
264}
265
266/// Embeds a single passage for storage. Delegates to the configured LLM
267/// headless (claude code / codex). Returns a vector of the active
268/// dimensionality.
269pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
270    let client = clone_client(embedder);
271    let result = client.embed_passage(text)?;
272    validate_dim(result)
273}
274
275/// Embeds a single query for similarity search. Same model and dim as
276/// `embed_passage`; the only difference is the LLM-side prompt prefix
277/// that the headless invocation uses to disambiguate.
278pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
279    let client = clone_client(embedder);
280    let result = client.embed_query(text)?;
281    validate_dim(result)
282}
283
284/// Embeds a batch of passages with token-count-aware batching.
285///
286/// Kept for API compatibility; since v1.0.79 it routes through the
287/// bounded parallel fan-out with conservative defaults.
288pub fn embed_passages_controlled(
289    embedder: &Mutex<LlmEmbedding>,
290    texts: &[&str],
291    _token_counts: &[usize],
292) -> Result<Vec<Vec<f32>>, AppError> {
293    if texts.is_empty() {
294        return Ok(Vec::new());
295    }
296    let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
297    embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
298}
299
300pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
301    let _slot_guard = acquire_llm_slot_for_embedding()?;
302    let embedder = get_embedder(models_dir)?;
303    embed_passage(embedder, text)
304}
305
/// v1.0.89 (BUG-SKIP-EMBED): reports whether the user opted to persist
/// with a NULL embedding when embedding fails.
///
/// Reads the `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` env var (set by
/// `--skip-embedding-on-failure` via main.rs propagation). Only the exact
/// values `1` and `true` enable the behaviour; an unset variable or any
/// other value yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE") {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
315
316/// v1.0.89 (BUG-SKIP-EMBED + GAP-EMBED-PROPAGATION): embed a passage
317/// honouring both `--llm-backend` and `--skip-embedding-on-failure`.
318///
319/// On success returns `Ok(Some(vec))`. On failure:
320/// - if `--skip-embedding-on-failure` is active, logs a warning and returns `Ok(None)`
321/// - otherwise propagates the error (exit 11)
322pub fn embed_passage_or_skip(
323    models_dir: &Path,
324    text: &str,
325    choice: Option<crate::cli::LlmBackendChoice>,
326) -> Result<Option<Vec<f32>>, AppError> {
327    match embed_passage_with_choice(models_dir, text, choice) {
328        Ok((v, _backend)) => Ok(Some(v)),
329        Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
330        Err(e) => {
331            if should_skip_embedding_on_failure() {
332                tracing::warn!(
333                    error = %e,
334                    "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
335                );
336                Ok(None)
337            } else {
338                Err(e)
339            }
340        }
341    }
342}
343
344/// BUG-003 / v1.0.85: split of `embed_passage_local` that reports the
345/// resolved [`LlmBackendKind`] based on the ACTUAL
346/// [`LlmEmbedding::flavour`] of the embedder constructed. When
347/// `LlmEmbedding::detect_available` substitutes claude for a missing
348/// codex, the operator sees the truth in `envelope.backend_invoked`.
349pub fn embed_passage_local_resolved(
350    models_dir: &Path,
351    text: &str,
352) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
353    let _slot_guard = acquire_llm_slot_for_embedding()?;
354    let embedder = get_embedder(models_dir)?;
355    let v = embed_passage(embedder, text)?;
356    let kind = match embedder.lock().flavour() {
357        crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
358        crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
359        crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
360    };
361    Ok((v, kind))
362}
363
364pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
365    let _slot_guard = acquire_llm_slot_for_embedding()?;
366    let embedder = get_embedder(models_dir)?;
367    embed_query(embedder, text)
368}
369
370// =============================================================================
371// v1.0.82 (GAP-003): wrappers that accept the CLI choice
372// (`crate::cli::LlmBackendChoice`) and translate it into a chain for
373// `embed_with_fallback`. They centralise the propagation of the
374// `--llm-backend` flag across the 6 commands that produce embeddings
375// (`remember`, `edit`, `ingest`, `enrich`, `recall`, `hybrid-search`).
376// =============================================================================
377
378/// Embed a single passage using the LLM backend selected by the user via
379/// `--llm-backend`. Routes to `embed_with_fallback` so failures fall
380/// through to the next backend in the chain before giving up.
381///
382/// When `choice` is `None` (e.g. a sub-command that does not yet
383/// expose the flag), behaviour matches `embed_passage_local` — the
384/// active embedder from `LlmEmbedding::detect_available` decides the
385/// backend.
386pub fn embed_passage_with_choice(
387    models_dir: &Path,
388    text: &str,
389    choice: Option<crate::cli::LlmBackendChoice>,
390) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
391    let _slot_guard = acquire_llm_slot_for_embedding()?;
392    match choice {
393        None => {
394            let embedder = get_embedder(models_dir)?;
395            embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
396        }
397        Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
398    }
399}
400
401/// v1.0.93: embedding with `EmbeddingBackendChoice` awareness. When the
402/// embedding backend is `Openrouter` or `Auto` with a live client, the
403/// chain prepends `OpenRouter` before the LLM subprocess backends.
404pub fn embed_passage_with_embedding_choice(
405    models_dir: &Path,
406    text: &str,
407    embedding_backend: crate::cli::EmbeddingBackendChoice,
408    llm_backend: crate::cli::LlmBackendChoice,
409) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
410    let _slot_guard = acquire_llm_slot_for_embedding()?;
411    let chain = embedding_backend.to_chain(llm_backend);
412    embed_with_fallback(models_dir, text, &chain, false)
413}
414
415/// failure, returns a structured `FallbackReason` so the caller can
416/// surface `vec_degraded` instead of a hard exit 11.
417///
418/// `None` matches the legacy `try_embed_query_with_fallback` path
419/// (uses the active embedder without an explicit chain).
420pub fn try_embed_query_with_choice(
421    models_dir: &Path,
422    text: &str,
423    choice: Option<crate::cli::LlmBackendChoice>,
424) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
425    match embed_passage_with_choice(models_dir, text, choice) {
426        // GAP-004 / v1.0.85.1: when the chain terminates on
427        //  (i.e. user passed
428        // or every preceding backend failed),  returns
429        //  instead of an error. Without this guard the
430        // empty vector would propagate to  which
431        // aborts with exit 11 ("embedding has 0 dims, expected 64").
432        // The caller's contract is to surface a typed
433        // so  and  can route to FTS5-puro via
434        // the existing  /  envelope.
435        // Intercept the empty-vector success path and surface it as
436        //  (introduced at v1.0.85 / ADR-0043
437        // for the symmetric LLM-returned-zero-dim case).
438        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
439        Ok((v, backend)) => Ok((v, backend)),
440        Err(e) => Err(classify_embedding_error(e)),
441    }
442}
443/// v1.0.93 (GAP-OR-INGEST): query embedding with `EmbeddingBackendChoice`
444/// awareness. Mirrors `try_embed_query_with_choice` but routes through
445/// `embed_passage_with_embedding_choice` so OpenRouter API is used when
446/// configured.
447pub fn try_embed_query_with_embedding_choice(
448    models_dir: &Path,
449    text: &str,
450    embedding_backend: crate::cli::EmbeddingBackendChoice,
451    llm_backend: crate::cli::LlmBackendChoice,
452) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
453    match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
454        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
455        Ok((v, backend)) => Ok((v, backend)),
456        Err(e) => Err(classify_embedding_error(e)),
457    }
458}
459
460/// call. Reads the max-concurrency from
461/// `SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY` (default derived from
462/// `LLM_WORKER_RSS_MB` and available memory), and the wait timeout
463/// from `SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS` (default 30s).
464///
465/// Returns `Ok(guard)` for happy path, `AppError::LockBusy` (exit 75)
466/// when no slot is available within the wait window, and
467/// `AppError::Validation` when the concurrency is 0.
468///
469/// The `LLM_SLOT_NO_WAIT` env var (or its CLI flag equivalent) sets
470/// `wait_secs = 0` to fail fast in tests.
471fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
472    use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
473    let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
474        .ok()
475        .and_then(|s| s.parse::<u32>().ok())
476        .filter(|n| *n >= 1)
477        .unwrap_or_else(crate::llm_slots::default_max_concurrency);
478    let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
479        0
480    } else {
481        std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
482            .ok()
483            .and_then(|s| s.parse::<u64>().ok())
484            .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
485    };
486    let _ = LLM_WORKER_RSS_MB; // silence the unused import (used in default_max_concurrency)
487                               // GAP-003 / ADR-0043: when the slot semaphore is contended beyond the
488                               // backoff window (50 + 100 + 200 + 400 = 750ms total), return a
489                               // marker message that `classify_embedding_error` maps to
490                               // `FallbackReason::SlotExhausted` (discriminator `slot_exhausted`).
491                               // The window is shorter than the legacy 30s timeout, so the operator
492                               // observes FTS5-puro fallback quickly instead of after 30s of silence.
493    match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
494        Ok(guard) => Ok(guard),
495        Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
496            "slot exhausted: {e} (fall back to FTS5)"
497        ))),
498        Err(e) => Err(e),
499    }
500}
/// GAP-004 (v1.0.88): typed classifier for embedding error messages.
///
/// Decomposes the legacy `AppError::Embedding(String)` payload into a
/// small enum so the call sites can branch on the cause instead of
/// repeating `msg.contains(...)` literals. The classification is purely
/// lexical (case-insensitive substring match on the error message) — no
/// I/O, no retries, no telemetry, deterministic and safe under
/// `#[serial_test::serial(env)]`.
///
/// 6 variants cover the 5 known discriminators from v1.0.85 (ADR-0043)
/// plus an `Unknown` fallback for messages that do not match any marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// OAuth token expired or absent; no backend can authenticate.
    OAuth,
    /// OAuth usage quota exhausted on the named backend.
    Quota,
    /// LLM slot semaphore exhausted after the backoff window.
    SlotExhausted,
    /// User-requested backend differs from the one that actually executed.
    BackendMismatch,
    /// Embedding returned a zero-dimensional vector (structural bug).
    ZeroDimension,
    /// Message did not match any of the 5 markers above.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classify an embedding error message into a typed kind.
    ///
    /// The checks are case-insensitive substring matches, evaluated in
    /// this order (first match wins):
    ///
    /// 1. `oauth` → [`Self::OAuth`] — matched before `Quota` because both
    ///    substrings can co-occur in the same message.
    /// 2. `slot exhausted` → [`Self::SlotExhausted`] — matched before
    ///    `Quota` because the slot-semaphore path is more specific (the
    ///    LLM never even tried to authenticate).
    /// 3. `quota` → [`Self::Quota`].
    /// 4. `backend mismatch` → [`Self::BackendMismatch`].
    /// 5. both `dim` and `zero` → [`Self::ZeroDimension`].
    ///
    /// Anything else is [`Self::Unknown`].
    pub fn classify(msg: &str) -> Self {
        let m = msg.to_lowercase();
        if m.contains("oauth") {
            Self::OAuth
        } else if m.contains("slot exhausted") {
            Self::SlotExhausted
        } else if m.contains("quota") {
            Self::Quota
        } else if m.contains("backend mismatch") {
            Self::BackendMismatch
        } else if m.contains("dim") && m.contains("zero") {
            Self::ZeroDimension
        } else {
            Self::Unknown
        }
    }

    /// Stable, machine-friendly discriminator code (lowercase, kebab-safe).
    pub fn code(&self) -> &'static str {
        match self {
            Self::OAuth => "oauth",
            Self::Quota => "quota",
            Self::SlotExhausted => "slot-exhausted",
            Self::BackendMismatch => "backend-mismatch",
            Self::ZeroDimension => "zero-dimension",
            Self::Unknown => "unknown",
        }
    }
}

/// Displays the kind as its [`EmbeddingErrorKind::code`].
impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.code())
    }
}
572
/// G58/S1: why an embedding call could not be completed, forcing the
/// caller onto a non-vector retrieval path (FTS5 prefix + LIKE).
///
/// Returned by [`try_embed_query_with_fallback`] so the `recall` and
/// `hybrid-search` handlers can surface a structured `vec_degraded` /
/// `warning` envelope instead of a hard `AppError::Embedding` exit 11.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// The LLM subprocess failed (rate limit, OAuth contention, quota
    /// exhausted, unparsable model response, divergent dim, etc.).
    /// Carries the original error message for observability.
    EmbeddingFailed(String),
    /// The LLM slot semaphore was exhausted: 8+ concurrent LLM
    /// subprocesses blocked the acquire beyond the backoff window
    /// (50ms + 100ms + 200ms + 400ms = 750ms total). Resolved at v1.0.85
    /// (GAP-003 / ADR-0043).
    SlotExhausted,
    /// OAuth usage quota exhausted on the named backend. The caller
    /// should retry with an alternative backend (codex ↔ claude)
    /// before falling back to the FTS5-only path.
    OAuthQuota { backend: &'static str },
    /// The user requested a backend that differs from the one that
    /// actually executed the embedding (legacy "synonym for codex"
    /// bug from v1.0.83). Resolved at v1.0.84 (GAP-002).
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding returned a zero-dimensional vector, signalling a
    /// structural bug (the LLM did not produce any floats). Distinct
    /// from `OAuthQuota` (quota exhausted) and `EmbeddingFailed`
    /// (subprocess error).
    DimZero,
    /// The embedding was cancelled by an external signal (SIGTERM, etc.).
    Cancelled,
    /// The embedding exceeded its time budget. Carries the operation name
    /// and the elapsed seconds for diagnostic logging.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable, machine-friendly reason code used by JSON envelopes
    /// (`vec_degraded_reason`). Mirrors the v1.0.84 contract extended
    /// at v1.0.85 with 4 new variants (GAP-003 / ADR-0043).
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

/// Human-readable rendering of each reason, one sentence per variant.
impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without payload are fixed strings.
            FallbackReason::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            FallbackReason::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            FallbackReason::Cancelled => f.write_str("embedding cancelled by external signal"),
            // Variants with payload interpolate it into the message.
            FallbackReason::EmbeddingFailed(detail) => write!(f, "embedding failed: {detail}"),
            FallbackReason::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            FallbackReason::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
669
670/// G58/S1: try to embed a query, mapping any failure to a structured
671/// [`FallbackReason`] so callers can route to FTS5 + LIKE fallback instead
672/// of returning exit 11 to the user.
673///
674/// This is the bridge between the hard-fail `embed_query_local` (used by
675/// write paths where embedding failure aborts the operation) and the
676/// graceful-degradation contract of `recall` / `hybrid-search` in v1.0.80.
677pub fn try_embed_query_with_fallback(
678    models_dir: &Path,
679    query: &str,
680) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
681    match embed_query_local(models_dir, query) {
682        Ok(v) => Ok((v, LlmBackendKind::None)),
683        Err(e) => Err(classify_embedding_error(e)),
684    }
685}
686
687/// G58 / ADR-0043 (v1.0.85): deterministic fallback for `recall` and
688/// `hybrid-search`.
689///
690/// - On `OAuthQuota { backend }`, retry once with the alternative backend
691///   (codex ↔ claude) before giving up.
692/// - On `SlotExhausted`, sleep 750ms and retry once (gives the slot
693///   semaphore time to release a permit from a sibling subprocess).
694/// - On any other `FallbackReason`, return immediately (deterministic).
695pub fn try_embed_query_with_deterministic_fallback(
696    models_dir: &Path,
697    query: &str,
698    choice: Option<crate::cli::LlmBackendChoice>,
699) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
700    match try_embed_query_with_choice(models_dir, query, choice) {
701        Ok(t) => Ok(t),
702        Err(reason @ FallbackReason::OAuthQuota { backend }) => {
703            let alt = match backend {
704                "codex" => Some(crate::cli::LlmBackendChoice::Claude),
705                "claude" => Some(crate::cli::LlmBackendChoice::Codex),
706                "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
707                "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
708                _ => None,
709            };
710            if let Some(alt_choice) = alt {
711                try_embed_query_with_choice(models_dir, query, Some(alt_choice))
712            } else {
713                Err(reason)
714            }
715        }
716        Err(reason @ FallbackReason::SlotExhausted) => {
717            std::thread::sleep(std::time::Duration::from_millis(750));
718            try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
719        }
720        Err(other) => Err(other),
721    }
722}
723
724/// Classify an embedding [`AppError`] into a typed [`FallbackReason`].
725///
726/// v1.0.85 (ADR-0043): discriminates the 4 new causes (SlotExhausted,
727/// OAuthQuota, BackendMismatch, DimZero) from the legacy generic
728/// EmbeddingFailed bucket. The classification is purely lexical
729/// (substring match on the message) — no I/O, no retries, no
730/// telemetry, deterministic and `#[serial_test::serial(env)]`-safe.
731pub fn classify_embedding_error(err: AppError) -> FallbackReason {
732    match err {
733        AppError::Timeout {
734            operation,
735            duration_secs,
736        } => FallbackReason::Timeout {
737            operation,
738            duration_secs,
739        },
740        AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
741            // GAP-004 (v1.0.88): typed-discriminator dispatch.
742            // The lexical classifier picks the discriminator; the arms below
743            // enrich the result with the backend name and the
744            // requested/resolved pair that the JSON envelope needs.
745            //
746            // Note: `Cancelled` and `EmbeddingFailed(msg)` are not in the
747            // 6-variant enum (they have no lexical marker) so we keep them
748            // as explicit guards at the head of the match.
749            EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
750            EmbeddingErrorKind::OAuth => {
751                let backend = if msg.contains("codex") {
752                    "codex"
753                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
754                    // G45-CR5: anthropic-ratelimit-* headers are emitted only by
755                    // the Claude CLI subprocess; treat them as claude quota
756                    // signals even when the message text omits the word
757                    // "claude" explicitly.
758                    "claude"
759                } else if msg.contains("opencode") {
760                    "opencode"
761                } else {
762                    "unknown"
763                };
764                FallbackReason::OAuthQuota { backend }
765            }
766            EmbeddingErrorKind::Quota => {
767                let backend = if msg.contains("codex") {
768                    "codex"
769                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
770                    "claude"
771                } else if msg.contains("opencode") {
772                    "opencode"
773                } else {
774                    "unknown"
775                };
776                FallbackReason::OAuthQuota { backend }
777            }
778            EmbeddingErrorKind::BackendMismatch => {
779                // The `msg.contains("claude")` arm is intentionally
780                // placed BEFORE the OAuth arm so that a backend-mismatch
781                // message that mentions both "claude" and "codex" maps to
782                // BackendMismatch (the more specific failure mode).
783                let (requested, resolved) =
784                    if msg.contains("requested claude") && msg.contains("but codex") {
785                        ("claude", "codex")
786                    } else if msg.contains("requested codex") && msg.contains("but claude") {
787                        ("codex", "claude")
788                    } else if msg.contains("requested claude") {
789                        ("claude", "unknown")
790                    } else if msg.contains("requested codex") {
791                        ("codex", "unknown")
792                    } else {
793                        ("unknown", "unknown")
794                    };
795                FallbackReason::BackendMismatch {
796                    requested,
797                    resolved,
798                }
799            }
800            EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
801            EmbeddingErrorKind::Unknown => {
802                if msg.contains("cancelled") {
803                    FallbackReason::Cancelled
804                } else {
805                    FallbackReason::EmbeddingFailed(msg)
806                }
807            }
808        },
809        e => FallbackReason::EmbeddingFailed(e.to_string()),
810    }
811}
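// =============================================================================
// LLM fallback chain: `embed_with_fallback` tries each of the configured
// backends before giving up. The chain order matches the user-supplied
// `--llm-fallback` list (default: codex, claude, none).
// =============================================================================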
815
816/// Tries each LLM backend in `chain` in order, returning the first
817/// successful embedding. On failure, the diagnostic tail of the last
818/// error is preserved in the returned `AppError::Embedding` so the
819/// operator can see WHY every backend failed.
820///
821/// If `skip_on_failure` is `true` AND every backend fails, the function
822/// returns `Ok(Vec::new())` (an empty vector) to signal "persist
823/// without embedding" — the call site is then responsible for writing
824/// a `pending_embeddings` row that can be retried later by the
825/// `embedding retry` subcommand.
826///
827/// Defaults the chain to `[codex, claude, none]` when `chain` is
828/// empty, matching the v1.0.81 behaviour where codex was the
829/// implicit default and claude was the implicit fallback.
830pub fn embed_with_fallback(
831    models_dir: &Path,
832    text: &str,
833    chain: &[LlmBackendKind],
834    skip_on_failure: bool,
835) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
836    use crate::llm::exit_code_hints::LlmBackendError;
837    let effective: Vec<LlmBackendKind> = if chain.is_empty() {
838        vec![
839            LlmBackendKind::Codex,
840            LlmBackendKind::Claude,
841            LlmBackendKind::Opencode,
842            LlmBackendKind::None,
843        ]
844    } else {
845        chain.to_vec()
846    };
847
848    let mut last_err: Option<AppError> = None;
849    for backend in &effective {
850        // BUG-003 / v1.0.85: propagar o backend REAL retornado por
851        // embed_via_backend (que pode diferir do chain position quando
852        // LlmEmbedding::detect_available substitui codex por claude).
853        // O tuple `(_, requested_kind)` é descartado — só queremos o
854        // backend resolvido na primeira posição.
855        // ADR-0046 / BUG-11 v1.0.88: use `embed_via_backend_strict` so the
856        // sentinel `None` backend propagates the last real error instead
857        // of silently degrading to `Ok((Vec::new(), None))`. This is the
858        // path that caused preflight rejections to be swallowed by the
859        // chain's default trailing `None`.
860        match embed_via_backend_strict(
861            models_dir,
862            text,
863            backend,
864            last_err.as_ref(),
865            skip_on_failure,
866        ) {
867            Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
868            Err(e) => {
869                // ADR-0011: Validation errors (OAuth-only enforcement) are
870                // FATAL — propagate immediately without trying the next
871                // backend. This prevents the fallback chain from swallowing
872                // OAuth violations via the trailing `None` sentinel.
873                if matches!(e, AppError::Validation(_)) {
874                    return Err(e);
875                }
876                tracing::warn!(
877                    target: "embedding",
878                    backend = ?backend,
879                    error = %e,
880                    "embed_with_fallback: backend failed, trying next"
881                );
882                last_err = Some(e);
883            }
884        }
885    }
886    if skip_on_failure {
887        // Signal "persist with no embedding" via an empty vector paired
888        // with `None` so callers know the chain exhausted without a hit.
889        // Caller is responsible for writing a `pending_embeddings` row
890        // that can be retried later by the `embedding retry` subcommand.
891        return Ok((Vec::new(), LlmBackendKind::None));
892    }
893    Err(last_err
894        .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
895}
896
/// Identifies one entry of the embedding fallback chain.
///
/// Mirrors the CLI `--llm-backend` enum so the same values can be passed to
/// `--llm-fallback` without translation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// `codex exec` (default for v1.0.76+).
    Codex,
    /// `claude -p` (fallback for ChatGPT Pro OAuth unavailability).
    Claude,
    /// `opencode run` (v1.0.90).
    Opencode,
    /// OpenRouter HTTP API (v1.0.93).
    OpenRouter,
    /// No embedding: an empty vector is returned.
    None,
}

impl LlmBackendKind {
    /// Returns the stable lowercase label for this backend.
    ///
    /// The labels are used in tracing and JSON envelopes and are part of the
    /// public contract for `envelope.backend_invoked`, so they must not
    /// change.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::None => "none",
            LlmBackendKind::OpenRouter => "openrouter",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Codex => "codex",
        }
    }
}
927
928/// Embeds a single text via the given backend. Used by
929/// `embed_with_fallback` and exposed to allow direct one-shot
930/// selection without a chain.
931/// Embeds a single text via the given backend. Used by
932/// `embed_with_fallback` and exposed to allow direct one-shot
933/// selection without a chain.
934///
935/// BUG-003 / v1.0.85: returns `(Vec<f32>, LlmBackendKind)`. The
936/// second element reports the backend that ACTUALLY executed the
937/// embedding, not the chain position requested by the caller. When
938/// `LlmBackendKind::Codex` is requested but `codex` is absent from
939/// PATH, `LlmEmbedding::detect_available` substitutes claude and the
940/// tuple carries `LlmBackendKind::Claude` so the operator sees the
941/// truth in `envelope.backend_invoked`.
942pub fn embed_via_backend(
943    models_dir: &Path,
944    text: &str,
945    backend: &LlmBackendKind,
946) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
947    match backend {
948        LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
949        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
950        LlmBackendKind::Claude => {
951            // ADR-0042 / GAP-002: route Claude through its own static
952            // embedder instead of re-using the Codex path (which used
953            // to silently pick Codex if PATH ordered it first).
954            tracing::debug!(
955                target: "embedder",
956                backend = "claude",
957                "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
958            );
959            embed_via_claude_local_resolved(models_dir, text, None, None)
960        }
961        LlmBackendKind::Opencode => {
962            tracing::debug!(
963                target: "embedder",
964                backend = "opencode",
965                "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
966            );
967            embed_via_opencode_local_resolved(models_dir, text, None, None)
968        }
969        LlmBackendKind::OpenRouter => {
970            tracing::debug!(
971                target: "embedder",
972                backend = "openrouter",
973                "embed_via_backend: using OpenRouter API (v1.0.93)"
974            );
975            let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
976                AppError::Embedding(
977                    "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
978                )
979            })?;
980            let rt = shared_runtime()?;
981            let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
982            Ok((vec, LlmBackendKind::OpenRouter))
983        }
984    }
985}
986
987// ADR-0046 / BUG-11 v1.0.88: specialisation of `embed_via_backend` that
988// refuses to SILENTLY DEGRADE to `LlmBackendKind::None` after all real
989// backends (Codex, Claude) have failed. The previous behaviour
990// (`Ok((Vec::new(), None))`) caused the `remember` write path to persist
991// memories with zero-dimensional embeddings — breaking `recall` and
992// `hybrid-search` while returning exit 0 (BUG-11 CRITICAL).
993//
994// When `--llm-backend none` is explicitly requested (i.e. `last_err` is
995// None AND the chain was a single-element `[None]`), pass
996// `skip_on_failure = true` to `embed_with_fallback` to consume the empty
997// vector via the pending-embeddings retry queue instead of persisting
998// directly. This helper is the right hook for `remember`/`edit`/`ingest`.
999pub fn embed_via_backend_strict(
1000    models_dir: &Path,
1001    text: &str,
1002    backend: &LlmBackendKind,
1003    last_err: Option<&AppError>,
1004    skip_on_failure: bool,
1005) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1006    use crate::llm::exit_code_hints::LlmBackendError;
1007    match backend {
1008        LlmBackendKind::None => {
1009            // If the caller opted into skip_on_failure AND no prior
1010            // backend has recorded an error, the empty vector is
1011            // intentional (chain of only [None]).
1012            if skip_on_failure && last_err.is_none() {
1013                Ok((Vec::new(), LlmBackendKind::None))
1014            } else if last_err.is_some() {
1015                // The chain reached `None` after Codex/Claude failed.
1016                // Propagate the most recent error so `remember` aborts
1017                // instead of persisting a memory without an embedding.
1018                Err(match last_err {
1019                    Some(e) => AppError::Embedding(format!("{e}")),
1020                    None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1021                })
1022            } else {
1023                // Empty chain with no skip_on_failure — treat as a
1024                // configuration error (no backends available).
1025                Err(AppError::Embedding(
1026                    LlmBackendError::NoBackendsAvailable.to_string(),
1027                ))
1028            }
1029        }
1030        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1031        LlmBackendKind::Claude => {
1032            tracing::debug!(
1033                target: "embedder",
1034                backend = "claude",
1035                "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1036            );
1037            embed_via_claude_local_resolved(models_dir, text, None, None)
1038        }
1039        LlmBackendKind::Opencode => {
1040            tracing::debug!(
1041                target: "embedder",
1042                backend = "opencode",
1043                "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1044            );
1045            embed_via_opencode_local_resolved(models_dir, text, None, None)
1046        }
1047        LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1048    }
1049}
1050
1051/// Legacy one-shot wrapper around `embed_via_backend` that discards
1052/// the resolved backend. Kept for call sites that only care about
1053/// the vector and ignore the executed-backend signal. New code
1054/// should prefer `embed_via_backend` directly.
1055pub fn embed_via_backend_legacy(
1056    models_dir: &Path,
1057    text: &str,
1058    backend: &LlmBackendKind,
1059) -> Result<Vec<f32>, AppError> {
1060    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
1061}
1062
1063pub fn embed_passages_controlled_local(
1064    models_dir: &Path,
1065    texts: &[&str],
1066    token_counts: &[usize],
1067) -> Result<Vec<Vec<f32>>, AppError> {
1068    let embedder = get_embedder(models_dir)?;
1069    embed_passages_controlled(embedder, texts, token_counts)
1070}
1071
1072/// G42/S3: embeds `texts` through the bounded parallel fan-out and
1073/// returns vectors in input order.
1074pub fn embed_passages_parallel_local(
1075    models_dir: &Path,
1076    texts: &[String],
1077    parallelism: usize,
1078    batch_size: usize,
1079) -> Result<Vec<Vec<f32>>, AppError> {
1080    let embedder = get_embedder(models_dir)?;
1081    embed_texts_parallel(embedder, texts, parallelism, batch_size)
1082}
1083
1084/// v1.0.93 (GAP-OR-INGEST): embeds multiple passages with
1085/// `EmbeddingBackendChoice` awareness. When the resolved chain starts
1086/// with `OpenRouter` and the client is initialised, uses the HTTP batch
1087/// API (`embed_batch`) instead of subprocess fan-out — no LLM slot
1088/// consumed, ~200ms per batch vs ~15s per subprocess cold-start.
1089/// Falls back to `embed_passages_parallel_local` for LLM backends.
1090pub fn embed_passages_parallel_with_embedding_choice(
1091    models_dir: &Path,
1092    texts: &[String],
1093    parallelism: usize,
1094    batch_size: usize,
1095    embedding_backend: crate::cli::EmbeddingBackendChoice,
1096    llm_backend: crate::cli::LlmBackendChoice,
1097) -> Result<Vec<Vec<f32>>, AppError> {
1098    let chain = embedding_backend.to_chain(llm_backend);
1099    if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1100        let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1101            AppError::Embedding(
1102                "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1103            )
1104        })?;
1105        let rt = shared_runtime()?;
1106        let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1107        let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1108        Ok(vecs)
1109    } else {
1110        embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1111    }
1112}
1113
/// G56: in-process cache for entity embeddings keyed by `(model, text)`.
///
/// Schema v13 is immutable: `entity_embeddings` does not have a `text`
/// column, so a pure DB-side cache would require a schema bump. Instead
/// a process-wide map is kept that lives for the duration of one CLI
/// invocation. The hit rate is expected to be high in `ingest`
/// (re-embedding the same canonical entity across thousands of memories)
/// and modest in `remember` (typical single-memory invocations).
///
/// NOTE(review): earlier docs called this map "LRU-style", but no eviction
/// is visible in the code that reads and writes it in this part of the
/// file — confirm whether entries are ever removed.
///
/// Key: the 64-bit value produced by `entity_cache_key`, i.e. the first
/// eight bytes (little-endian) of `blake3(model || "\0" || text)`, not the
/// full digest. Value: `Arc<Vec<f32>>` so a cached vector can be shared
/// without copying while the map keeps its own reference.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Lazily initialised, process-wide storage behind the G56 entity cache.
/// Access it through `entity_embed_cache`, which creates it on first use.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1128
1129fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1130    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1131}
1132
1133fn entity_cache_key(model: &str, text: &str) -> u64 {
1134    let mut hasher = blake3::Hasher::new();
1135    hasher.update(model.as_bytes());
1136    hasher.update(b"\0");
1137    hasher.update(text.as_bytes());
1138    let h = hasher.finalize();
1139    let bytes = h.as_bytes();
1140    u64::from_le_bytes([
1141        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1142    ])
1143}
1144
1145/// G56: embeds entity-name texts through a process-wide cache.
1146///
1147/// Skips any `(model, text)` pair already produced in this CLI invocation
1148/// and only spawns subprocesses for the cache misses. Returns vectors in
1149/// the same order as `texts`.
1150///
1151/// Designed for entity-name batches (short texts). For chunk embeds use
1152/// [`embed_passages_parallel_local`] directly — chunks are unique per
1153/// memory and cache hit rate is negligible.
1154pub fn embed_entity_texts_cached(
1155    models_dir: &Path,
1156    texts: &[String],
1157    parallelism: usize,
1158    embedding_backend: crate::cli::EmbeddingBackendChoice,
1159    llm_backend: crate::cli::LlmBackendChoice,
1160) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1161    if texts.is_empty() {
1162        return Ok((Vec::new(), EmbedCacheStats::default()));
1163    }
1164    // GAP-OR-ENTITY-EMBED: resolve the SAME chain the chunk path uses so the
1165    // entity embedding honours `--embedding-backend`/`--llm-backend` instead
1166    // of always forcing the codex subprocess (the old G56 code path).
1167    let chain = embedding_backend.to_chain(llm_backend);
1168
1169    // `none` short-circuit: when the resolved chain is exactly `[None]`
1170    // (`--embedding-backend llm --llm-backend none`) skip every backend and
1171    // return empty vectors WITHOUT spawning a subprocess. Empties are never
1172    // cached so a later call with a real backend in the same process is not
1173    // poisoned; they count as misses for stats parity with the chunk path.
1174    if chain.as_slice() == [LlmBackendKind::None] {
1175        let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1176        return Ok((
1177            out,
1178            EmbedCacheStats {
1179                requested: texts.len(),
1180                hits: 0,
1181                misses: texts.len(),
1182            },
1183        ));
1184    }
1185
1186    // Cache model label reflects the EFFECTIVE embedding backend. When the
1187    // chain actually routes through OpenRouter, vectors carry that model's
1188    // dim/MRL profile and must never collide with codex-produced vectors;
1189    // for the local path we keep the prior `model_label()` so the in-process
1190    // cache key is unchanged (no regression — this cache is process-local).
1191    let routed_openrouter =
1192        chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1193    let model = if routed_openrouter {
1194        format!("openrouter:{}", crate::constants::embedding_dim())
1195    } else {
1196        get_embedder(models_dir)?.lock().model_label()
1197    };
1198    let cache = entity_embed_cache();
1199    let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1200    let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1201    {
1202        let guard = cache.lock();
1203        for (i, text) in texts.iter().enumerate() {
1204            let key = entity_cache_key(&model, text);
1205            if let Some(v) = guard.get(&key) {
1206                hits[i] = Some(Arc::clone(v));
1207            } else {
1208                miss_indices.push(i);
1209            }
1210        }
1211    }
1212    let miss_count = miss_indices.len();
1213    if miss_count > 0 {
1214        let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1215        // GAP-OR-ENTITY-EMBED: route misses through the backend-aware batch
1216        // helper (same one the chunk path uses). With OpenRouter this hits the
1217        // REST `embed_batch` (~200ms) instead of the codex subprocess (~120s).
1218        let miss_vecs = embed_passages_parallel_with_embedding_choice(
1219            models_dir,
1220            &miss_texts,
1221            parallelism,
1222            entity_embed_batch_size(),
1223            embedding_backend,
1224            llm_backend,
1225        )?;
1226        let mut guard = cache.lock();
1227        for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1228            let vec = Arc::new(miss_vecs[slot].clone());
1229            let key = entity_cache_key(&model, &texts[orig_idx]);
1230            guard.insert(key, Arc::clone(&vec));
1231            hits[orig_idx] = Some(vec);
1232        }
1233    }
1234    let mut out = Vec::with_capacity(texts.len());
1235    for hit in hits.into_iter() {
1236        let v = hit.ok_or_else(|| {
1237            AppError::Embedding("entity embed cache produced null result".to_string())
1238        })?;
1239        out.push((*v).clone());
1240    }
1241    Ok((
1242        out,
1243        EmbedCacheStats {
1244            requested: texts.len(),
1245            hits: texts.len() - miss_count,
1246            misses: miss_count,
1247        },
1248    ))
1249}
1250
1251/// G56: stats snapshot returned by [`embed_entity_texts_cached`].
1252#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1253pub struct EmbedCacheStats {
1254    pub requested: usize,
1255    pub hits: usize,
1256    pub misses: usize,
1257}
1258
1259impl EmbedCacheStats {
1260    /// Hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 when nothing was requested.
1261    pub fn hit_rate(&self) -> f64 {
1262        if self.requested == 0 {
1263            0.0
1264        } else {
1265            self.hits as f64 / self.requested as f64
1266        }
1267    }
1268}
1269
1270/// G42/S3 core: bounded parallel batch embedding.
1271///
1272/// - texts are grouped into batches of `batch_size` (one LLM call per
1273///   batch, G42/S2);
1274/// - at most `effective_permits(parallelism)` LLM subprocesses run
1275///   simultaneously (`Arc<Semaphore>` + `acquire_owned`, BLOCO 2);
1276/// - results stream through a BOUNDED mpsc channel so the caller-side
1277///   collector applies backpressure and can persist incrementally
1278///   (BLOCO 5);
1279/// - the global `CancellationToken` aborts in-flight work on the first
1280///   signal; subprocesses die with their futures via `kill_on_drop`
1281///   (BLOCO 6).
1282pub fn embed_texts_parallel(
1283    embedder: &Mutex<LlmEmbedding>,
1284    texts: &[String],
1285    parallelism: usize,
1286    batch_size: usize,
1287) -> Result<Vec<Vec<f32>>, AppError> {
1288    let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1289    embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1290        slots[idx] = Some(v.to_vec());
1291        Ok(())
1292    })?;
1293    let mut out = Vec::with_capacity(slots.len());
1294    for (idx, slot) in slots.into_iter().enumerate() {
1295        out.push(slot.ok_or_else(|| {
1296            AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1297        })?);
1298    }
1299    Ok(out)
1300}
1301
1302/// Like [`embed_texts_parallel`] but invokes `on_result` as soon as each
1303/// embedding arrives (BLOCO 5: incremental persistence — a kill loses at
1304/// most the in-flight batches, never the already-delivered items).
1305pub fn embed_texts_parallel_with(
1306    embedder: &Mutex<LlmEmbedding>,
1307    texts: &[String],
1308    parallelism: usize,
1309    batch_size: usize,
1310    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1311) -> Result<(), AppError> {
1312    if texts.is_empty() {
1313        return Ok(());
1314    }
1315    let dim = crate::constants::embedding_dim();
1316    if texts.len() == 1 {
1317        let v = embed_passage(embedder, &texts[0])?;
1318        return on_result(0, &v);
1319    }
1320
1321    let client = clone_client(embedder);
1322    let permits = effective_permits(parallelism);
1323    let batches = build_batches(texts, batch_size.max(1));
1324    let token = crate::cancel_token().clone();
1325
1326    let work = move |batch: Vec<(usize, String)>| {
1327        let client = client.clone();
1328        async move {
1329            client
1330                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1331                .await
1332        }
1333    };
1334
1335    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1336    match tokio::runtime::Handle::try_current() {
1337        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1338        Err(_) => shared_runtime()?.block_on(fan_out),
1339    }
1340}
1341
/// Groups `(global_index, text)` pairs into batches of at most `batch_size`
/// items, preserving input order.
///
/// `global_index` is the position of the text in `texts`, so results can be
/// mapped back to their input slot regardless of which batch carried them.
/// The last batch may be shorter; empty input yields no batches.
///
/// Each text is cloned exactly once, directly into its batch.
///
/// # Panics
///
/// Panics if `batch_size` is 0 (callers pass `batch_size.max(1)`).
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    texts
        .chunks(batch_size)
        .enumerate()
        .map(|(chunk_no, chunk)| {
            // Index of this chunk's first element in `texts`.
            let base = chunk_no * batch_size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1353
1354/// G42/S3 BLOCO 2: effective permit count.
1355///
1356/// `permits = clamp(requested, 1, 32) ∧ cpus ∧ ram_livre*0.5/RSS` — see
1357/// the module docs for the measured RSS rationale.
1358pub fn effective_permits(requested: usize) -> usize {
1359    let cpus = std::thread::available_parallelism()
1360        .map(|n| n.get())
1361        .unwrap_or(4);
1362    let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1363        / crate::constants::LLM_WORKER_RSS_MB)
1364        .max(1) as usize;
1365    requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1366}
1367
1368/// Bounded fan-out engine. Generic over the per-batch work so the
1369/// concurrency contract is testable without spawning real LLMs.
1370///
1371/// Cancel safety (BLOCO 6/10): every task races its work against
1372/// `token.cancelled()` inside `tokio::select!`; both branches are
1373/// cancel-safe (the work future owns its subprocess via `kill_on_drop`,
1374/// and `cancelled()` is pure). On collector-side errors the `JoinSet`
1375/// is shut down, which drops in-flight futures and kills their
1376/// subprocesses.
1377async fn run_bounded<F, Fut>(
1378    batches: Vec<Vec<(usize, String)>>,
1379    permits: usize,
1380    dim: usize,
1381    token: CancellationToken,
1382    work: F,
1383    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1384) -> Result<(), AppError>
1385where
1386    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
1387    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
1388{
1389    let total_batches = batches.len();
1390    let semaphore = Arc::new(Semaphore::new(permits));
1391    // BLOCO 5: bounded channel — producers block when the collector is
1392    // behind (backpressure); PROIBIDO unbounded_channel between stages.
1393    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
1394    let mut set: JoinSet<()> = JoinSet::new();
1395
1396    for (batch_idx, batch) in batches.into_iter().enumerate() {
1397        let sem = Arc::clone(&semaphore);
1398        let token = token.clone();
1399        let tx = tx.clone();
1400        let work = work.clone();
1401        set.spawn(async move {
1402            let wait_start = std::time::Instant::now();
1403            // acquire_owned: RAII permit moved into the task; returned
1404            // on every exit path INCLUDING panic (BLOCO 2).
1405            let Ok(_permit) = sem.acquire_owned().await else {
1406                let _ = tx
1407                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
1408                    .await;
1409                return;
1410            };
1411            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
1412            let work_start = std::time::Instant::now();
1413            // ADR-0034: when `SQLITE_GRAPHRAG_IGNORE_SHUTDOWN=1` is set the
1414            // cancellation arm is dropped and the batch runs to completion.
1415            // This unblocks audit/test invocations whose `SHUTDOWN` flag was
1416            // contaminated by an earlier signal handler in the same process
1417            // tree. Production code never sees this branch.
1418            let outcome = if crate::should_obey_shutdown() {
1419                tokio::select! {
1420                    res = work(batch) => res,
1421                    _ = token.cancelled() => Err(AppError::Embedding(
1422                        "embedding cancelled by shutdown signal".to_string(),
1423                    )),
1424                }
1425            } else {
1426                work(batch).await
1427            };
1428            // BLOCO 8: permit wait time logged SEPARATELY from work time.
1429            tracing::debug!(
1430                target: "embedding",
1431                batch_idx,
1432                permit_wait_ms,
1433                work_ms = work_start.elapsed().as_millis() as u64,
1434                ok = outcome.is_ok(),
1435                "embedding batch finished"
1436            );
1437            let _ = tx.send(outcome).await;
1438        });
1439    }
1440    drop(tx);
1441
1442    let mut completed = 0usize;
1443    let mut failed = 0usize;
1444    let mut cancelled = 0usize;
1445    let mut first_error: Option<AppError> = None;
1446
1447    while let Some(message) = rx.recv().await {
1448        match message {
1449            Ok(items) => {
1450                completed += 1;
1451                if first_error.is_none() {
1452                    for (idx, v) in items {
1453                        if v.len() != dim {
1454                            first_error = Some(AppError::Embedding(format!(
1455                                "LLM returned {} dims for item {idx}, expected {dim}; \
1456                                 refusing to truncate or pad silently (G42/C5)",
1457                                v.len()
1458                            )));
1459                            break;
1460                        }
1461                        if let Err(e) = on_result(idx, &v) {
1462                            first_error = Some(e);
1463                            break;
1464                        }
1465                    }
1466                    if first_error.is_some() {
1467                        // Abort remaining work: dropped futures kill
1468                        // their subprocesses via kill_on_drop (BLOCO 6).
1469                        set.shutdown().await;
1470                    }
1471                }
1472            }
1473            Err(e) => {
1474                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1475                    cancelled += 1;
1476                } else {
1477                    failed += 1;
1478                }
1479                if first_error.is_none() {
1480                    first_error = Some(e);
1481                    set.shutdown().await;
1482                }
1483            }
1484        }
1485    }
1486
1487    // Drain the JoinSet: surface panics distinctly (panic handling —
1488    // JoinError::is_panic tratado em todo join_next, BLOCO 9).
1489    while let Some(join_result) = set.join_next().await {
1490        if let Err(join_err) = join_result {
1491            if join_err.is_panic() {
1492                failed += 1;
1493                if first_error.is_none() {
1494                    first_error = Some(AppError::Embedding(format!(
1495                        "embedding task panicked: {join_err}"
1496                    )));
1497                }
1498            } else {
1499                cancelled += 1;
1500            }
1501        }
1502    }
1503
1504    // v1.0.85 (ADR-0043 hygiene): the fan-out summary event moved
1505    // from `tracing::info!` to `tracing::debug!` and the
1506    // `available_permits` field was removed — the user prohibited
1507    // pool-state telemetry (slot_pool_stats / slot_wait_ms) and
1508    // decorative `tracing::info!` events. The remaining counters
1509    // (total_batches / completed / failed / cancelled) describe the
1510    // progress of the operation itself, not the slot pool, and
1511    // remain visible to operators running with `RUST_LOG=debug` or
1512    // `-vvv`.
1513    tracing::debug!(
1514        target: "embedding",
1515        total_batches,
1516        completed,
1517        failed,
1518        cancelled,
1519        "embedding fan-out finished"
1520    );
1521
1522    match first_error {
1523        Some(e) => Err(e),
1524        None => Ok(()),
1525    }
1526}
1527
/// Serialises a slice of `f32` values into one contiguous byte buffer.
///
/// Each value contributes its four little-endian bytes, in slice order,
/// so the result is always `v.len() * 4` bytes long.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    v.iter().flat_map(|value| value.to_le_bytes()).collect()
}
1535
/// Decodes a little-endian byte buffer back into `f32` values.
///
/// The input is consumed four bytes at a time; trailing bytes that do
/// not form a complete group of four are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1543
1544/// Returns the dimensionality of the embedding space. Used to
1545/// validate LLM responses and to size the in-memory cache.
1546pub fn embedding_dim() -> usize {
1547    crate::constants::embedding_dim()
1548}
1549
1550/// G42/C5: a vector with a divergent dimensionality is an ERROR, never
1551/// silently truncated or zero-padded (the pre-v1.0.79 `normalise_dim`
1552/// masked malformed LLM responses).
1553fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1554    let dim = crate::constants::embedding_dim();
1555    if v.len() != dim {
1556        return Err(AppError::Embedding(format!(
1557            "embedding has {} dims, expected {dim}; \
1558             refusing to truncate or pad silently (G42/C5)",
1559            v.len()
1560        )));
1561    }
1562    Ok(v)
1563}
1564
1565#[cfg(test)]
1566mod tests {
1567    use super::*;
1568    use std::sync::atomic::{AtomicUsize, Ordering};
1569
1570    #[test]
1571    fn f32_to_bytes_roundtrip() {
1572        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1573        let bytes = f32_to_bytes(&input);
1574        assert_eq!(bytes.len(), input.len() * 4);
1575        let out = bytes_to_f32(&bytes);
1576        assert_eq!(out, input);
1577    }
1578
1579    #[test]
1580    fn validate_dim_rejects_divergent_vectors() {
1581        // G42/C5 acceptance criterion: a divergent vector MUST fail —
1582        // never be silently normalised.
1583        let dim = crate::constants::embedding_dim();
1584        let long = vec![0.0; dim + 10];
1585        assert!(validate_dim(long).is_err(), "longer vector must error");
1586        let short = vec![0.0; dim.saturating_sub(1).max(1)];
1587        assert!(validate_dim(short).is_err(), "shorter vector must error");
1588        let exact = vec![0.0; dim];
1589        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1590    }
1591
1592    #[test]
1593    fn embedding_dim_matches_constants_source() {
1594        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1595    }
1596
1597    #[test]
1598    fn build_batches_preserves_global_indices() {
1599        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1600        let batches = build_batches(&texts, 4);
1601        assert_eq!(batches.len(), 3);
1602        assert_eq!(batches[0].len(), 4);
1603        assert_eq!(batches[2].len(), 2);
1604        assert_eq!(batches[2][1].0, 9);
1605        assert_eq!(batches[2][1].1, "t9");
1606    }
1607
1608    #[test]
1609    fn effective_permits_clamps_to_bounds() {
1610        assert!(effective_permits(0) >= 1);
1611        assert!(effective_permits(1000) <= 32);
1612    }
1613
1614    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1615        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1616    }
1617
1618    fn dummy_vec(dim: usize) -> Vec<f32> {
1619        vec![0.0; dim]
1620    }
1621
1622    /// G42 acceptance criterion: with N permits the measured peak of
1623    /// concurrent workers NEVER exceeds N, even with 10x more batches.
1624    #[test]
1625    fn concurrency_peak_never_exceeds_permits() {
1626        let permits = 4usize;
1627        let batches = test_batches(permits * 10);
1628        let dim = crate::constants::embedding_dim();
1629        let current = Arc::new(AtomicUsize::new(0));
1630        let peak = Arc::new(AtomicUsize::new(0));
1631
1632        let current_c = Arc::clone(&current);
1633        let peak_c = Arc::clone(&peak);
1634        let work = move |batch: Vec<(usize, String)>| {
1635            let current = Arc::clone(&current_c);
1636            let peak = Arc::clone(&peak_c);
1637            async move {
1638                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1639                peak.fetch_max(now, Ordering::SeqCst);
1640                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1641                current.fetch_sub(1, Ordering::SeqCst);
1642                Ok(batch
1643                    .into_iter()
1644                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1645                    .collect())
1646            }
1647        };
1648
1649        let mut delivered = 0usize;
1650        let rt = tokio::runtime::Builder::new_multi_thread()
1651            .worker_threads(4)
1652            .enable_all()
1653            .build()
1654            .expect("test runtime");
1655        rt.block_on(run_bounded(
1656            batches,
1657            permits,
1658            dim,
1659            CancellationToken::new(),
1660            work,
1661            &mut |_idx, _v| {
1662                delivered += 1;
1663                Ok(())
1664            },
1665        ))
1666        .expect("fan-out must succeed");
1667
1668        assert_eq!(delivered, permits * 10, "every item must be delivered");
1669        assert!(
1670            peak.load(Ordering::SeqCst) <= permits,
1671            "peak concurrency {} exceeded permits {permits}",
1672            peak.load(Ordering::SeqCst)
1673        );
1674    }
1675
1676    /// G42 acceptance criterion: a panicking task returns its permit via
1677    /// RAII and surfaces as JoinError::is_panic, not a hang.
1678    #[test]
1679    fn panicking_task_returns_permit_and_surfaces_error() {
1680        let permits = 2usize;
1681        let batches = test_batches(4);
1682        let dim = crate::constants::embedding_dim();
1683
1684        let work = move |batch: Vec<(usize, String)>| async move {
1685            if batch[0].0 == 1 {
1686                panic!("intentional test panic");
1687            }
1688            Ok(batch
1689                .into_iter()
1690                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1691                .collect())
1692        };
1693
1694        let rt = tokio::runtime::Builder::new_multi_thread()
1695            .worker_threads(2)
1696            .enable_all()
1697            .build()
1698            .expect("test runtime");
1699        let result = rt.block_on(run_bounded(
1700            batches,
1701            permits,
1702            dim,
1703            CancellationToken::new(),
1704            work,
1705            &mut |_idx, _v| Ok(()),
1706        ));
1707
1708        let err = result.expect_err("panic must surface as an error");
1709        assert!(
1710            err.to_string().contains("panicked"),
1711            "error must mention the panic: {err}"
1712        );
1713    }
1714
1715    /// G42 acceptance criterion: cancellation aborts in-flight work and
1716    /// the fan-out terminates within the shutdown timeout.
1717    #[test]
1718    fn cancellation_terminates_fan_out_quickly() {
1719        let permits = 2usize;
1720        let batches = test_batches(8);
1721        let dim = crate::constants::embedding_dim();
1722        let token = CancellationToken::new();
1723
1724        let work = move |batch: Vec<(usize, String)>| async move {
1725            // Long enough that only cancellation can finish the test fast.
1726            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1727            Ok(batch
1728                .into_iter()
1729                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1730                .collect())
1731        };
1732
1733        let rt = tokio::runtime::Builder::new_multi_thread()
1734            .worker_threads(2)
1735            .enable_all()
1736            .build()
1737            .expect("test runtime");
1738        let cancel = token.clone();
1739        let start = std::time::Instant::now();
1740        let result = rt.block_on(async move {
1741            tokio::spawn(async move {
1742                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1743                cancel.cancel();
1744            });
1745            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1746        });
1747
1748        assert!(result.is_err(), "cancelled fan-out must report an error");
1749        assert!(
1750            start.elapsed() < std::time::Duration::from_secs(10),
1751            "graceful shutdown must finish well under the work duration"
1752        );
1753    }
1754
1755    /// G42 acceptance criterion: a divergent dim coming out of the work
1756    /// stage fails the fan-out instead of being silently accepted.
1757    #[test]
1758    fn fan_out_rejects_divergent_dim() {
1759        let permits = 2usize;
1760        let batches = test_batches(2);
1761        let dim = crate::constants::embedding_dim();
1762
1763        let work = move |batch: Vec<(usize, String)>| async move {
1764            Ok(batch
1765                .into_iter()
1766                .map(|(i, _)| (i, vec![0.0f32; 3]))
1767                .collect::<Vec<(usize, Vec<f32>)>>())
1768        };
1769
1770        let rt = tokio::runtime::Builder::new_multi_thread()
1771            .worker_threads(2)
1772            .enable_all()
1773            .build()
1774            .expect("test runtime");
1775        let result = rt.block_on(run_bounded(
1776            batches,
1777            permits,
1778            dim,
1779            CancellationToken::new(),
1780            work,
1781            &mut |_idx, _v| Ok(()),
1782        ));
1783
1784        let err = result.expect_err("divergent dim must fail the fan-out");
1785        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1786    }
1787
1788    /// G44: the calibration bases stay intact at the calibration dim.
1789    #[test]
1790    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1791        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1792        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1793    }
1794
1795    /// G44: legacy 384-dim databases shrink to reliable batch sizes.
1796    #[test]
1797    fn adaptive_batch_dim384_shrinks() {
1798        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1799        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1800    }
1801
1802    /// G44: intermediate dims scale proportionally to the float budget.
1803    #[test]
1804    fn adaptive_batch_intermediate_dims() {
1805        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1806        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1807    }
1808
1809    /// G44: dims below the calibration dim never exceed the base.
1810    #[test]
1811    fn adaptive_batch_small_dim_clamps_to_base() {
1812        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1813    }
1814
1815    /// G44: the function is total — no division by zero, no clamp panic.
1816    #[test]
1817    fn adaptive_batch_total_function() {
1818        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1819        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1820        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1821    }
1822
1823    /// G44 end-to-end: the public wrappers follow the env-dim override.
1824    #[test]
1825    #[serial_test::serial(env)]
1826    fn adaptive_wrappers_follow_env_dim() {
1827        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1828        let chunk = chunk_embed_batch_size();
1829        let entity = entity_embed_batch_size();
1830        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1831        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1832        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1833        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1834    }
1835
    // ---------------------------------------------------------------
    // GAP-004 + G58/S1: EmbeddingErrorKind::classify, FallbackReason
    // and try_embed_query_with_fallback tests
    // ---------------------------------------------------------------
1839
1840    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps an OAuth
1841    /// error message to the OAuth variant regardless of case or
1842    /// surrounding text.
1843    #[test]
1844    fn embedding_error_kind_classify_oauth_message() {
1845        assert_eq!(
1846            EmbeddingErrorKind::classify("OAuth token expired for claude"),
1847            EmbeddingErrorKind::OAuth,
1848        );
1849        assert_eq!(
1850            EmbeddingErrorKind::classify("oauth authentication failed"),
1851            EmbeddingErrorKind::OAuth,
1852        );
1853    }
1854
1855    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a quota
1856    /// message to the Quota variant (without "OAuth" substring).
1857    #[test]
1858    fn embedding_error_kind_classify_quota_message() {
1859        assert_eq!(
1860            EmbeddingErrorKind::classify("quota exhausted on backend"),
1861            EmbeddingErrorKind::Quota,
1862        );
1863        assert_eq!(
1864            EmbeddingErrorKind::classify("Usage quota limit reached"),
1865            EmbeddingErrorKind::Quota,
1866        );
1867    }
1868
1869    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a slot-sema
1870    /// message to the SlotExhausted variant (matched BEFORE Quota so
1871    /// the more specific LLM-never-tried path wins).
1872    #[test]
1873    fn embedding_error_kind_classify_slot_exhausted_message() {
1874        assert_eq!(
1875            EmbeddingErrorKind::classify(
1876                "slot exhausted: failed to acquire LLM slot after backoff"
1877            ),
1878            EmbeddingErrorKind::SlotExhausted,
1879        );
1880    }
1881
1882    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a
1883    /// zero-dimensional vector error to the ZeroDimension variant.
1884    #[test]
1885    fn embedding_error_kind_classify_zero_dimension_message() {
1886        assert_eq!(
1887            EmbeddingErrorKind::classify("embedding returned dim=zero"),
1888            EmbeddingErrorKind::ZeroDimension,
1889        );
1890        assert_eq!(
1891            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1892            EmbeddingErrorKind::ZeroDimension,
1893        );
1894    }
1895
1896    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify falls back to
1897    /// the Unknown variant when no marker matches, and the code()
1898    /// accessor returns the kebab-safe discriminator string.
1899    #[test]
1900    fn embedding_error_kind_classify_unknown_fallback() {
1901        assert_eq!(
1902            EmbeddingErrorKind::classify("unrelated subprocess error"),
1903            EmbeddingErrorKind::Unknown,
1904        );
1905        assert_eq!(
1906            EmbeddingErrorKind::classify("rate limit hit"),
1907            EmbeddingErrorKind::Unknown,
1908        );
1909        // code() returns the stable discriminator string.
1910        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1911        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1912        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1913        assert_eq!(
1914            EmbeddingErrorKind::BackendMismatch.code(),
1915            "backend-mismatch"
1916        );
1917        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1918        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1919    }
1920
1921    /// Display impl covers all three variants without panicking.
1922    #[test]
1923    fn fallback_reason_display_does_not_panic() {
1924        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1925        let _ = FallbackReason::Cancelled.to_string();
1926        let _ = FallbackReason::Timeout {
1927            operation: "embed_query".into(),
1928            duration_secs: 30,
1929        }
1930        .to_string();
1931    }
1932
1933    /// FallbackReason is PartialEq — used in test assertions to verify
1934    /// the mapping rules.
1935    #[test]
1936    fn fallback_reason_is_partial_eq() {
1937        assert_eq!(
1938            FallbackReason::EmbeddingFailed("a".into()),
1939            FallbackReason::EmbeddingFailed("a".into())
1940        );
1941        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1942        assert_ne!(
1943            FallbackReason::EmbeddingFailed("a".into()),
1944            FallbackReason::EmbeddingFailed("b".into())
1945        );
1946        assert_ne!(
1947            FallbackReason::Cancelled,
1948            FallbackReason::Timeout {
1949                operation: "x".into(),
1950                duration_secs: 1
1951            }
1952        );
1953    }
1954
1955    /// Timeout variant preserves the operation name and duration from the
1956    /// original AppError::Timeout for observability.
1957    #[test]
1958    fn fallback_reason_timeout_preserves_fields() {
1959        let r = FallbackReason::Timeout {
1960            operation: "embed_query_local".into(),
1961            duration_secs: 300,
1962        };
1963        match r {
1964            FallbackReason::Timeout {
1965                operation,
1966                duration_secs,
1967            } => {
1968                assert_eq!(operation, "embed_query_local");
1969                assert_eq!(duration_secs, 300);
1970            }
1971            other => panic!("expected Timeout, got {other:?}"),
1972        }
1973    }
1974
1975    /// try_embed_query_with_fallback surfaces an EmbeddingFailed variant
1976    /// when the LLM subprocess errors. Uses a path that surely does not
1977    /// contain any embedder configuration (the binary is invoked as
1978    /// `codex` / `claude` via PATH which, in tests, defaults to nothing
1979    /// in scope, so `LlmEmbedding::detect_available()` returns Err).
1980    #[test]
1981    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1982    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1983        // Pointing at a models dir that does not exist forces the embedder
1984        // init to fail; the error is mapped to EmbeddingFailed.
1985        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1986        let result = try_embed_query_with_fallback(bogus, "hello world");
1987        match result {
1988            Err(FallbackReason::EmbeddingFailed(msg)) => {
1989                // The original error must survive in the message for ops triage.
1990                assert!(!msg.is_empty(), "fallback message must not be empty");
1991            }
1992            Err(FallbackReason::Cancelled) => {
1993                panic!("expected EmbeddingFailed, got Cancelled");
1994            }
1995            Err(FallbackReason::Timeout { .. }) => {
1996                panic!("expected EmbeddingFailed, got Timeout");
1997            }
1998            Err(FallbackReason::SlotExhausted) => {
1999                panic!("expected EmbeddingFailed, got SlotExhausted");
2000            }
2001            Err(FallbackReason::OAuthQuota { .. }) => {
2002                panic!("expected EmbeddingFailed, got OAuthQuota");
2003            }
2004            Err(FallbackReason::BackendMismatch { .. }) => {
2005                panic!("expected EmbeddingFailed, got BackendMismatch");
2006            }
2007            Err(FallbackReason::DimZero) => {
2008                panic!("expected EmbeddingFailed, got DimZero");
2009            }
2010            Ok(_) => {
2011                panic!("expected an error, got Ok — embedder must fail for bogus path");
2012            }
2013        }
2014    }
2015
2016    // G56: entity embed cache — unit tests
2017    #[test]
2018    fn g56_entity_cache_key_is_stable_and_distinct() {
2019        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
2020        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
2021        let k3 = entity_cache_key("codex:default", "claude-code");
2022        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
2023        assert_eq!(k1, k2, "same model+text must hash identically");
2024        assert_ne!(k1, k3, "different text must hash differently");
2025        assert_ne!(k1, k4, "different model must hash differently");
2026    }
2027
2028    #[test]
2029    fn g56_entity_embed_cache_stats_hit_rate() {
2030        let zero = EmbedCacheStats::default();
2031        assert_eq!(zero.hit_rate(), 0.0);
2032        let half = EmbedCacheStats {
2033            requested: 4,
2034            hits: 2,
2035            misses: 2,
2036        };
2037        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
2038        let all = EmbedCacheStats {
2039            requested: 7,
2040            hits: 7,
2041            misses: 0,
2042        };
2043        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
2044    }
2045
2046    #[test]
2047    fn g56_entity_embed_cache_populates_and_hits() {
2048        // Manually populate the cache: bypasses the LLM by writing a
2049        // known vector under a chosen (model, text) key, then verifies
2050        // the cache is consulted before any LLM call would happen.
2051        let cache = entity_embed_cache();
2052        let model = "test-model";
2053        let text = "sqlite-graphrag";
2054        let key = entity_cache_key(model, text);
2055        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
2056        cache.lock().insert(key, Arc::clone(&stored));
2057        let guard = cache.lock();
2058        let hit = guard.get(&key).expect("cache must return stored value");
2059        assert_eq!(hit.len(), crate::constants::embedding_dim());
2060        assert!((hit[0] - 0.42).abs() < 1e-6);
2061    }
2062
2063    #[test]
2064    fn g56_empty_texts_short_circuits_with_zero_stats() {
2065        // Cannot call embed_entity_texts_cached without an LLM on PATH,
2066        // so we only verify the empty-input contract via the stats struct.
2067        let stats = EmbedCacheStats::default();
2068        assert_eq!(stats.requested, 0);
2069        assert_eq!(stats.hits, 0);
2070        assert_eq!(stats.misses, 0);
2071        assert_eq!(stats.hit_rate(), 0.0);
2072    }
2073}
2074
2075// =============================================================================
2076// v1.0.82 (GAP-005) — embed_with_fallback tests
2077// =============================================================================
2078#[cfg(test)]
2079mod embed_with_fallback_tests {
2080    use super::*;
2081    use crate::llm::exit_code_hints::LlmBackendError;
2082
2083    #[test]
2084    fn none_backend_returns_empty_vector_without_calling_llm() {
2085        // The `None` backend short-circuits to `Ok(vec![])` without
2086        // touching the LLM at all. This is the signal the caller uses
2087        // to insert a `pending_embeddings` row.
2088        let (v, kind) = embed_via_backend(
2089            std::path::Path::new("/nonexistent"),
2090            "any text",
2091            &LlmBackendKind::None,
2092        )
2093        .expect("None backend never fails");
2094        assert!(v.is_empty());
2095        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
2096    }
2097
2098    #[test]
2099    fn empty_chain_defaults_to_codex_claude_none() {
2100        // Internal invariant: the default chain order is the v1.0.81
2101        // implicit order (codex first, then claude, then None as
2102        // graceful-degradation fallback).
2103        let defaults = [
2104            LlmBackendKind::Codex,
2105            LlmBackendKind::Claude,
2106            LlmBackendKind::None,
2107        ];
2108
2109        // ---------------------------------------------------------------
2110        // ADR-0042: as_str + reason_code unit tests
2111        // ---------------------------------------------------------------
2112
2113        #[allow(dead_code)]
2114        fn llm_backend_kind_as_str_is_stable() {
2115            assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
2116            assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
2117            assert_eq!(LlmBackendKind::None.as_str(), "none");
2118        }
2119
2120        #[allow(dead_code)]
2121        fn fallback_reason_reason_code_is_stable() {
2122            assert_eq!(
2123                FallbackReason::EmbeddingFailed("any".into()).reason_code(),
2124                "embedding_failed"
2125            );
2126            assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
2127            assert_eq!(
2128                FallbackReason::Timeout {
2129                    operation: "embed_query".into(),
2130                    duration_secs: 30
2131                }
2132                .reason_code(),
2133                "timeout"
2134            );
2135        }
2136        assert_eq!(defaults.len(), 3);
2137    }
2138
2139    #[test]
2140    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
2141        // ADR-0046 / BUG-11 v1.0.88: a fallback chain of only `[None]`
2142        // without `skip_on_failure=true` MUST abort with
2143        // `AppError::Embedding("no LLM backends available; fallback chain exhausted")`.
2144        //
2145        // Before BUG-11, the `None` tail returned `Ok((vec![], None))`
2146        // silently, which let `remember` persist a memory with a
2147        // zero-dimensional embedding (invisible to recall). The fix
2148        // routes the chain exhaustion through `embed_via_backend_strict`
2149        // so the caller can distinguish between "chain intentionally
2150        // degrades to skip" (skip_on_failure=true) and "chain has no
2151        // viable backend at all" (this test).
2152        let chain = vec![LlmBackendKind::None];
2153        let err = embed_with_fallback(
2154            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2155            "hello",
2156            &chain,
2157            false,
2158        )
2159        .expect_err("chain of only [None] without skip_on_failure MUST abort");
2160        let msg = format!("{err}");
2161        assert!(
2162            msg.contains("no LLM backends available"),
2163            "error must mention exhausted chain, got: {msg}"
2164        );
2165    }
2166    #[test]
2167    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
2168        // skip_on_failure=true + a chain of only `None` returns Ok(vec![])
2169        // because the None short-circuit always succeeds. This is the
2170        // canonical contract: skip_on_failure is a no-op when None is
2171        // the tail because None already provides graceful degradation.
2172        let chain = vec![LlmBackendKind::None];
2173        let v = embed_with_fallback(
2174            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2175            "hello",
2176            &chain,
2177            true,
2178        )
2179        .expect("None chain is always Ok");
2180        assert!(v.0.is_empty(), "vector must be empty");
2181        assert_eq!(v.1, LlmBackendKind::None);
2182    }
2183    #[allow(dead_code)]
2184    fn llm_backend_error_no_backends_default_message() {
2185        // The fallback chain exhaustion error must mention
2186        // in its hint so the operator knows the remediation.
2187        let e = LlmBackendError::NoBackendsAvailable;
2188        let h = e.hint();
2189        assert!(h.contains("--llm-fallback"));
2190    }
2191
2192    #[test]
2193    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
2194        let e = LlmBackendError::NonZeroExit {
2195            exit_code: Some(137),
2196            signal: Some(9),
2197            stdout_tail: "out".into(),
2198            stderr_tail: "OOM killed".into(),
2199            binary: "codex".into(),
2200            hint: "OOM".into(),
2201        };
2202        let s = e.to_string();
2203        assert!(s.contains("codex"));
2204        assert!(s.contains("OOM killed"));
2205        assert!(s.contains("signal 9") || s.contains("exit 137"));
2206    }
2207}