Skip to main content

sqlite_graphrag/
embedder.rs

1//! Embedding generation for the GraphRAG memory.
2//!
3//! v1.0.76: the default build is **LLM-only** — the binary does NOT bundle
4//! fastembed / ort / ndarray / tokenizers. All embeddings are produced
5//! by a headless invocation of `claude code` or `codex` (OAuth, no MCP,
6//! no hooks) and stored as a BLOB in `memory_embeddings(memory_id, embedding,
7//! source)`. Vector similarity is computed in pure Rust at query time.
8//!
9//! # Workload classification (G42/S3, BLOCK 1 — MANDATORY)
10//!
11//! LLM embedding is **I/O-bound + subprocess-bound**: each call waits
12//! 5-60s on a network round-trip through a headless `claude -p` /
13//! `codex exec` subprocess while the local CPU stays idle. Concurrency
14//! therefore uses **tokio** (async I/O concurrency) and NEVER rayon
15//! (reserved for CPU-bound work).
16//!
17//! # Permit formula (G42/S3, BLOCK 2)
18//!
19//! ```text
20//! permits = clamp(--llm-parallelism, 1, 32)
21//!           .min(available_parallelism())
22//!           .min(available_ram_mb * 0.5 / LLM_WORKER_RSS_MB)
23//! ```
24//!
25//! `LLM_WORKER_RSS_MB = 350` (`crate::constants`): `claude -p` and
26//! `codex exec` are node processes with a typical Maximum RSS of
27//! 200-400 MB (measured via `/usr/bin/time -l` on macOS /
28//! `/usr/bin/time -v` on Linux), so the RAM bound is pertinent.
29//!
30//! # Locking contract (G42/A3 fix)
31//!
32//! The process-wide `Mutex<LlmEmbedding>` protects ONLY the cheap clone
33//! of the client configuration (flavour + binary path + model + shared
34//! schema tempfiles). It is NEVER held across network I/O — the
35//! v1.0.76-v1.0.78 `flush_group` held it for the whole sequential
36//! embedding loop, which is why `--llm-parallelism 8` measured an
37//! effective parallelism of 1.
38
39use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
49/// Process-wide LLM-embedding client behind a `Mutex`.
50///
51/// The lock guards configuration cloning only (see module docs); the
52/// actual LLM I/O happens on clones, outside the lock.
53///
54/// ADR-0042 / GAP-002: process-wide Claude-backed LLM-embedding client
55/// behind a `Mutex`. Distinct from `EMBEDDER` so the Claude path of
56/// `embed_via_backend` no longer re-probes PATH via `detect_available`
57/// (the v1.0.82 bug where requesting Claude could resolve to Codex).
58static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
59static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
60static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
61
62/// Process-wide multi-thread tokio runtime for embedding I/O.
63///
64/// G42/A2 fix: v1.0.76-v1.0.78 built a current-thread runtime PER CALL.
65/// One runtime per process amortises the setup and hosts the bounded
66/// fan-out of `embed_texts_parallel`.
67static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
68
69/// Calibration base: chunk (long-text) batch size per LLM call at the
70/// calibration dimensionality (G42/S2). Use [`chunk_embed_batch_size`]
71/// for the dim-adaptive value (G44).
72pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;
73
74/// Calibration base: entity-name (short-text) batch size per LLM call at
75/// the calibration dimensionality (G42/S2). Use [`entity_embed_batch_size`]
76/// for the dim-adaptive value (G44).
77pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;
78
/// Dimensionality the batch bases above were calibrated against (G44).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// G44: scales a calibration-base batch size to the active dimensionality,
/// keeping the float budget per LLM call constant (~512 floats for chunks,
/// ~1600 for entity names — the budgets empirically validated at dim 64).
///
/// The result is `base * EMBED_BATCH_CALIBRATION_DIM / dim` (integer
/// division) clamped to `1..=base`: dimensionalities at or below the
/// calibration value keep the full base batch, larger ones shrink it
/// proportionally, and the batch never drops below one item.
///
/// Background: fixed batches of 8 at 384 dims asked for ~3072 floats per
/// response — claude returned partial coverage (3 of 8 items, caught by the
/// G42/C5 check) and codex timed out at 300s.
///
/// The function is total. `base` and `dim` are both raised to at least 1, so
/// the division never divides by zero and the clamp bounds are always
/// ordered (`clamp` panics when the upper bound is below the lower one). The
/// multiplication is carried out in `u128`, so an arbitrarily large `base`
/// cannot overflow.
fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
    let base = base.max(1);
    let dim = dim.max(1);
    // Widen before multiplying: `usize * usize` could overflow (panic in
    // debug builds, wrap in release builds) for very large `base`.
    let scaled = (base as u128) * (EMBED_BATCH_CALIBRATION_DIM as u128) / (dim as u128);
    // After clamping the value is at most `base`, which came from a `usize`,
    // so narrowing back is lossless.
    scaled.clamp(1, base as u128) as usize
}
93
94/// Dim-adaptive batch size for chunk (long-text) embedding calls (G44).
95pub fn chunk_embed_batch_size() -> usize {
96    let dim = crate::constants::embedding_dim();
97    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
98    tracing::debug!(
99        dim,
100        base = CHUNK_EMBED_BATCH_SIZE,
101        batch,
102        "adaptive chunk batch size (G44)"
103    );
104    batch
105}
106
107/// Dim-adaptive batch size for entity-name (short-text) embedding calls (G44).
108pub fn entity_embed_batch_size() -> usize {
109    let dim = crate::constants::embedding_dim();
110    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
111    tracing::debug!(
112        dim,
113        base = ENTITY_EMBED_BATCH_SIZE,
114        batch,
115        "adaptive entity batch size (G44)"
116    );
117    batch
118}
119
120/// Returns the process-wide multi-thread runtime, building it on first use.
121pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
122    if let Some(rt) = RUNTIME.get() {
123        return Ok(rt);
124    }
125    let rt = tokio::runtime::Builder::new_multi_thread()
126        .worker_threads(2)
127        .enable_all()
128        .build()
129        .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
130    let _ = RUNTIME.set(rt);
131    Ok(RUNTIME.get().expect("RUNTIME initialised above"))
132}
133
134/// Initialises the LLM-embedding client on first use and returns it.
135pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
136    if let Some(e) = EMBEDDER.get() {
137        return Ok(e);
138    }
139    let backend = LlmEmbedding::detect_available()?;
140    let _ = EMBEDDER.set(Mutex::new(backend));
141    Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
142}
143
144/// ADR-0042 / GAP-002: returns the process-wide Claude embedder, lazily
145/// initialising it on first use. Binary and model overrides come from
146/// the explicit arguments; `None` falls back to PATH/env defaults via
147/// the builder.
148pub fn get_claude_embedder(
149    claude_binary: Option<&Path>,
150    claude_model: Option<&str>,
151) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
152    if let Some(e) = CLAUDE_EMBEDDER.get() {
153        return Ok(e);
154    }
155    let mut builder = LlmEmbedding::with_claude_builder();
156    if let Some(b) = claude_binary {
157        builder = builder.override_binary(b.to_path_buf());
158    }
159    if let Some(m) = claude_model {
160        builder = builder.override_model(m.to_string());
161    }
162    let backend = builder.build()?;
163    let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
164    Ok(CLAUDE_EMBEDDER
165        .get()
166        .expect("CLAUDE_EMBEDDER initialised above"))
167}
168
169/// GAP-OPENCODE-001 / v1.0.90: returns the process-wide OpenCode embedder,
170/// lazily initialising it on first use. Binary and model overrides come
171/// from the explicit arguments; `None` falls back to PATH/env defaults via
172/// the builder.
173pub fn get_opencode_embedder(
174    opencode_binary: Option<&Path>,
175    opencode_model: Option<&str>,
176) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
177    if let Some(e) = OPENCODE_EMBEDDER.get() {
178        return Ok(e);
179    }
180    let mut builder = LlmEmbedding::with_opencode_builder();
181    if let Some(b) = opencode_binary {
182        builder = builder.override_binary(b.to_path_buf());
183    }
184    if let Some(m) = opencode_model {
185        builder = builder.override_model(m.to_string());
186    }
187    let backend = builder.build()?;
188    let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
189    Ok(OPENCODE_EMBEDDER
190        .get()
191        .expect("OPENCODE_EMBEDDER initialised above"))
192}
193
194/// ADR-0042 / GAP-002: route a single passage through the Claude
195/// embedder. Used by the Claude arm of `embed_via_backend` so the
196/// fallback chain stops treating Claude as a synonym for codex.
197pub fn embed_via_claude_local(
198    _models_dir: &Path,
199    text: &str,
200    claude_binary: Option<&Path>,
201    claude_model: Option<&str>,
202) -> Result<Vec<f32>, AppError> {
203    let _slot_guard = acquire_llm_slot_for_embedding()?;
204    let embedder = get_claude_embedder(claude_binary, claude_model)?;
205    embed_passage(embedder, text)
206}
207
208/// BUG-003 / v1.0.85: split of  that also
209/// reports the resolved []. Always  because
210/// this path constructs a Claude-flavoured embedder via
211///  (no PATH probe, no silent substitution).
212pub fn embed_via_claude_local_resolved(
213    _models_dir: &Path,
214    text: &str,
215    claude_binary: Option<&Path>,
216    claude_model: Option<&str>,
217) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
218    let _slot_guard = acquire_llm_slot_for_embedding()?;
219    let embedder = get_claude_embedder(claude_binary, claude_model)?;
220    let v = embed_passage(embedder, text)?;
221    Ok((v, LlmBackendKind::Claude))
222}
223
224/// GAP-OPENCODE-001 / v1.0.90: route a single passage through the OpenCode
225/// embedder, reporting the resolved [`LlmBackendKind::Opencode`]. Constructs
226/// an OpenCode-flavoured embedder via `with_opencode_builder` (no PATH probe,
227/// no silent substitution).
228pub fn embed_via_opencode_local_resolved(
229    _models_dir: &Path,
230    text: &str,
231    opencode_binary: Option<&Path>,
232    opencode_model: Option<&str>,
233) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
234    let _slot_guard = acquire_llm_slot_for_embedding()?;
235    let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
236    let v = embed_passage(embedder, text)?;
237    Ok((v, LlmBackendKind::Opencode))
238}
239/// Clones the embedding-client configuration. The lock is held only for
240/// the duration of the clone — NEVER across I/O (G42/A3).
241fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
242    embedder.lock().clone()
243}
244
245/// Embeds a single passage for storage. Delegates to the configured LLM
246/// headless (claude code / codex). Returns a vector of the active
247/// dimensionality.
248pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
249    let client = clone_client(embedder);
250    let result = client.embed_passage(text)?;
251    validate_dim(result)
252}
253
254/// Embeds a single query for similarity search. Same model and dim as
255/// `embed_passage`; the only difference is the LLM-side prompt prefix
256/// that the headless invocation uses to disambiguate.
257pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
258    let client = clone_client(embedder);
259    let result = client.embed_query(text)?;
260    validate_dim(result)
261}
262
263/// Embeds a batch of passages with token-count-aware batching.
264///
265/// Kept for API compatibility; since v1.0.79 it routes through the
266/// bounded parallel fan-out with conservative defaults.
267pub fn embed_passages_controlled(
268    embedder: &Mutex<LlmEmbedding>,
269    texts: &[&str],
270    _token_counts: &[usize],
271) -> Result<Vec<Vec<f32>>, AppError> {
272    if texts.is_empty() {
273        return Ok(Vec::new());
274    }
275    let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
276    embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
277}
278
279pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
280    let _slot_guard = acquire_llm_slot_for_embedding()?;
281    let embedder = get_embedder(models_dir)?;
282    embed_passage(embedder, text)
283}
284
/// v1.0.89 (BUG-SKIP-EMBED): reports whether the user opted to persist with
/// a NULL embedding when embedding fails.
///
/// Reads the `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` environment
/// variable (set by `--skip-embedding-on-failure` via main.rs propagation).
/// Only the exact values `1` and `true` enable the behaviour; an unset
/// variable or any other value yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE") {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
294
295/// v1.0.89 (BUG-SKIP-EMBED + GAP-EMBED-PROPAGATION): embed a passage
296/// honouring both `--llm-backend` and `--skip-embedding-on-failure`.
297///
298/// On success returns `Ok(Some(vec))`. On failure:
299/// - if `--skip-embedding-on-failure` is active, logs a warning and returns `Ok(None)`
300/// - otherwise propagates the error (exit 11)
301pub fn embed_passage_or_skip(
302    models_dir: &Path,
303    text: &str,
304    choice: Option<crate::cli::LlmBackendChoice>,
305) -> Result<Option<Vec<f32>>, AppError> {
306    match embed_passage_with_choice(models_dir, text, choice) {
307        Ok((v, _backend)) => Ok(Some(v)),
308        Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
309        Err(e) => {
310            if should_skip_embedding_on_failure() {
311                tracing::warn!(
312                    error = %e,
313                    "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
314                );
315                Ok(None)
316            } else {
317                Err(e)
318            }
319        }
320    }
321}
322
323/// BUG-003 / v1.0.85: split of `embed_passage_local` that reports the
324/// resolved [`LlmBackendKind`] based on the ACTUAL
325/// [`LlmEmbedding::flavour`] of the embedder constructed. When
326/// `LlmEmbedding::detect_available` substitutes claude for a missing
327/// codex, the operator sees the truth in `envelope.backend_invoked`.
328pub fn embed_passage_local_resolved(
329    models_dir: &Path,
330    text: &str,
331) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
332    let _slot_guard = acquire_llm_slot_for_embedding()?;
333    let embedder = get_embedder(models_dir)?;
334    let v = embed_passage(embedder, text)?;
335    let kind = match embedder.lock().flavour() {
336        crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
337        crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
338        crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
339    };
340    Ok((v, kind))
341}
342
343pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
344    let _slot_guard = acquire_llm_slot_for_embedding()?;
345    let embedder = get_embedder(models_dir)?;
346    embed_query(embedder, text)
347}
348
349// =============================================================================
350// v1.0.82 (GAP-003): wrappers that accept the CLI choice
351// (`crate::cli::LlmBackendChoice`) and translate it into a chain for
352// `embed_with_fallback`. They centralise propagation of the `--llm-backend`
353// flag across the 6 commands that produce embeddings (`remember`, `edit`,
354// `ingest`, `enrich`, `recall`, `hybrid-search`).
355// =============================================================================
356
357/// Embed a single passage using the LLM backend selected by the user via
358/// `--llm-backend`. Routes to `embed_with_fallback` so failures fall
359/// through to the next backend in the chain before giving up.
360///
361/// When `choice` is `None` (e.g. a sub-command that does not yet
362/// expose the flag), behaviour matches `embed_passage_local` — the
363/// active embedder from `LlmEmbedding::detect_available` decides the
364/// backend.
365pub fn embed_passage_with_choice(
366    models_dir: &Path,
367    text: &str,
368    choice: Option<crate::cli::LlmBackendChoice>,
369) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
370    let _slot_guard = acquire_llm_slot_for_embedding()?;
371    match choice {
372        None => {
373            let embedder = get_embedder(models_dir)?;
374            embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
375        }
376        Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
377    }
378}
379/// failure, returns a structured `FallbackReason` so the caller can
380/// surface `vec_degraded` instead of a hard exit 11.
381///
382/// `None` matches the legacy `try_embed_query_with_fallback` path
383/// (uses the active embedder without an explicit chain).
384pub fn try_embed_query_with_choice(
385    models_dir: &Path,
386    text: &str,
387    choice: Option<crate::cli::LlmBackendChoice>,
388) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
389    match embed_passage_with_choice(models_dir, text, choice) {
390        // GAP-004 / v1.0.85.1: when the chain terminates on
391        //  (i.e. user passed
392        // or every preceding backend failed),  returns
393        //  instead of an error. Without this guard the
394        // empty vector would propagate to  which
395        // aborts with exit 11 ("embedding has 0 dims, expected 64").
396        // The caller's contract is to surface a typed
397        // so  and  can route to FTS5-puro via
398        // the existing  /  envelope.
399        // Intercept the empty-vector success path and surface it as
400        //  (introduced at v1.0.85 / ADR-0043
401        // for the symmetric LLM-returned-zero-dim case).
402        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
403        Ok((v, backend)) => Ok((v, backend)),
404        Err(e) => Err(classify_embedding_error(e)),
405    }
406}
407/// call. Reads the max-concurrency from
408/// `SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY` (default derived from
409/// `LLM_WORKER_RSS_MB` and available memory), and the wait timeout
410/// from `SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS` (default 30s).
411///
412/// Returns `Ok(guard)` for happy path, `AppError::LockBusy` (exit 75)
413/// when no slot is available within the wait window, and
414/// `AppError::Validation` when the concurrency is 0.
415///
416/// The `LLM_SLOT_NO_WAIT` env var (or its CLI flag equivalent) sets
417/// `wait_secs = 0` to fail fast in tests.
418fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
419    use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
420    let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
421        .ok()
422        .and_then(|s| s.parse::<u32>().ok())
423        .filter(|n| *n >= 1)
424        .unwrap_or_else(crate::llm_slots::default_max_concurrency);
425    let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
426        0
427    } else {
428        std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
429            .ok()
430            .and_then(|s| s.parse::<u64>().ok())
431            .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
432    };
433    let _ = LLM_WORKER_RSS_MB; // silence the unused import (used in default_max_concurrency)
434                               // GAP-003 / ADR-0043: when the slot semaphore is contended beyond the
435                               // backoff window (50 + 100 + 200 + 400 = 750ms total), return a
436                               // marker message that `classify_embedding_error` maps to
437                               // `FallbackReason::SlotExhausted` (discriminator `slot_exhausted`).
438                               // The window is shorter than the legacy 30s timeout, so the operator
439                               // observes FTS5-puro fallback quickly instead of after 30s of silence.
440    match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
441        Ok(guard) => Ok(guard),
442        Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
443            "slot exhausted: {e} (fall back to FTS5)"
444        ))),
445        Err(e) => Err(e),
446    }
447}
/// GAP-004 (v1.0.88): typed classifier for embedding error messages.
///
/// Decomposes the legacy `AppError::Embedding(String)` payload into a small
/// enum so call sites can branch on the cause instead of repeating
/// `msg.contains(...)` literals. The classification is purely lexical
/// (case-insensitive substring match on the error message): no I/O, no
/// retries, no telemetry, fully deterministic.
///
/// Six variants: the five known discriminators from v1.0.85 (ADR-0043) plus
/// an `Unknown` fallback for messages that match no marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// OAuth token expired or absent; no backend can authenticate.
    OAuth,
    /// OAuth usage quota exhausted on the named backend.
    Quota,
    /// LLM slot semaphore exhausted after the backoff window.
    SlotExhausted,
    /// User-requested backend differs from the one that actually executed.
    BackendMismatch,
    /// Embedding returned a zero-dimensional vector (structural bug).
    ZeroDimension,
    /// Message did not match any of the 5 markers above.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies an embedding error message into a typed kind.
    ///
    /// Matching is case-insensitive and the first marker that matches wins,
    /// in this order:
    ///
    /// 1. `oauth` → [`Self::OAuth`] (before `quota`, because both substrings
    ///    can co-occur in the same message);
    /// 2. `slot exhausted` → [`Self::SlotExhausted`] (before `quota`, because
    ///    the slot path is more specific: the LLM never even tried to
    ///    authenticate);
    /// 3. `quota` → [`Self::Quota`];
    /// 4. `backend mismatch` → [`Self::BackendMismatch`];
    /// 5. both `dim` and `zero` → [`Self::ZeroDimension`];
    /// 6. anything else → [`Self::Unknown`].
    pub fn classify(msg: &str) -> Self {
        let lowered = msg.to_lowercase();
        let has = |needle: &str| lowered.contains(needle);
        if has("oauth") {
            Self::OAuth
        } else if has("slot exhausted") {
            // Checked ahead of `quota` so a slot-exhaustion message that
            // happens to mention a quota is not misreported as a quota error.
            Self::SlotExhausted
        } else if has("quota") {
            Self::Quota
        } else if has("backend mismatch") {
            Self::BackendMismatch
        } else if has("dim") && has("zero") {
            Self::ZeroDimension
        } else {
            Self::Unknown
        }
    }

    /// Stable, machine-friendly discriminator code (lowercase, kebab-case).
    pub fn code(&self) -> &'static str {
        match self {
            Self::OAuth => "oauth",
            Self::Quota => "quota",
            Self::SlotExhausted => "slot-exhausted",
            Self::BackendMismatch => "backend-mismatch",
            Self::ZeroDimension => "zero-dimension",
            Self::Unknown => "unknown",
        }
    }
}

/// Displays the kind as its [`EmbeddingErrorKind::code`].
impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.code())
    }
}
519
/// G58/S1: why an embedding call could not be completed, forcing the caller
/// onto a non-vector retrieval path (FTS5 prefix + LIKE).
///
/// Produced by the `try_embed_query_*` helpers so the `recall` and
/// `hybrid-search` handlers can emit a structured `vec_degraded` / `warning`
/// envelope instead of a hard `AppError::Embedding` exit 11.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// The LLM subprocess failed (rate limit, OAuth contention, quota
    /// exhausted, unparsable model response, divergent dim, etc.). Holds the
    /// original error message for observability.
    EmbeddingFailed(String),
    /// The LLM slot semaphore was exhausted: 8+ concurrent LLM subprocesses
    /// blocked the acquire beyond the backoff window
    /// (50ms + 100ms + 200ms + 400ms = 750ms total). Resolved at v1.0.85
    /// (GAP-003 / ADR-0043).
    SlotExhausted,
    /// OAuth usage quota exhausted on the named backend. The caller should
    /// retry with an alternative backend (codex ↔ claude) before falling
    /// back to FTS5-puro.
    OAuthQuota { backend: &'static str },
    /// The backend that executed the embedding differs from the one the user
    /// requested (legacy "synonym for codex" bug from v1.0.83). Resolved at
    /// v1.0.84 (GAP-002).
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding came back as a zero-dimensional vector, i.e. the LLM
    /// produced no floats at all (structural bug). Distinct from
    /// `OAuthQuota` (quota exhausted) and `EmbeddingFailed` (subprocess
    /// error).
    DimZero,
    /// The embedding was cancelled by an external signal (SIGTERM, etc.).
    Cancelled,
    /// The embedding exceeded its time budget. Holds the operation name and
    /// the elapsed seconds for diagnostic logging.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable, machine-friendly reason code for JSON envelopes
    /// (`vec_degraded_reason`). Mirrors the v1.0.84 contract, extended at
    /// v1.0.85 with 4 new variants (GAP-003 / ADR-0043).
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

/// Human-readable description of the reason, one fixed sentence per variant
/// with the variant's payload interpolated where it has one.
impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmbeddingFailed(detail) => write!(f, "embedding failed: {}", detail),
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{}'", backend)
            }
            Self::BackendMismatch {
                requested: wanted,
                resolved: actual,
            } => write!(
                f,
                "backend mismatch: user requested '{}' but '{}' was invoked",
                wanted, actual
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            Self::Timeout {
                operation: op,
                duration_secs: secs,
            } => write!(f, "embedding timed out after {}s during {}", secs, op),
        }
    }
}

impl std::error::Error for FallbackReason {}
616
617/// G58/S1: try to embed a query, mapping any failure to a structured
618/// [`FallbackReason`] so callers can route to FTS5 + LIKE fallback instead
619/// of returning exit 11 to the user.
620///
621/// This is the bridge between the hard-fail `embed_query_local` (used by
622/// write paths where embedding failure aborts the operation) and the
623/// graceful-degradation contract of `recall` / `hybrid-search` in v1.0.80.
624pub fn try_embed_query_with_fallback(
625    models_dir: &Path,
626    query: &str,
627) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
628    match embed_query_local(models_dir, query) {
629        Ok(v) => Ok((v, LlmBackendKind::None)),
630        Err(e) => Err(classify_embedding_error(e)),
631    }
632}
633
634/// G58 / ADR-0043 (v1.0.85): deterministic fallback for `recall` and
635/// `hybrid-search`.
636///
637/// - On `OAuthQuota { backend }`, retry once with the alternative backend
638///   (codex ↔ claude) before giving up.
639/// - On `SlotExhausted`, sleep 750ms and retry once (gives the slot
640///   semaphore time to release a permit from a sibling subprocess).
641/// - On any other `FallbackReason`, return immediately (deterministic).
642pub fn try_embed_query_with_deterministic_fallback(
643    models_dir: &Path,
644    query: &str,
645    choice: Option<crate::cli::LlmBackendChoice>,
646) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
647    match try_embed_query_with_choice(models_dir, query, choice) {
648        Ok(t) => Ok(t),
649        Err(reason @ FallbackReason::OAuthQuota { backend }) => {
650            let alt = match backend {
651                "codex" => Some(crate::cli::LlmBackendChoice::Claude),
652                "claude" => Some(crate::cli::LlmBackendChoice::Codex),
653                "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
654                _ => None,
655            };
656            if let Some(alt_choice) = alt {
657                try_embed_query_with_choice(models_dir, query, Some(alt_choice))
658            } else {
659                Err(reason)
660            }
661        }
662        Err(reason @ FallbackReason::SlotExhausted) => {
663            std::thread::sleep(std::time::Duration::from_millis(750));
664            try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
665        }
666        Err(other) => Err(other),
667    }
668}
669
670/// Classify an embedding [`AppError`] into a typed [`FallbackReason`].
671///
672/// v1.0.85 (ADR-0043): discriminates the 4 new causes (SlotExhausted,
673/// OAuthQuota, BackendMismatch, DimZero) from the legacy generic
674/// EmbeddingFailed bucket. The classification is purely lexical
675/// (substring match on the message) — no I/O, no retries, no
676/// telemetry, deterministic and `#[serial_test::serial(env)]`-safe.
677pub fn classify_embedding_error(err: AppError) -> FallbackReason {
678    match err {
679        AppError::Timeout {
680            operation,
681            duration_secs,
682        } => FallbackReason::Timeout {
683            operation,
684            duration_secs,
685        },
686        AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
687            // GAP-004 (v1.0.88): typed-discriminator dispatch.
688            // The lexical classifier picks the discriminator; the arms below
689            // enrich the result with the backend name and the
690            // requested/resolved pair that the JSON envelope needs.
691            //
692            // Note: `Cancelled` and `EmbeddingFailed(msg)` are not in the
693            // 6-variant enum (they have no lexical marker) so we keep them
694            // as explicit guards at the head of the match.
695            EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
696            EmbeddingErrorKind::OAuth => {
697                let backend = if msg.contains("codex") {
698                    "codex"
699                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
700                    // G45-CR5: anthropic-ratelimit-* headers are emitted only by
701                    // the Claude CLI subprocess; treat them as claude quota
702                    // signals even when the message text omits the word
703                    // "claude" explicitly.
704                    "claude"
705                } else if msg.contains("opencode") {
706                    "opencode"
707                } else {
708                    "unknown"
709                };
710                FallbackReason::OAuthQuota { backend }
711            }
712            EmbeddingErrorKind::Quota => {
713                let backend = if msg.contains("codex") {
714                    "codex"
715                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
716                    "claude"
717                } else if msg.contains("opencode") {
718                    "opencode"
719                } else {
720                    "unknown"
721                };
722                FallbackReason::OAuthQuota { backend }
723            }
724            EmbeddingErrorKind::BackendMismatch => {
725                // The `msg.contains("claude")` arm is intentionally
726                // placed BEFORE the OAuth arm so that a backend-mismatch
727                // message that mentions both "claude" and "codex" maps to
728                // BackendMismatch (the more specific failure mode).
729                let (requested, resolved) =
730                    if msg.contains("requested claude") && msg.contains("but codex") {
731                        ("claude", "codex")
732                    } else if msg.contains("requested codex") && msg.contains("but claude") {
733                        ("codex", "claude")
734                    } else if msg.contains("requested claude") {
735                        ("claude", "unknown")
736                    } else if msg.contains("requested codex") {
737                        ("codex", "unknown")
738                    } else {
739                        ("unknown", "unknown")
740                    };
741                FallbackReason::BackendMismatch {
742                    requested,
743                    resolved,
744                }
745            }
746            EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
747            EmbeddingErrorKind::Unknown => {
748                if msg.contains("cancelled") {
749                    FallbackReason::Cancelled
750                } else {
751                    FallbackReason::EmbeddingFailed(msg)
752                }
753            }
754        },
755        e => FallbackReason::EmbeddingFailed(e.to_string()),
756    }
757}
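// =============================================================================
// Fallback chain: `embed_with_fallback` tries each configured LLM
// backend in turn before giving up. The chain order matches the user-supplied
// `--llm-fallback` list; an empty list falls back to codex, claude,
// opencode, none.
// =============================================================================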
761
762/// Tries each LLM backend in `chain` in order, returning the first
763/// successful embedding. On failure, the diagnostic tail of the last
764/// error is preserved in the returned `AppError::Embedding` so the
765/// operator can see WHY every backend failed.
766///
767/// If `skip_on_failure` is `true` AND every backend fails, the function
768/// returns `Ok(Vec::new())` (an empty vector) to signal "persist
769/// without embedding" — the call site is then responsible for writing
770/// a `pending_embeddings` row that can be retried later by the
771/// `embedding retry` subcommand.
772///
773/// Defaults the chain to `[codex, claude, none]` when `chain` is
774/// empty, matching the v1.0.81 behaviour where codex was the
775/// implicit default and claude was the implicit fallback.
776pub fn embed_with_fallback(
777    models_dir: &Path,
778    text: &str,
779    chain: &[LlmBackendKind],
780    skip_on_failure: bool,
781) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
782    use crate::llm::exit_code_hints::LlmBackendError;
783    let effective: Vec<LlmBackendKind> = if chain.is_empty() {
784        vec![
785            LlmBackendKind::Codex,
786            LlmBackendKind::Claude,
787            LlmBackendKind::Opencode,
788            LlmBackendKind::None,
789        ]
790    } else {
791        chain.to_vec()
792    };
793
794    let mut last_err: Option<AppError> = None;
795    for backend in &effective {
796        // BUG-003 / v1.0.85: propagar o backend REAL retornado por
797        // embed_via_backend (que pode diferir do chain position quando
798        // LlmEmbedding::detect_available substitui codex por claude).
799        // O tuple `(_, requested_kind)` é descartado — só queremos o
800        // backend resolvido na primeira posição.
801        // ADR-0046 / BUG-11 v1.0.88: use `embed_via_backend_strict` so the
802        // sentinel `None` backend propagates the last real error instead
803        // of silently degrading to `Ok((Vec::new(), None))`. This is the
804        // path that caused preflight rejections to be swallowed by the
805        // chain's default trailing `None`.
806        match embed_via_backend_strict(
807            models_dir,
808            text,
809            backend,
810            last_err.as_ref(),
811            skip_on_failure,
812        ) {
813            Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
814            Err(e) => {
815                tracing::warn!(
816                    target: "embedding",
817                    backend = ?backend,
818                    error = %e,
819                    "embed_with_fallback: backend failed, trying next"
820                );
821                last_err = Some(e);
822            }
823        }
824    }
825    if skip_on_failure {
826        // Signal "persist with no embedding" via an empty vector paired
827        // with `None` so callers know the chain exhausted without a hit.
828        // Caller is responsible for writing a `pending_embeddings` row
829        // that can be retried later by the `embedding retry` subcommand.
830        return Ok((Vec::new(), LlmBackendKind::None));
831    }
832    Err(last_err
833        .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
834}
835
/// Identifies one LLM backend in the fallback chain. Mirrors the CLI
/// `--llm-backend` enum so the same value can be passed to
/// `--llm-fallback` without translation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// `codex exec` (default for v1.0.76+).
    Codex,
    /// `claude -p` (fallback for ChatGPT Pro OAuth unavailability).
    Claude,
    /// `opencode run` (v1.0.90).
    Opencode,
    /// Sentinel for "no backend": `embed_via_backend` answers it with
    /// an empty vector.
    None,
}

impl LlmBackendKind {
    /// Lower-case label for this backend, used in tracing and JSON
    /// envelopes. The values are part of the public contract for
    /// `envelope.backend_invoked`, so they must stay stable.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::None => "none",
        }
    }
}
863
864/// Embeds a single text via the given backend. Used by
865/// `embed_with_fallback` and exposed to allow direct one-shot
866/// selection without a chain.
867/// Embeds a single text via the given backend. Used by
868/// `embed_with_fallback` and exposed to allow direct one-shot
869/// selection without a chain.
870///
871/// BUG-003 / v1.0.85: returns `(Vec<f32>, LlmBackendKind)`. The
872/// second element reports the backend that ACTUALLY executed the
873/// embedding, not the chain position requested by the caller. When
874/// `LlmBackendKind::Codex` is requested but `codex` is absent from
875/// PATH, `LlmEmbedding::detect_available` substitutes claude and the
876/// tuple carries `LlmBackendKind::Claude` so the operator sees the
877/// truth in `envelope.backend_invoked`.
878pub fn embed_via_backend(
879    models_dir: &Path,
880    text: &str,
881    backend: &LlmBackendKind,
882) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
883    match backend {
884        LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
885        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
886        LlmBackendKind::Claude => {
887            // ADR-0042 / GAP-002: route Claude through its own static
888            // embedder instead of re-using the Codex path (which used
889            // to silently pick Codex if PATH ordered it first).
890            tracing::debug!(
891                target: "embedder",
892                backend = "claude",
893                "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
894            );
895            embed_via_claude_local_resolved(models_dir, text, None, None)
896        }
897        LlmBackendKind::Opencode => {
898            tracing::debug!(
899                target: "embedder",
900                backend = "opencode",
901                "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
902            );
903            embed_via_opencode_local_resolved(models_dir, text, None, None)
904        }
905    }
906}
907
908// ADR-0046 / BUG-11 v1.0.88: specialisation of `embed_via_backend` that
909// refuses to SILENTLY DEGRADE to `LlmBackendKind::None` after all real
910// backends (Codex, Claude) have failed. The previous behaviour
911// (`Ok((Vec::new(), None))`) caused the `remember` write path to persist
912// memories with zero-dimensional embeddings — breaking `recall` and
913// `hybrid-search` while returning exit 0 (BUG-11 CRITICAL).
914//
915// When `--llm-backend none` is explicitly requested (i.e. `last_err` is
916// None AND the chain was a single-element `[None]`), pass
917// `skip_on_failure = true` to `embed_with_fallback` to consume the empty
918// vector via the pending-embeddings retry queue instead of persisting
919// directly. This helper is the right hook for `remember`/`edit`/`ingest`.
920pub fn embed_via_backend_strict(
921    models_dir: &Path,
922    text: &str,
923    backend: &LlmBackendKind,
924    last_err: Option<&AppError>,
925    skip_on_failure: bool,
926) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
927    use crate::llm::exit_code_hints::LlmBackendError;
928    match backend {
929        LlmBackendKind::None => {
930            // If the caller opted into skip_on_failure AND no prior
931            // backend has recorded an error, the empty vector is
932            // intentional (chain of only [None]).
933            if skip_on_failure && last_err.is_none() {
934                Ok((Vec::new(), LlmBackendKind::None))
935            } else if last_err.is_some() {
936                // The chain reached `None` after Codex/Claude failed.
937                // Propagate the most recent error so `remember` aborts
938                // instead of persisting a memory without an embedding.
939                Err(match last_err {
940                    Some(e) => AppError::Embedding(format!("{e}")),
941                    None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
942                })
943            } else {
944                // Empty chain with no skip_on_failure — treat as a
945                // configuration error (no backends available).
946                Err(AppError::Embedding(
947                    LlmBackendError::NoBackendsAvailable.to_string(),
948                ))
949            }
950        }
951        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
952        LlmBackendKind::Claude => {
953            tracing::debug!(
954                target: "embedder",
955                backend = "claude",
956                "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
957            );
958            embed_via_claude_local_resolved(models_dir, text, None, None)
959        }
960        LlmBackendKind::Opencode => {
961            tracing::debug!(
962                target: "embedder",
963                backend = "opencode",
964                "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
965            );
966            embed_via_opencode_local_resolved(models_dir, text, None, None)
967        }
968    }
969}
970
971/// Legacy one-shot wrapper around `embed_via_backend` that discards
972/// the resolved backend. Kept for call sites that only care about
973/// the vector and ignore the executed-backend signal. New code
974/// should prefer `embed_via_backend` directly.
975pub fn embed_via_backend_legacy(
976    models_dir: &Path,
977    text: &str,
978    backend: &LlmBackendKind,
979) -> Result<Vec<f32>, AppError> {
980    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
981}
982
983pub fn embed_passages_controlled_local(
984    models_dir: &Path,
985    texts: &[&str],
986    token_counts: &[usize],
987) -> Result<Vec<Vec<f32>>, AppError> {
988    let embedder = get_embedder(models_dir)?;
989    embed_passages_controlled(embedder, texts, token_counts)
990}
991
992/// G42/S3: embeds `texts` through the bounded parallel fan-out and
993/// returns vectors in input order.
994pub fn embed_passages_parallel_local(
995    models_dir: &Path,
996    texts: &[String],
997    parallelism: usize,
998    batch_size: usize,
999) -> Result<Vec<Vec<f32>>, AppError> {
1000    let embedder = get_embedder(models_dir)?;
1001    embed_texts_parallel(embedder, texts, parallelism, batch_size)
1002}
1003
/// G56: in-process cache for entity embeddings keyed by `(model, text)`.
///
/// Schema v13 is immutable: `entity_embeddings` does not have a `text`
/// column, so a pure DB-side cache would require a schema bump. Instead
/// we keep a process-wide map that survives within one CLI
/// invocation. The hit rate is high in `ingest` (re-embedding the same
/// canonical entity across thousands of memories) and modest in `remember`
/// (typical single-memory invocations).
///
/// The map is a plain `HashMap`; the code in this section never evicts
/// entries, so it grows with the number of distinct `(model, text)`
/// pairs seen.
///
/// Key: the first 8 bytes (little-endian) of
/// `blake3(model || "\0" || text)`, see `entity_cache_key`. Because the
/// digest is truncated to 64 bits, two distinct pairs can in principle
/// share a key. Value: `Arc<Vec<f32>>` so the collector can drop the map
/// entry while a `Vec` is still in flight.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Lazily initialised storage behind `entity_embed_cache`.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1018
1019fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1020    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1021}
1022
1023fn entity_cache_key(model: &str, text: &str) -> u64 {
1024    let mut hasher = blake3::Hasher::new();
1025    hasher.update(model.as_bytes());
1026    hasher.update(b"\0");
1027    hasher.update(text.as_bytes());
1028    let h = hasher.finalize();
1029    let bytes = h.as_bytes();
1030    u64::from_le_bytes([
1031        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1032    ])
1033}
1034
1035/// G56: embeds entity-name texts through a process-wide cache.
1036///
1037/// Skips any `(model, text)` pair already produced in this CLI invocation
1038/// and only spawns subprocesses for the cache misses. Returns vectors in
1039/// the same order as `texts`.
1040///
1041/// Designed for entity-name batches (short texts). For chunk embeds use
1042/// [`embed_passages_parallel_local`] directly — chunks are unique per
1043/// memory and cache hit rate is negligible.
1044pub fn embed_entity_texts_cached(
1045    models_dir: &Path,
1046    texts: &[String],
1047    parallelism: usize,
1048) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1049    if texts.is_empty() {
1050        return Ok((Vec::new(), EmbedCacheStats::default()));
1051    }
1052    let embedder = get_embedder(models_dir)?;
1053    let model = embedder.lock().model_label();
1054    let cache = entity_embed_cache();
1055    let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1056    let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1057    {
1058        let guard = cache.lock();
1059        for (i, text) in texts.iter().enumerate() {
1060            let key = entity_cache_key(&model, text);
1061            if let Some(v) = guard.get(&key) {
1062                hits[i] = Some(Arc::clone(v));
1063            } else {
1064                miss_indices.push(i);
1065            }
1066        }
1067    }
1068    let miss_count = miss_indices.len();
1069    if miss_count > 0 {
1070        let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1071        let miss_vecs = embed_texts_parallel(
1072            embedder,
1073            &miss_texts,
1074            parallelism,
1075            entity_embed_batch_size(),
1076        )?;
1077        let mut guard = cache.lock();
1078        for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1079            let vec = Arc::new(miss_vecs[slot].clone());
1080            let key = entity_cache_key(&model, &texts[orig_idx]);
1081            guard.insert(key, Arc::clone(&vec));
1082            hits[orig_idx] = Some(vec);
1083        }
1084    }
1085    let mut out = Vec::with_capacity(texts.len());
1086    for hit in hits.into_iter() {
1087        let v = hit.ok_or_else(|| {
1088            AppError::Embedding("entity embed cache produced null result".to_string())
1089        })?;
1090        out.push((*v).clone());
1091    }
1092    Ok((
1093        out,
1094        EmbedCacheStats {
1095            requested: texts.len(),
1096            hits: texts.len() - miss_count,
1097            misses: miss_count,
1098        },
1099    ))
1100}
1101
1102/// G56: stats snapshot returned by [`embed_entity_texts_cached`].
1103#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1104pub struct EmbedCacheStats {
1105    pub requested: usize,
1106    pub hits: usize,
1107    pub misses: usize,
1108}
1109
1110impl EmbedCacheStats {
1111    /// Hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 when nothing was requested.
1112    pub fn hit_rate(&self) -> f64 {
1113        if self.requested == 0 {
1114            0.0
1115        } else {
1116            self.hits as f64 / self.requested as f64
1117        }
1118    }
1119}
1120
1121/// G42/S3 core: bounded parallel batch embedding.
1122///
1123/// - texts are grouped into batches of `batch_size` (one LLM call per
1124///   batch, G42/S2);
1125/// - at most `effective_permits(parallelism)` LLM subprocesses run
1126///   simultaneously (`Arc<Semaphore>` + `acquire_owned`, BLOCO 2);
1127/// - results stream through a BOUNDED mpsc channel so the caller-side
1128///   collector applies backpressure and can persist incrementally
1129///   (BLOCO 5);
1130/// - the global `CancellationToken` aborts in-flight work on the first
1131///   signal; subprocesses die with their futures via `kill_on_drop`
1132///   (BLOCO 6).
1133pub fn embed_texts_parallel(
1134    embedder: &Mutex<LlmEmbedding>,
1135    texts: &[String],
1136    parallelism: usize,
1137    batch_size: usize,
1138) -> Result<Vec<Vec<f32>>, AppError> {
1139    let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1140    embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1141        slots[idx] = Some(v.to_vec());
1142        Ok(())
1143    })?;
1144    let mut out = Vec::with_capacity(slots.len());
1145    for (idx, slot) in slots.into_iter().enumerate() {
1146        out.push(slot.ok_or_else(|| {
1147            AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1148        })?);
1149    }
1150    Ok(out)
1151}
1152
1153/// Like [`embed_texts_parallel`] but invokes `on_result` as soon as each
1154/// embedding arrives (BLOCO 5: incremental persistence — a kill loses at
1155/// most the in-flight batches, never the already-delivered items).
1156pub fn embed_texts_parallel_with(
1157    embedder: &Mutex<LlmEmbedding>,
1158    texts: &[String],
1159    parallelism: usize,
1160    batch_size: usize,
1161    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1162) -> Result<(), AppError> {
1163    if texts.is_empty() {
1164        return Ok(());
1165    }
1166    let dim = crate::constants::embedding_dim();
1167    if texts.len() == 1 {
1168        let v = embed_passage(embedder, &texts[0])?;
1169        return on_result(0, &v);
1170    }
1171
1172    let client = clone_client(embedder);
1173    let permits = effective_permits(parallelism);
1174    let batches = build_batches(texts, batch_size.max(1));
1175    let token = crate::cancel_token().clone();
1176
1177    let work = move |batch: Vec<(usize, String)>| {
1178        let client = client.clone();
1179        async move {
1180            client
1181                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1182                .await
1183        }
1184    };
1185
1186    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1187    match tokio::runtime::Handle::try_current() {
1188        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1189        Err(_) => shared_runtime()?.block_on(fan_out),
1190    }
1191}
1192
/// Groups `(global_index, text)` pairs into batches of at most
/// `batch_size` items, preserving input order. `global_index` is the
/// position of the text in `texts`, so results can be matched back to
/// their inputs regardless of which batch carried them.
///
/// A `batch_size` of 0 is treated as 1 instead of panicking. Empty
/// input yields no batches. Each text is cloned exactly once.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let batch_size = batch_size.max(1);
    texts
        .chunks(batch_size)
        .enumerate()
        .map(|(batch_no, chunk)| {
            // First global index covered by this chunk.
            let base = batch_no * batch_size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1204
1205/// G42/S3 BLOCO 2: effective permit count.
1206///
1207/// `permits = clamp(requested, 1, 32) ∧ cpus ∧ ram_livre*0.5/RSS` — see
1208/// the module docs for the measured RSS rationale.
1209pub fn effective_permits(requested: usize) -> usize {
1210    let cpus = std::thread::available_parallelism()
1211        .map(|n| n.get())
1212        .unwrap_or(4);
1213    let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1214        / crate::constants::LLM_WORKER_RSS_MB)
1215        .max(1) as usize;
1216    requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1217}
1218
/// Bounded fan-out engine. Generic over the per-batch work so the
/// concurrency contract is testable without spawning real LLMs.
///
/// One task is spawned per batch; each task must obtain a semaphore
/// permit (`permits` in total) before running `work`, then sends its
/// outcome through a bounded channel of capacity `permits * 2`. The
/// collector loop in this function validates every returned vector
/// against `dim` and forwards it to `on_result`. The first error (bad
/// dimension, `on_result` failure, failed batch, or panicked task)
/// shuts the `JoinSet` down and is returned once the channel and the
/// set have drained.
///
/// Cancel safety (BLOCK 6/10): every task races its work against
/// `token.cancelled()` inside `tokio::select!`; both branches are
/// cancel-safe (the work future owns its subprocess via `kill_on_drop`,
/// and `cancelled()` is pure). On collector-side errors the `JoinSet`
/// is shut down, which drops in-flight futures and kills their
/// subprocesses.
// NOTE(review): `permits` is assumed to be >= 1 (callers in this file
// pass `effective_permits(..)`); a value of 0 is not guarded against here.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    // BLOCK 5: bounded channel — producers block when the collector is
    // behind (backpressure); an unbounded channel between stages is
    // forbidden.
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // acquire_owned: RAII permit moved into the task; returned
            // on every exit path INCLUDING panic (BLOCK 2).
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            // ADR-0034: when `SQLITE_GRAPHRAG_IGNORE_SHUTDOWN=1` is set the
            // cancellation arm is dropped and the batch runs to completion.
            // This unblocks audit/test invocations whose `SHUTDOWN` flag was
            // contaminated by an earlier signal handler in the same process
            // tree. Production code never sees this branch.
            let outcome = if crate::should_obey_shutdown() {
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            // BLOCK 8: permit wait time logged SEPARATELY from work time.
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // A send failure means the receiver is gone; nothing to do.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` returns `None` once every
    // task-held clone has been dropped.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                completed += 1;
                // After the first error, later successes are counted but
                // no longer delivered to `on_result`.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        // Abort remaining work: dropped futures kill
                        // their subprocesses via kill_on_drop (BLOCK 6).
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Cancellation is recognised lexically: an embedding
                // error whose message contains "cancelled".
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // Drain the JoinSet: surface panics distinctly (panic handling —
    // `JoinError::is_panic` is checked on every `join_next`, BLOCK 9).
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                // A non-panic join error is counted as a cancellation.
                cancelled += 1;
            }
        }
    }

    // v1.0.85 (ADR-0043 hygiene): the fan-out summary event moved
    // from `tracing::info!` to `tracing::debug!` and the
    // `available_permits` field was removed — the user prohibited
    // pool-state telemetry (slot_pool_stats / slot_wait_ms) and
    // decorative `tracing::info!` events. The remaining counters
    // (total_batches / completed / failed / cancelled) describe the
    // progress of the operation itself, not the slot pool, and
    // remain visible to operators running with `RUST_LOG=debug` or
    // `-vvv`.
    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1378
/// Serialises a slice of `f32` values into a flat byte buffer, 4
/// little-endian bytes per value, in input order.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1386
/// Decodes a byte buffer into `f32` values, reading 4 little-endian
/// bytes per value. Trailing bytes that do not fill a complete group of
/// four are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1394
/// Returns the dimensionality of the embedding space. Used to
/// validate LLM responses and to size the in-memory cache.
///
/// Thin re-export: the value comes straight from
/// `crate::constants::embedding_dim()`.
pub fn embedding_dim() -> usize {
    crate::constants::embedding_dim()
}
1400
1401/// G42/C5: a vector with a divergent dimensionality is an ERROR, never
1402/// silently truncated or zero-padded (the pre-v1.0.79 `normalise_dim`
1403/// masked malformed LLM responses).
1404fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1405    let dim = crate::constants::embedding_dim();
1406    if v.len() != dim {
1407        return Err(AppError::Embedding(format!(
1408            "embedding has {} dims, expected {dim}; \
1409             refusing to truncate or pad silently (G42/C5)",
1410            v.len()
1411        )));
1412    }
1413    Ok(v)
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418    use super::*;
1419    use std::sync::atomic::{AtomicUsize, Ordering};
1420
1421    #[test]
1422    fn f32_to_bytes_roundtrip() {
1423        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1424        let bytes = f32_to_bytes(&input);
1425        assert_eq!(bytes.len(), input.len() * 4);
1426        let out = bytes_to_f32(&bytes);
1427        assert_eq!(out, input);
1428    }
1429
1430    #[test]
1431    fn validate_dim_rejects_divergent_vectors() {
1432        // G42/C5 acceptance criterion: a divergent vector MUST fail —
1433        // never be silently normalised.
1434        let dim = crate::constants::embedding_dim();
1435        let long = vec![0.0; dim + 10];
1436        assert!(validate_dim(long).is_err(), "longer vector must error");
1437        let short = vec![0.0; dim.saturating_sub(1).max(1)];
1438        assert!(validate_dim(short).is_err(), "shorter vector must error");
1439        let exact = vec![0.0; dim];
1440        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1441    }
1442
    // The module-level `embedding_dim` must report the same value as
    // the `crate::constants` function it delegates to.
    #[test]
    fn embedding_dim_matches_constants_source() {
        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
    }
1447
1448    #[test]
1449    fn build_batches_preserves_global_indices() {
1450        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1451        let batches = build_batches(&texts, 4);
1452        assert_eq!(batches.len(), 3);
1453        assert_eq!(batches[0].len(), 4);
1454        assert_eq!(batches[2].len(), 2);
1455        assert_eq!(batches[2][1].0, 9);
1456        assert_eq!(batches[2][1].1, "t9");
1457    }
1458
1459    #[test]
1460    fn effective_permits_clamps_to_bounds() {
1461        assert!(effective_permits(0) >= 1);
1462        assert!(effective_permits(1000) <= 32);
1463    }
1464
1465    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1466        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1467    }
1468
1469    fn dummy_vec(dim: usize) -> Vec<f32> {
1470        vec![0.0; dim]
1471    }
1472
1473    /// G42 acceptance criterion: with N permits the measured peak of
1474    /// concurrent workers NEVER exceeds N, even with 10x more batches.
1475    #[test]
1476    fn concurrency_peak_never_exceeds_permits() {
1477        let permits = 4usize;
1478        let batches = test_batches(permits * 10);
1479        let dim = crate::constants::embedding_dim();
1480        let current = Arc::new(AtomicUsize::new(0));
1481        let peak = Arc::new(AtomicUsize::new(0));
1482
1483        let current_c = Arc::clone(&current);
1484        let peak_c = Arc::clone(&peak);
1485        let work = move |batch: Vec<(usize, String)>| {
1486            let current = Arc::clone(&current_c);
1487            let peak = Arc::clone(&peak_c);
1488            async move {
1489                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1490                peak.fetch_max(now, Ordering::SeqCst);
1491                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1492                current.fetch_sub(1, Ordering::SeqCst);
1493                Ok(batch
1494                    .into_iter()
1495                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1496                    .collect())
1497            }
1498        };
1499
1500        let mut delivered = 0usize;
1501        let rt = tokio::runtime::Builder::new_multi_thread()
1502            .worker_threads(4)
1503            .enable_all()
1504            .build()
1505            .expect("test runtime");
1506        rt.block_on(run_bounded(
1507            batches,
1508            permits,
1509            dim,
1510            CancellationToken::new(),
1511            work,
1512            &mut |_idx, _v| {
1513                delivered += 1;
1514                Ok(())
1515            },
1516        ))
1517        .expect("fan-out must succeed");
1518
1519        assert_eq!(delivered, permits * 10, "every item must be delivered");
1520        assert!(
1521            peak.load(Ordering::SeqCst) <= permits,
1522            "peak concurrency {} exceeded permits {permits}",
1523            peak.load(Ordering::SeqCst)
1524        );
1525    }
1526
1527    /// G42 acceptance criterion: a panicking task returns its permit via
1528    /// RAII and surfaces as JoinError::is_panic, not a hang.
1529    #[test]
1530    fn panicking_task_returns_permit_and_surfaces_error() {
1531        let permits = 2usize;
1532        let batches = test_batches(4);
1533        let dim = crate::constants::embedding_dim();
1534
1535        let work = move |batch: Vec<(usize, String)>| async move {
1536            if batch[0].0 == 1 {
1537                panic!("intentional test panic");
1538            }
1539            Ok(batch
1540                .into_iter()
1541                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1542                .collect())
1543        };
1544
1545        let rt = tokio::runtime::Builder::new_multi_thread()
1546            .worker_threads(2)
1547            .enable_all()
1548            .build()
1549            .expect("test runtime");
1550        let result = rt.block_on(run_bounded(
1551            batches,
1552            permits,
1553            dim,
1554            CancellationToken::new(),
1555            work,
1556            &mut |_idx, _v| Ok(()),
1557        ));
1558
1559        let err = result.expect_err("panic must surface as an error");
1560        assert!(
1561            err.to_string().contains("panicked"),
1562            "error must mention the panic: {err}"
1563        );
1564    }
1565
1566    /// G42 acceptance criterion: cancellation aborts in-flight work and
1567    /// the fan-out terminates within the shutdown timeout.
1568    #[test]
1569    fn cancellation_terminates_fan_out_quickly() {
1570        let permits = 2usize;
1571        let batches = test_batches(8);
1572        let dim = crate::constants::embedding_dim();
1573        let token = CancellationToken::new();
1574
1575        let work = move |batch: Vec<(usize, String)>| async move {
1576            // Long enough that only cancellation can finish the test fast.
1577            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1578            Ok(batch
1579                .into_iter()
1580                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1581                .collect())
1582        };
1583
1584        let rt = tokio::runtime::Builder::new_multi_thread()
1585            .worker_threads(2)
1586            .enable_all()
1587            .build()
1588            .expect("test runtime");
1589        let cancel = token.clone();
1590        let start = std::time::Instant::now();
1591        let result = rt.block_on(async move {
1592            tokio::spawn(async move {
1593                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1594                cancel.cancel();
1595            });
1596            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1597        });
1598
1599        assert!(result.is_err(), "cancelled fan-out must report an error");
1600        assert!(
1601            start.elapsed() < std::time::Duration::from_secs(10),
1602            "graceful shutdown must finish well under the work duration"
1603        );
1604    }
1605
1606    /// G42 acceptance criterion: a divergent dim coming out of the work
1607    /// stage fails the fan-out instead of being silently accepted.
1608    #[test]
1609    fn fan_out_rejects_divergent_dim() {
1610        let permits = 2usize;
1611        let batches = test_batches(2);
1612        let dim = crate::constants::embedding_dim();
1613
1614        let work = move |batch: Vec<(usize, String)>| async move {
1615            Ok(batch
1616                .into_iter()
1617                .map(|(i, _)| (i, vec![0.0f32; 3]))
1618                .collect::<Vec<(usize, Vec<f32>)>>())
1619        };
1620
1621        let rt = tokio::runtime::Builder::new_multi_thread()
1622            .worker_threads(2)
1623            .enable_all()
1624            .build()
1625            .expect("test runtime");
1626        let result = rt.block_on(run_bounded(
1627            batches,
1628            permits,
1629            dim,
1630            CancellationToken::new(),
1631            work,
1632            &mut |_idx, _v| Ok(()),
1633        ));
1634
1635        let err = result.expect_err("divergent dim must fail the fan-out");
1636        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1637    }
1638
1639    /// G44: the calibration bases stay intact at the calibration dim.
1640    #[test]
1641    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1642        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1643        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1644    }
1645
1646    /// G44: legacy 384-dim databases shrink to reliable batch sizes.
1647    #[test]
1648    fn adaptive_batch_dim384_shrinks() {
1649        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1650        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1651    }
1652
1653    /// G44: intermediate dims scale proportionally to the float budget.
1654    #[test]
1655    fn adaptive_batch_intermediate_dims() {
1656        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1657        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1658    }
1659
1660    /// G44: dims below the calibration dim never exceed the base.
1661    #[test]
1662    fn adaptive_batch_small_dim_clamps_to_base() {
1663        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1664    }
1665
1666    /// G44: the function is total — no division by zero, no clamp panic.
1667    #[test]
1668    fn adaptive_batch_total_function() {
1669        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1670        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1671        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1672    }
1673
1674    /// G44 end-to-end: the public wrappers follow the env-dim override.
1675    #[test]
1676    #[serial_test::serial(env)]
1677    fn adaptive_wrappers_follow_env_dim() {
1678        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1679        let chunk = chunk_embed_batch_size();
1680        let entity = entity_embed_batch_size();
1681        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1682        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1683        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1684        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1685    }
1686
    // ---------------------------------------------------------------
    // GAP-004 + G58/S1: EmbeddingErrorKind, FallbackReason and
    // try_embed_query_with_fallback tests
    // ---------------------------------------------------------------
1690
1691    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps an OAuth
1692    /// error message to the OAuth variant regardless of case or
1693    /// surrounding text.
1694    #[test]
1695    fn embedding_error_kind_classify_oauth_message() {
1696        assert_eq!(
1697            EmbeddingErrorKind::classify("OAuth token expired for claude"),
1698            EmbeddingErrorKind::OAuth,
1699        );
1700        assert_eq!(
1701            EmbeddingErrorKind::classify("oauth authentication failed"),
1702            EmbeddingErrorKind::OAuth,
1703        );
1704    }
1705
1706    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a quota
1707    /// message to the Quota variant (without "OAuth" substring).
1708    #[test]
1709    fn embedding_error_kind_classify_quota_message() {
1710        assert_eq!(
1711            EmbeddingErrorKind::classify("quota exhausted on backend"),
1712            EmbeddingErrorKind::Quota,
1713        );
1714        assert_eq!(
1715            EmbeddingErrorKind::classify("Usage quota limit reached"),
1716            EmbeddingErrorKind::Quota,
1717        );
1718    }
1719
1720    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a slot-sema
1721    /// message to the SlotExhausted variant (matched BEFORE Quota so
1722    /// the more specific LLM-never-tried path wins).
1723    #[test]
1724    fn embedding_error_kind_classify_slot_exhausted_message() {
1725        assert_eq!(
1726            EmbeddingErrorKind::classify(
1727                "slot exhausted: failed to acquire LLM slot after backoff"
1728            ),
1729            EmbeddingErrorKind::SlotExhausted,
1730        );
1731    }
1732
1733    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a
1734    /// zero-dimensional vector error to the ZeroDimension variant.
1735    #[test]
1736    fn embedding_error_kind_classify_zero_dimension_message() {
1737        assert_eq!(
1738            EmbeddingErrorKind::classify("embedding returned dim=zero"),
1739            EmbeddingErrorKind::ZeroDimension,
1740        );
1741        assert_eq!(
1742            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1743            EmbeddingErrorKind::ZeroDimension,
1744        );
1745    }
1746
1747    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify falls back to
1748    /// the Unknown variant when no marker matches, and the code()
1749    /// accessor returns the kebab-safe discriminator string.
1750    #[test]
1751    fn embedding_error_kind_classify_unknown_fallback() {
1752        assert_eq!(
1753            EmbeddingErrorKind::classify("unrelated subprocess error"),
1754            EmbeddingErrorKind::Unknown,
1755        );
1756        assert_eq!(
1757            EmbeddingErrorKind::classify("rate limit hit"),
1758            EmbeddingErrorKind::Unknown,
1759        );
1760        // code() returns the stable discriminator string.
1761        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1762        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1763        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1764        assert_eq!(
1765            EmbeddingErrorKind::BackendMismatch.code(),
1766            "backend-mismatch"
1767        );
1768        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1769        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1770    }
1771
1772    /// Display impl covers all three variants without panicking.
1773    #[test]
1774    fn fallback_reason_display_does_not_panic() {
1775        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1776        let _ = FallbackReason::Cancelled.to_string();
1777        let _ = FallbackReason::Timeout {
1778            operation: "embed_query".into(),
1779            duration_secs: 30,
1780        }
1781        .to_string();
1782    }
1783
1784    /// FallbackReason is PartialEq — used in test assertions to verify
1785    /// the mapping rules.
1786    #[test]
1787    fn fallback_reason_is_partial_eq() {
1788        assert_eq!(
1789            FallbackReason::EmbeddingFailed("a".into()),
1790            FallbackReason::EmbeddingFailed("a".into())
1791        );
1792        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1793        assert_ne!(
1794            FallbackReason::EmbeddingFailed("a".into()),
1795            FallbackReason::EmbeddingFailed("b".into())
1796        );
1797        assert_ne!(
1798            FallbackReason::Cancelled,
1799            FallbackReason::Timeout {
1800                operation: "x".into(),
1801                duration_secs: 1
1802            }
1803        );
1804    }
1805
1806    /// Timeout variant preserves the operation name and duration from the
1807    /// original AppError::Timeout for observability.
1808    #[test]
1809    fn fallback_reason_timeout_preserves_fields() {
1810        let r = FallbackReason::Timeout {
1811            operation: "embed_query_local".into(),
1812            duration_secs: 300,
1813        };
1814        match r {
1815            FallbackReason::Timeout {
1816                operation,
1817                duration_secs,
1818            } => {
1819                assert_eq!(operation, "embed_query_local");
1820                assert_eq!(duration_secs, 300);
1821            }
1822            other => panic!("expected Timeout, got {other:?}"),
1823        }
1824    }
1825
1826    /// try_embed_query_with_fallback surfaces an EmbeddingFailed variant
1827    /// when the LLM subprocess errors. Uses a path that surely does not
1828    /// contain any embedder configuration (the binary is invoked as
1829    /// `codex` / `claude` via PATH which, in tests, defaults to nothing
1830    /// in scope, so `LlmEmbedding::detect_available()` returns Err).
1831    #[test]
1832    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1833    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1834        // Pointing at a models dir that does not exist forces the embedder
1835        // init to fail; the error is mapped to EmbeddingFailed.
1836        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1837        let result = try_embed_query_with_fallback(bogus, "hello world");
1838        match result {
1839            Err(FallbackReason::EmbeddingFailed(msg)) => {
1840                // The original error must survive in the message for ops triage.
1841                assert!(!msg.is_empty(), "fallback message must not be empty");
1842            }
1843            Err(FallbackReason::Cancelled) => {
1844                panic!("expected EmbeddingFailed, got Cancelled");
1845            }
1846            Err(FallbackReason::Timeout { .. }) => {
1847                panic!("expected EmbeddingFailed, got Timeout");
1848            }
1849            Err(FallbackReason::SlotExhausted) => {
1850                panic!("expected EmbeddingFailed, got SlotExhausted");
1851            }
1852            Err(FallbackReason::OAuthQuota { .. }) => {
1853                panic!("expected EmbeddingFailed, got OAuthQuota");
1854            }
1855            Err(FallbackReason::BackendMismatch { .. }) => {
1856                panic!("expected EmbeddingFailed, got BackendMismatch");
1857            }
1858            Err(FallbackReason::DimZero) => {
1859                panic!("expected EmbeddingFailed, got DimZero");
1860            }
1861            Ok(_) => {
1862                panic!("expected an error, got Ok — embedder must fail for bogus path");
1863            }
1864        }
1865    }
1866
1867    // G56: entity embed cache — unit tests
1868    #[test]
1869    fn g56_entity_cache_key_is_stable_and_distinct() {
1870        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1871        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1872        let k3 = entity_cache_key("codex:default", "claude-code");
1873        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1874        assert_eq!(k1, k2, "same model+text must hash identically");
1875        assert_ne!(k1, k3, "different text must hash differently");
1876        assert_ne!(k1, k4, "different model must hash differently");
1877    }
1878
1879    #[test]
1880    fn g56_entity_embed_cache_stats_hit_rate() {
1881        let zero = EmbedCacheStats::default();
1882        assert_eq!(zero.hit_rate(), 0.0);
1883        let half = EmbedCacheStats {
1884            requested: 4,
1885            hits: 2,
1886            misses: 2,
1887        };
1888        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1889        let all = EmbedCacheStats {
1890            requested: 7,
1891            hits: 7,
1892            misses: 0,
1893        };
1894        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1895    }
1896
1897    #[test]
1898    fn g56_entity_embed_cache_populates_and_hits() {
1899        // Manually populate the cache: bypasses the LLM by writing a
1900        // known vector under a chosen (model, text) key, then verifies
1901        // the cache is consulted before any LLM call would happen.
1902        let cache = entity_embed_cache();
1903        let model = "test-model";
1904        let text = "sqlite-graphrag";
1905        let key = entity_cache_key(model, text);
1906        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1907        cache.lock().insert(key, Arc::clone(&stored));
1908        let guard = cache.lock();
1909        let hit = guard.get(&key).expect("cache must return stored value");
1910        assert_eq!(hit.len(), crate::constants::embedding_dim());
1911        assert!((hit[0] - 0.42).abs() < 1e-6);
1912    }
1913
1914    #[test]
1915    fn g56_empty_texts_short_circuits_with_zero_stats() {
1916        // Cannot call embed_entity_texts_cached without an LLM on PATH,
1917        // so we only verify the empty-input contract via the stats struct.
1918        let stats = EmbedCacheStats::default();
1919        assert_eq!(stats.requested, 0);
1920        assert_eq!(stats.hits, 0);
1921        assert_eq!(stats.misses, 0);
1922        assert_eq!(stats.hit_rate(), 0.0);
1923    }
1924}
1925
1926// =============================================================================
1927// v1.0.82 (GAP-005) — embed_with_fallback tests
1928// =============================================================================
1929#[cfg(test)]
1930mod embed_with_fallback_tests {
1931    use super::*;
1932    use crate::llm::exit_code_hints::LlmBackendError;
1933
1934    #[test]
1935    fn none_backend_returns_empty_vector_without_calling_llm() {
1936        // The `None` backend short-circuits to `Ok(vec![])` without
1937        // touching the LLM at all. This is the signal the caller uses
1938        // to insert a `pending_embeddings` row.
1939        let (v, kind) = embed_via_backend(
1940            std::path::Path::new("/nonexistent"),
1941            "any text",
1942            &LlmBackendKind::None,
1943        )
1944        .expect("None backend never fails");
1945        assert!(v.is_empty());
1946        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
1947    }
1948
1949    #[test]
1950    fn empty_chain_defaults_to_codex_claude_none() {
1951        // Internal invariant: the default chain order is the v1.0.81
1952        // implicit order (codex first, then claude, then None as
1953        // graceful-degradation fallback).
1954        let defaults = [
1955            LlmBackendKind::Codex,
1956            LlmBackendKind::Claude,
1957            LlmBackendKind::None,
1958        ];
1959
1960        // ---------------------------------------------------------------
1961        // ADR-0042: as_str + reason_code unit tests
1962        // ---------------------------------------------------------------
1963
1964        #[allow(dead_code)]
1965        fn llm_backend_kind_as_str_is_stable() {
1966            assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
1967            assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
1968            assert_eq!(LlmBackendKind::None.as_str(), "none");
1969        }
1970
1971        #[allow(dead_code)]
1972        fn fallback_reason_reason_code_is_stable() {
1973            assert_eq!(
1974                FallbackReason::EmbeddingFailed("any".into()).reason_code(),
1975                "embedding_failed"
1976            );
1977            assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
1978            assert_eq!(
1979                FallbackReason::Timeout {
1980                    operation: "embed_query".into(),
1981                    duration_secs: 30
1982                }
1983                .reason_code(),
1984                "timeout"
1985            );
1986        }
1987        assert_eq!(defaults.len(), 3);
1988    }
1989
1990    #[test]
1991    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
1992        // ADR-0046 / BUG-11 v1.0.88: a fallback chain of only `[None]`
1993        // without `skip_on_failure=true` MUST abort with
1994        // `AppError::Embedding("no LLM backends available; fallback chain exhausted")`.
1995        //
1996        // Before BUG-11, the `None` tail returned `Ok((vec![], None))`
1997        // silently, which let `remember` persist a memory with a
1998        // zero-dimensional embedding (invisible to recall). The fix
1999        // routes the chain exhaustion through `embed_via_backend_strict`
2000        // so the caller can distinguish between "chain intentionally
2001        // degrades to skip" (skip_on_failure=true) and "chain has no
2002        // viable backend at all" (this test).
2003        let chain = vec![LlmBackendKind::None];
2004        let err = embed_with_fallback(
2005            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2006            "hello",
2007            &chain,
2008            false,
2009        )
2010        .expect_err("chain of only [None] without skip_on_failure MUST abort");
2011        let msg = format!("{err}");
2012        assert!(
2013            msg.contains("no LLM backends available"),
2014            "error must mention exhausted chain, got: {msg}"
2015        );
2016    }
2017    #[test]
2018    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
2019        // skip_on_failure=true + a chain of only `None` returns Ok(vec![])
2020        // because the None short-circuit always succeeds. This is the
2021        // canonical contract: skip_on_failure is a no-op when None is
2022        // the tail because None already provides graceful degradation.
2023        let chain = vec![LlmBackendKind::None];
2024        let v = embed_with_fallback(
2025            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2026            "hello",
2027            &chain,
2028            true,
2029        )
2030        .expect("None chain is always Ok");
2031        assert!(v.0.is_empty(), "vector must be empty");
2032        assert_eq!(v.1, LlmBackendKind::None);
2033    }
2034    #[allow(dead_code)]
2035    fn llm_backend_error_no_backends_default_message() {
2036        // The fallback chain exhaustion error must mention
2037        // in its hint so the operator knows the remediation.
2038        let e = LlmBackendError::NoBackendsAvailable;
2039        let h = e.hint();
2040        assert!(h.contains("--llm-fallback"));
2041    }
2042
2043    #[test]
2044    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
2045        let e = LlmBackendError::NonZeroExit {
2046            exit_code: Some(137),
2047            signal: Some(9),
2048            stdout_tail: "out".into(),
2049            stderr_tail: "OOM killed".into(),
2050            binary: "codex".into(),
2051            hint: "OOM".into(),
2052        };
2053        let s = e.to_string();
2054        assert!(s.contains("codex"));
2055        assert!(s.contains("OOM killed"));
2056        assert!(s.contains("signal 9") || s.contains("exit 137"));
2057    }
2058}