Skip to main content

sqlite_graphrag/
embedder.rs

1//! Embedding generation for the GraphRAG memory.
2//!
3//! v1.0.76: the default build is **LLM-only** — the binary does NOT bundle
4//! fastembed / ort / ndarray / tokenizers. All embeddings are produced
5//! by a headless invocation of `claude code` or `codex` (OAuth, no MCP,
6//! no hooks) and stored as a BLOB in `memory_embeddings(memory_id, embedding,
7//! source)`. Vector similarity is computed in pure Rust at query time.
8//!
9//! # Workload classification (G42/S3, BLOCK 1 — MANDATORY)
10//!
11//! LLM embedding is **I/O-bound + subprocess-bound**: each call waits
12//! 5-60s on a network round-trip through a headless `claude -p` /
13//! `codex exec` subprocess while the local CPU stays idle. Concurrency
14//! therefore uses **tokio** (async I/O concurrency) and NEVER rayon
15//! (reserved for CPU-bound work).
16//!
//! # Permit formula (G42/S3, BLOCK 2)
18//!
19//! ```text
20//! permits = clamp(--llm-parallelism, 1, 32)
21//!           .min(available_parallelism())
22//!           .min(available_ram_mb * 0.5 / LLM_WORKER_RSS_MB)
23//! ```
24//!
25//! `LLM_WORKER_RSS_MB = 350` (`crate::constants`): `claude -p` and
26//! `codex exec` are node processes with a typical Maximum RSS of
27//! 200-400 MB (measured via `/usr/bin/time -l` on macOS /
28//! `/usr/bin/time -v` on Linux), so the RAM bound is pertinent.
29//!
30//! # Locking contract (G42/A3 fix)
31//!
32//! The process-wide `Mutex<LlmEmbedding>` protects ONLY the cheap clone
33//! of the client configuration (flavour + binary path + model + shared
34//! schema tempfiles). It is NEVER held across network I/O — the
35//! v1.0.76-v1.0.78 `flush_group` held it for the whole sequential
36//! embedding loop, which is why `--llm-parallelism 8` measured an
37//! effective parallelism of 1.
38
39use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// ADR-0042 / GAP-002: process-wide Claude-backed LLM-embedding client
/// behind a `Mutex`. Distinct from `EMBEDDER` so the Claude path of
/// `embed_via_backend` no longer re-probes PATH via `detect_available`
/// (the v1.0.82 bug where requesting Claude could resolve to Codex).
///
/// Populated lazily by `get_claude_embedder`.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Process-wide LLM-embedding client behind a `Mutex`.
///
/// The lock guards configuration cloning only (see module docs); the
/// actual LLM I/O happens on clones, outside the lock.
///
/// Populated lazily by `get_embedder` from `LlmEmbedding::detect_available`.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Process-wide multi-thread tokio runtime for embedding I/O.
///
/// G42/A2 fix: v1.0.76-v1.0.78 built a current-thread runtime PER CALL.
/// One runtime per process amortises the setup and hosts the bounded
/// fan-out of `embed_texts_parallel`. Built on first use by
/// `shared_runtime`.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Calibration base: chunk (long-text) batch size per LLM call at the
/// calibration dimensionality (G42/S2). Use [`chunk_embed_batch_size`]
/// for the dim-adaptive value (G44).
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Calibration base: entity-name (short-text) batch size per LLM call at
/// the calibration dimensionality (G42/S2). Use [`entity_embed_batch_size`]
/// for the dim-adaptive value (G44).
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;
77
/// Dimensionality the batch bases were calibrated against (G44).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// G44: scales a calibration-base batch size to the active dimensionality,
/// keeping the float budget per LLM call constant (~512 floats for chunks,
/// ~1600 for entity names — the budgets empirically validated at dim 64).
/// Fixed batches of 8 at 384 dims asked for ~3072 floats per response:
/// claude returned partial coverage (3 of 8 items, caught by the G42/C5
/// check) and codex timed out at 300s.
///
/// The function is total:
/// - `base` and `dim` are both floored at 1, so there is no division by
///   zero and `clamp` never sees an upper bound below its lower bound
///   (which would panic);
/// - the product `base * EMBED_BATCH_CALIBRATION_DIM` is computed in `u128`,
///   so it cannot overflow for any `usize` input.
///
/// Returns a value in `1..=max(base, 1)`.
fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
    let base = base.max(1);
    let dim = dim.max(1);
    // Widened arithmetic: usize::MAX * 64 fits comfortably in u128.
    let scaled = (base as u128) * (EMBED_BATCH_CALIBRATION_DIM as u128) / (dim as u128);
    // After clamping the value is <= base, so narrowing back to usize is lossless.
    scaled.clamp(1, base as u128) as usize
}
92
93/// Dim-adaptive batch size for chunk (long-text) embedding calls (G44).
94pub fn chunk_embed_batch_size() -> usize {
95    let dim = crate::constants::embedding_dim();
96    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
97    tracing::debug!(
98        dim,
99        base = CHUNK_EMBED_BATCH_SIZE,
100        batch,
101        "adaptive chunk batch size (G44)"
102    );
103    batch
104}
105
106/// Dim-adaptive batch size for entity-name (short-text) embedding calls (G44).
107pub fn entity_embed_batch_size() -> usize {
108    let dim = crate::constants::embedding_dim();
109    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
110    tracing::debug!(
111        dim,
112        base = ENTITY_EMBED_BATCH_SIZE,
113        batch,
114        "adaptive entity batch size (G44)"
115    );
116    batch
117}
118
119/// Returns the process-wide multi-thread runtime, building it on first use.
120pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
121    if let Some(rt) = RUNTIME.get() {
122        return Ok(rt);
123    }
124    let rt = tokio::runtime::Builder::new_multi_thread()
125        .worker_threads(2)
126        .enable_all()
127        .build()
128        .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
129    let _ = RUNTIME.set(rt);
130    Ok(RUNTIME.get().expect("RUNTIME initialised above"))
131}
132
133/// Initialises the LLM-embedding client on first use and returns it.
134pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
135    if let Some(e) = EMBEDDER.get() {
136        return Ok(e);
137    }
138    let backend = LlmEmbedding::detect_available()?;
139    let _ = EMBEDDER.set(Mutex::new(backend));
140    Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
141}
142
143/// ADR-0042 / GAP-002: returns the process-wide Claude embedder, lazily
144/// initialising it on first use. Binary and model overrides come from
145/// the explicit arguments; `None` falls back to PATH/env defaults via
146/// the builder.
147pub fn get_claude_embedder(
148    claude_binary: Option<&Path>,
149    claude_model: Option<&str>,
150) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
151    if let Some(e) = CLAUDE_EMBEDDER.get() {
152        return Ok(e);
153    }
154    let mut builder = LlmEmbedding::with_claude_builder();
155    if let Some(b) = claude_binary {
156        builder = builder.override_binary(b.to_path_buf());
157    }
158    if let Some(m) = claude_model {
159        builder = builder.override_model(m.to_string());
160    }
161    let backend = builder.build()?;
162    let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
163    Ok(CLAUDE_EMBEDDER
164        .get()
165        .expect("CLAUDE_EMBEDDER initialised above"))
166}
167
168/// ADR-0042 / GAP-002: route a single passage through the Claude
169/// embedder. Used by the Claude arm of `embed_via_backend` so the
170/// fallback chain stops treating Claude as a synonym for codex.
171pub fn embed_via_claude_local(
172    _models_dir: &Path,
173    text: &str,
174    claude_binary: Option<&Path>,
175    claude_model: Option<&str>,
176) -> Result<Vec<f32>, AppError> {
177    let _slot_guard = acquire_llm_slot_for_embedding()?;
178    let embedder = get_claude_embedder(claude_binary, claude_model)?;
179    embed_passage(embedder, text)
180}
181
182/// BUG-003 / v1.0.85: split of  that also
183/// reports the resolved []. Always  because
184/// this path constructs a Claude-flavoured embedder via
185///  (no PATH probe, no silent substitution).
186pub fn embed_via_claude_local_resolved(
187    _models_dir: &Path,
188    text: &str,
189    claude_binary: Option<&Path>,
190    claude_model: Option<&str>,
191) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
192    let _slot_guard = acquire_llm_slot_for_embedding()?;
193    let embedder = get_claude_embedder(claude_binary, claude_model)?;
194    let v = embed_passage(embedder, text)?;
195    Ok((v, LlmBackendKind::Claude))
196}
197/// Clones the embedding-client configuration. The lock is held only for
198/// the duration of the clone — NEVER across I/O (G42/A3).
199fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
200    embedder.lock().clone()
201}
202
203/// Embeds a single passage for storage. Delegates to the configured LLM
204/// headless (claude code / codex). Returns a vector of the active
205/// dimensionality.
206pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
207    let client = clone_client(embedder);
208    let result = client.embed_passage(text)?;
209    validate_dim(result)
210}
211
212/// Embeds a single query for similarity search. Same model and dim as
213/// `embed_passage`; the only difference is the LLM-side prompt prefix
214/// that the headless invocation uses to disambiguate.
215pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
216    let client = clone_client(embedder);
217    let result = client.embed_query(text)?;
218    validate_dim(result)
219}
220
221/// Embeds a batch of passages with token-count-aware batching.
222///
223/// Kept for API compatibility; since v1.0.79 it routes through the
224/// bounded parallel fan-out with conservative defaults.
225pub fn embed_passages_controlled(
226    embedder: &Mutex<LlmEmbedding>,
227    texts: &[&str],
228    _token_counts: &[usize],
229) -> Result<Vec<Vec<f32>>, AppError> {
230    if texts.is_empty() {
231        return Ok(Vec::new());
232    }
233    let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
234    embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
235}
236
237pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
238    let _slot_guard = acquire_llm_slot_for_embedding()?;
239    let embedder = get_embedder(models_dir)?;
240    embed_passage(embedder, text)
241}
242
/// v1.0.89 (BUG-SKIP-EMBED): reports whether the user opted to persist with
/// a NULL embedding when embedding fails.
///
/// Reads the `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` environment
/// variable (set by `--skip-embedding-on-failure` via main.rs propagation).
/// Only the exact values `1` and `true` enable the behaviour; an unset,
/// non-UTF-8 or differently spelled value returns `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE") {
        Ok(flag) => flag == "1" || flag == "true",
        Err(_) => false,
    }
}
252
253/// v1.0.89 (BUG-SKIP-EMBED + GAP-EMBED-PROPAGATION): embed a passage
254/// honouring both `--llm-backend` and `--skip-embedding-on-failure`.
255///
256/// On success returns `Ok(Some(vec))`. On failure:
257/// - if `--skip-embedding-on-failure` is active, logs a warning and returns `Ok(None)`
258/// - otherwise propagates the error (exit 11)
259pub fn embed_passage_or_skip(
260    models_dir: &Path,
261    text: &str,
262    choice: Option<crate::cli::LlmBackendChoice>,
263) -> Result<Option<Vec<f32>>, AppError> {
264    match embed_passage_with_choice(models_dir, text, choice) {
265        Ok((v, _backend)) => Ok(Some(v)),
266        Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
267        Err(e) => {
268            if should_skip_embedding_on_failure() {
269                tracing::warn!(
270                    error = %e,
271                    "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
272                );
273                Ok(None)
274            } else {
275                Err(e)
276            }
277        }
278    }
279}
280
281/// BUG-003 / v1.0.85: split of `embed_passage_local` that reports the
282/// resolved [`LlmBackendKind`] based on the ACTUAL
283/// [`LlmEmbedding::flavour`] of the embedder constructed. When
284/// `LlmEmbedding::detect_available` substitutes claude for a missing
285/// codex, the operator sees the truth in `envelope.backend_invoked`.
286pub fn embed_passage_local_resolved(
287    models_dir: &Path,
288    text: &str,
289) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
290    let _slot_guard = acquire_llm_slot_for_embedding()?;
291    let embedder = get_embedder(models_dir)?;
292    let v = embed_passage(embedder, text)?;
293    let kind = match embedder.lock().flavour() {
294        crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
295        crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
296    };
297    Ok((v, kind))
298}
299
300pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
301    let _slot_guard = acquire_llm_slot_for_embedding()?;
302    let embedder = get_embedder(models_dir)?;
303    embed_query(embedder, text)
304}
305
// =============================================================================
// v1.0.82 (GAP-003): wrappers that accept the CLI choice
// (`crate::cli::LlmBackendChoice`) and translate it into a chain for
// `embed_with_fallback`. They centralise the propagation of the
// `--llm-backend` flag across the 6 commands that produce embeddings
// (`remember`, `edit`, `ingest`, `enrich`, `recall`, `hybrid-search`).
// =============================================================================
313
314/// Embed a single passage using the LLM backend selected by the user via
315/// `--llm-backend`. Routes to `embed_with_fallback` so failures fall
316/// through to the next backend in the chain before giving up.
317///
318/// When `choice` is `None` (e.g. a sub-command that does not yet
319/// expose the flag), behaviour matches `embed_passage_local` — the
320/// active embedder from `LlmEmbedding::detect_available` decides the
321/// backend.
322pub fn embed_passage_with_choice(
323    models_dir: &Path,
324    text: &str,
325    choice: Option<crate::cli::LlmBackendChoice>,
326) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
327    let _slot_guard = acquire_llm_slot_for_embedding()?;
328    match choice {
329        None => {
330            let embedder = get_embedder(models_dir)?;
331            embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
332        }
333        Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
334    }
335}
336/// failure, returns a structured `FallbackReason` so the caller can
337/// surface `vec_degraded` instead of a hard exit 11.
338///
339/// `None` matches the legacy `try_embed_query_with_fallback` path
340/// (uses the active embedder without an explicit chain).
341pub fn try_embed_query_with_choice(
342    models_dir: &Path,
343    text: &str,
344    choice: Option<crate::cli::LlmBackendChoice>,
345) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
346    match embed_passage_with_choice(models_dir, text, choice) {
347        // GAP-004 / v1.0.85.1: when the chain terminates on
348        //  (i.e. user passed
349        // or every preceding backend failed),  returns
350        //  instead of an error. Without this guard the
351        // empty vector would propagate to  which
352        // aborts with exit 11 ("embedding has 0 dims, expected 64").
353        // The caller's contract is to surface a typed
354        // so  and  can route to FTS5-puro via
355        // the existing  /  envelope.
356        // Intercept the empty-vector success path and surface it as
357        //  (introduced at v1.0.85 / ADR-0043
358        // for the symmetric LLM-returned-zero-dim case).
359        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
360        Ok((v, backend)) => Ok((v, backend)),
361        Err(e) => Err(classify_embedding_error(e)),
362    }
363}
364/// call. Reads the max-concurrency from
365/// `SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY` (default derived from
366/// `LLM_WORKER_RSS_MB` and available memory), and the wait timeout
367/// from `SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS` (default 30s).
368///
369/// Returns `Ok(guard)` for happy path, `AppError::LockBusy` (exit 75)
370/// when no slot is available within the wait window, and
371/// `AppError::Validation` when the concurrency is 0.
372///
373/// The `LLM_SLOT_NO_WAIT` env var (or its CLI flag equivalent) sets
374/// `wait_secs = 0` to fail fast in tests.
375fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
376    use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
377    let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
378        .ok()
379        .and_then(|s| s.parse::<u32>().ok())
380        .filter(|n| *n >= 1)
381        .unwrap_or_else(crate::llm_slots::default_max_concurrency);
382    let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
383        0
384    } else {
385        std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
386            .ok()
387            .and_then(|s| s.parse::<u64>().ok())
388            .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
389    };
390    let _ = LLM_WORKER_RSS_MB; // silence the unused import (used in default_max_concurrency)
391                               // GAP-003 / ADR-0043: when the slot semaphore is contended beyond the
392                               // backoff window (50 + 100 + 200 + 400 = 750ms total), return a
393                               // marker message that `classify_embedding_error` maps to
394                               // `FallbackReason::SlotExhausted` (discriminator `slot_exhausted`).
395                               // The window is shorter than the legacy 30s timeout, so the operator
396                               // observes FTS5-puro fallback quickly instead of after 30s of silence.
397    match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
398        Ok(guard) => Ok(guard),
399        Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
400            "slot exhausted: {e} (fall back to FTS5)"
401        ))),
402        Err(e) => Err(e),
403    }
404}
/// GAP-004 (v1.0.88): typed classifier for embedding error messages.
///
/// Decomposes the legacy `AppError::Embedding(String)` payload into a
/// small enum so the call sites can branch on the cause instead of
/// repeating `msg.contains(...)` literals. The classification is purely
/// lexical (case-insensitive substring match on the error message) — no
/// I/O, no retries, no telemetry, deterministic and safe under
/// `#[serial_test::serial(env)]`.
///
/// 6 variants cover the 5 known discriminators from v1.0.85 (ADR-0043)
/// plus an `Unknown` fallback for messages that do not match any marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// OAuth token expired or absent; no backend can authenticate.
    OAuth,
    /// OAuth usage quota exhausted on the named backend.
    Quota,
    /// LLM slot semaphore exhausted after the backoff window.
    SlotExhausted,
    /// User-requested backend differs from the one that actually executed.
    BackendMismatch,
    /// Embedding returned a zero-dimensional vector (structural bug).
    ZeroDimension,
    /// Message did not match any of the 5 markers above.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classify an embedding error message into a typed kind.
    ///
    /// The checks are case-insensitive (the message is lowercased first)
    /// and the first matching marker wins, so the order matters:
    ///
    /// 1. `slot exhausted` — the most specific cause: the LLM never even
    ///    tried to authenticate, so it must win over `oauth` / `quota`
    ///    words that may appear in the wrapped lock message.
    /// 2. `backend mismatch` — more specific than the authentication
    ///    markers; such a message can legitimately mention both backends
    ///    and their auth state.
    /// 3. `oauth`, then `quota` — `OAuth` is matched before `Quota` because
    ///    both substrings can co-occur in the same message.
    /// 4. `dim` together with `zero` — zero-dimensional result.
    ///
    /// Anything else is `Unknown`.
    pub fn classify(msg: &str) -> Self {
        let m = msg.to_lowercase();
        if m.contains("slot exhausted") {
            Self::SlotExhausted
        } else if m.contains("backend mismatch") {
            Self::BackendMismatch
        } else if m.contains("oauth") {
            Self::OAuth
        } else if m.contains("quota") {
            Self::Quota
        } else if m.contains("dim") && m.contains("zero") {
            Self::ZeroDimension
        } else {
            Self::Unknown
        }
    }

    /// Stable, machine-friendly discriminator code (lowercase, kebab-safe).
    pub fn code(&self) -> &'static str {
        match self {
            Self::OAuth => "oauth",
            Self::Quota => "quota",
            Self::SlotExhausted => "slot-exhausted",
            Self::BackendMismatch => "backend-mismatch",
            Self::ZeroDimension => "zero-dimension",
            Self::Unknown => "unknown",
        }
    }
}

/// Displays the kind as its [`EmbeddingErrorKind::code`].
impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.code())
    }
}
476
/// G58/S1: why an embedding call could not be completed, forcing the
/// caller onto a non-vector retrieval path (FTS5 prefix + LIKE).
///
/// Returned by [`try_embed_query_with_fallback`] so the `recall` and
/// `hybrid-search` handlers can surface a structured `vec_degraded` /
/// `warning` envelope instead of a hard `AppError::Embedding` exit 11.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// The LLM subprocess failed (rate limit, OAuth contention, quota
    /// exhausted, model unparsable response, divergent dim, etc.).
    /// Carries the original error message for observability.
    EmbeddingFailed(String),
    /// The LLM slot semaphore was exhausted: 8+ concurrent LLM
    /// subprocesses blocked the acquire beyond the backoff window
    /// (50ms + 100ms + 200ms + 400ms = 750ms total). Resolved at v1.0.85
    /// (GAP-003 / ADR-0043).
    SlotExhausted,
    /// OAuth usage quota exhausted on the named backend. The caller
    /// should retry with an alternative backend (codex ↔ claude)
    /// before falling back to FTS5-puro.
    OAuthQuota { backend: &'static str },
    /// The user requested a backend that differs from the one that
    /// actually executed the embedding (legacy "synonym for codex"
    /// bug from v1.0.83). Resolved at v1.0.84 (GAP-002).
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding returned a zero-dimensional vector, signalling a
    /// structural bug (the LLM did not produce any floats). Distinct
    /// from OAuthQuota (quota exhausted) and EmbeddingFailed
    /// (subprocess error).
    DimZero,
    /// The embedding was cancelled by an external signal (SIGTERM, etc.).
    Cancelled,
    /// The embedding exceeded its time budget. Carries the operation name
    /// and the elapsed seconds for diagnostic logging.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable, machine-friendly reason code used by JSON envelopes
    /// (`vec_degraded_reason`). One snake_case code per variant; payloads
    /// are ignored. Mirrors the v1.0.84 contract extended at v1.0.85 with
    /// 4 new variants (GAP-003 / ADR-0043).
    pub fn reason_code(&self) -> &'static str {
        match self {
            Self::Cancelled => "cancelled",
            Self::DimZero => "dim_zero",
            Self::SlotExhausted => "slot_exhausted",
            Self::EmbeddingFailed(_) => "embedding_failed",
            Self::OAuthQuota { .. } => "oauth_quota",
            Self::BackendMismatch { .. } => "backend_mismatch",
            Self::Timeout { .. } => "timeout",
        }
    }
}

/// Human-readable description of the reason, including any payload.
impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Payload-free variants: fixed text, written directly.
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            // Variants with payload: interpolate the carried values.
            Self::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            Self::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            Self::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
573
574/// G58/S1: try to embed a query, mapping any failure to a structured
575/// [`FallbackReason`] so callers can route to FTS5 + LIKE fallback instead
576/// of returning exit 11 to the user.
577///
578/// This is the bridge between the hard-fail `embed_query_local` (used by
579/// write paths where embedding failure aborts the operation) and the
580/// graceful-degradation contract of `recall` / `hybrid-search` in v1.0.80.
581pub fn try_embed_query_with_fallback(
582    models_dir: &Path,
583    query: &str,
584) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
585    match embed_query_local(models_dir, query) {
586        Ok(v) => Ok((v, LlmBackendKind::None)),
587        Err(e) => Err(classify_embedding_error(e)),
588    }
589}
590
591/// G58 / ADR-0043 (v1.0.85): deterministic fallback for `recall` and
592/// `hybrid-search`.
593///
594/// - On `OAuthQuota { backend }`, retry once with the alternative backend
595///   (codex ↔ claude) before giving up.
596/// - On `SlotExhausted`, sleep 750ms and retry once (gives the slot
597///   semaphore time to release a permit from a sibling subprocess).
598/// - On any other `FallbackReason`, return immediately (deterministic).
599pub fn try_embed_query_with_deterministic_fallback(
600    models_dir: &Path,
601    query: &str,
602    choice: Option<crate::cli::LlmBackendChoice>,
603) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
604    match try_embed_query_with_choice(models_dir, query, choice) {
605        Ok(t) => Ok(t),
606        Err(reason @ FallbackReason::OAuthQuota { backend }) => {
607            let alt = match backend {
608                "codex" => Some(crate::cli::LlmBackendChoice::Claude),
609                "claude" => Some(crate::cli::LlmBackendChoice::Codex),
610                _ => None,
611            };
612            if let Some(alt_choice) = alt {
613                try_embed_query_with_choice(models_dir, query, Some(alt_choice))
614            } else {
615                Err(reason)
616            }
617        }
618        Err(reason @ FallbackReason::SlotExhausted) => {
619            std::thread::sleep(std::time::Duration::from_millis(750));
620            try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
621        }
622        Err(other) => Err(other),
623    }
624}
625
626/// Classify an embedding [`AppError`] into a typed [`FallbackReason`].
627///
628/// v1.0.85 (ADR-0043): discriminates the 4 new causes (SlotExhausted,
629/// OAuthQuota, BackendMismatch, DimZero) from the legacy generic
630/// EmbeddingFailed bucket. The classification is purely lexical
631/// (substring match on the message) — no I/O, no retries, no
632/// telemetry, deterministic and `#[serial_test::serial(env)]`-safe.
633pub fn classify_embedding_error(err: AppError) -> FallbackReason {
634    match err {
635        AppError::Timeout {
636            operation,
637            duration_secs,
638        } => FallbackReason::Timeout {
639            operation,
640            duration_secs,
641        },
642        AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
643            // GAP-004 (v1.0.88): typed-discriminator dispatch.
644            // The lexical classifier picks the discriminator; the arms below
645            // enrich the result with the backend name and the
646            // requested/resolved pair that the JSON envelope needs.
647            //
648            // Note: `Cancelled` and `EmbeddingFailed(msg)` are not in the
649            // 6-variant enum (they have no lexical marker) so we keep them
650            // as explicit guards at the head of the match.
651            EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
652            EmbeddingErrorKind::OAuth => {
653                let backend = if msg.contains("codex") {
654                    "codex"
655                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
656                    // G45-CR5: anthropic-ratelimit-* headers are emitted only by
657                    // the Claude CLI subprocess; treat them as claude quota
658                    // signals even when the message text omits the word
659                    // "claude" explicitly.
660                    "claude"
661                } else {
662                    "unknown"
663                };
664                FallbackReason::OAuthQuota { backend }
665            }
666            EmbeddingErrorKind::Quota => {
667                let backend = if msg.contains("codex") {
668                    "codex"
669                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
670                    "claude"
671                } else {
672                    "unknown"
673                };
674                FallbackReason::OAuthQuota { backend }
675            }
676            EmbeddingErrorKind::BackendMismatch => {
677                // The `msg.contains("claude")` arm is intentionally
678                // placed BEFORE the OAuth arm so that a backend-mismatch
679                // message that mentions both "claude" and "codex" maps to
680                // BackendMismatch (the more specific failure mode).
681                let (requested, resolved) =
682                    if msg.contains("requested claude") && msg.contains("but codex") {
683                        ("claude", "codex")
684                    } else if msg.contains("requested codex") && msg.contains("but claude") {
685                        ("codex", "claude")
686                    } else if msg.contains("requested claude") {
687                        ("claude", "unknown")
688                    } else if msg.contains("requested codex") {
689                        ("codex", "unknown")
690                    } else {
691                        ("unknown", "unknown")
692                    };
693                FallbackReason::BackendMismatch {
694                    requested,
695                    resolved,
696                }
697            }
698            EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
699            EmbeddingErrorKind::Unknown => {
700                if msg.contains("cancelled") {
701                    FallbackReason::Cancelled
702                } else {
703                    FallbackReason::EmbeddingFailed(msg)
704                }
705            }
706        },
707        e => FallbackReason::EmbeddingFailed(e.to_string()),
708    }
709}
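// =============================================================================
// Fallback chain: `embed_with_fallback` tries each configured LLM backend in
// order before giving up. The chain order matches the user-supplied
// `--llm-fallback` list (default: codex, claude, none).
// =============================================================================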
713
714/// Tries each LLM backend in `chain` in order, returning the first
715/// successful embedding. On failure, the diagnostic tail of the last
716/// error is preserved in the returned `AppError::Embedding` so the
717/// operator can see WHY every backend failed.
718///
719/// If `skip_on_failure` is `true` AND every backend fails, the function
720/// returns `Ok(Vec::new())` (an empty vector) to signal "persist
721/// without embedding" — the call site is then responsible for writing
722/// a `pending_embeddings` row that can be retried later by the
723/// `embedding retry` subcommand.
724///
725/// Defaults the chain to `[codex, claude, none]` when `chain` is
726/// empty, matching the v1.0.81 behaviour where codex was the
727/// implicit default and claude was the implicit fallback.
728pub fn embed_with_fallback(
729    models_dir: &Path,
730    text: &str,
731    chain: &[LlmBackendKind],
732    skip_on_failure: bool,
733) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
734    use crate::llm::exit_code_hints::LlmBackendError;
735    let effective: Vec<LlmBackendKind> = if chain.is_empty() {
736        vec![
737            LlmBackendKind::Codex,
738            LlmBackendKind::Claude,
739            LlmBackendKind::None,
740        ]
741    } else {
742        chain.to_vec()
743    };
744
745    let mut last_err: Option<AppError> = None;
746    for backend in &effective {
747        // BUG-003 / v1.0.85: propagar o backend REAL retornado por
748        // embed_via_backend (que pode diferir do chain position quando
749        // LlmEmbedding::detect_available substitui codex por claude).
750        // O tuple `(_, requested_kind)` é descartado — só queremos o
751        // backend resolvido na primeira posição.
752        // ADR-0046 / BUG-11 v1.0.88: use `embed_via_backend_strict` so the
753        // sentinel `None` backend propagates the last real error instead
754        // of silently degrading to `Ok((Vec::new(), None))`. This is the
755        // path that caused preflight rejections to be swallowed by the
756        // chain's default trailing `None`.
757        match embed_via_backend_strict(
758            models_dir,
759            text,
760            backend,
761            last_err.as_ref(),
762            skip_on_failure,
763        ) {
764            Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
765            Err(e) => {
766                tracing::warn!(
767                    target: "embedding",
768                    backend = ?backend,
769                    error = %e,
770                    "embed_with_fallback: backend failed, trying next"
771                );
772                last_err = Some(e);
773            }
774        }
775    }
776    if skip_on_failure {
777        // Signal "persist with no embedding" via an empty vector paired
778        // with `None` so callers know the chain exhausted without a hit.
779        // Caller is responsible for writing a `pending_embeddings` row
780        // that can be retried later by the `embedding retry` subcommand.
781        return Ok((Vec::new(), LlmBackendKind::None));
782    }
783    Err(last_err
784        .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
785}
786
/// Backend selector for the embedding fallback chain.
///
/// Mirrors the CLI `--llm-backend` enum so the same values can be
/// passed to `--llm-fallback` without translation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// `codex exec` (default for v1.0.76+).
    Codex,
    /// `claude -p` (fallback for ChatGPT Pro OAuth unavailability).
    Claude,
    /// No embedding: an empty vector is returned.
    None,
}

impl LlmBackendKind {
    /// Returns the stable lowercase label for this backend.
    ///
    /// The labels are used in tracing and JSON envelopes and are part of
    /// the public contract for `envelope.backend_invoked`, so they must
    /// not change.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::None => "none",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Codex => "codex",
        }
    }
}
811
812/// Embeds a single text via the given backend. Used by
813/// `embed_with_fallback` and exposed to allow direct one-shot
814/// selection without a chain.
815/// Embeds a single text via the given backend. Used by
816/// `embed_with_fallback` and exposed to allow direct one-shot
817/// selection without a chain.
818///
819/// BUG-003 / v1.0.85: returns `(Vec<f32>, LlmBackendKind)`. The
820/// second element reports the backend that ACTUALLY executed the
821/// embedding, not the chain position requested by the caller. When
822/// `LlmBackendKind::Codex` is requested but `codex` is absent from
823/// PATH, `LlmEmbedding::detect_available` substitutes claude and the
824/// tuple carries `LlmBackendKind::Claude` so the operator sees the
825/// truth in `envelope.backend_invoked`.
826pub fn embed_via_backend(
827    models_dir: &Path,
828    text: &str,
829    backend: &LlmBackendKind,
830) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
831    match backend {
832        LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
833        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
834        LlmBackendKind::Claude => {
835            // ADR-0042 / GAP-002: route Claude through its own static
836            // embedder instead of re-using the Codex path (which used
837            // to silently pick Codex if PATH ordered it first).
838            tracing::debug!(
839                target: "embedder",
840                backend = "claude",
841                "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
842            );
843            embed_via_claude_local_resolved(models_dir, text, None, None)
844        }
845    }
846}
847
848// ADR-0046 / BUG-11 v1.0.88: specialisation of `embed_via_backend` that
849// refuses to SILENTLY DEGRADE to `LlmBackendKind::None` after all real
850// backends (Codex, Claude) have failed. The previous behaviour
851// (`Ok((Vec::new(), None))`) caused the `remember` write path to persist
852// memories with zero-dimensional embeddings — breaking `recall` and
853// `hybrid-search` while returning exit 0 (BUG-11 CRITICAL).
854//
855// When `--llm-backend none` is explicitly requested (i.e. `last_err` is
856// None AND the chain was a single-element `[None]`), pass
857// `skip_on_failure = true` to `embed_with_fallback` to consume the empty
858// vector via the pending-embeddings retry queue instead of persisting
859// directly. This helper is the right hook for `remember`/`edit`/`ingest`.
860pub fn embed_via_backend_strict(
861    models_dir: &Path,
862    text: &str,
863    backend: &LlmBackendKind,
864    last_err: Option<&AppError>,
865    skip_on_failure: bool,
866) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
867    use crate::llm::exit_code_hints::LlmBackendError;
868    match backend {
869        LlmBackendKind::None => {
870            // If the caller opted into skip_on_failure AND no prior
871            // backend has recorded an error, the empty vector is
872            // intentional (chain of only [None]).
873            if skip_on_failure && last_err.is_none() {
874                Ok((Vec::new(), LlmBackendKind::None))
875            } else if last_err.is_some() {
876                // The chain reached `None` after Codex/Claude failed.
877                // Propagate the most recent error so `remember` aborts
878                // instead of persisting a memory without an embedding.
879                Err(match last_err {
880                    Some(e) => AppError::Embedding(format!("{e}")),
881                    None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
882                })
883            } else {
884                // Empty chain with no skip_on_failure — treat as a
885                // configuration error (no backends available).
886                Err(AppError::Embedding(
887                    LlmBackendError::NoBackendsAvailable.to_string(),
888                ))
889            }
890        }
891        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
892        LlmBackendKind::Claude => {
893            tracing::debug!(
894                target: "embedder",
895                backend = "claude",
896                "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
897            );
898            embed_via_claude_local_resolved(models_dir, text, None, None)
899        }
900    }
901}
902
903/// Legacy one-shot wrapper around `embed_via_backend` that discards
904/// the resolved backend. Kept for call sites that only care about
905/// the vector and ignore the executed-backend signal. New code
906/// should prefer `embed_via_backend` directly.
907pub fn embed_via_backend_legacy(
908    models_dir: &Path,
909    text: &str,
910    backend: &LlmBackendKind,
911) -> Result<Vec<f32>, AppError> {
912    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
913}
914
915pub fn embed_passages_controlled_local(
916    models_dir: &Path,
917    texts: &[&str],
918    token_counts: &[usize],
919) -> Result<Vec<Vec<f32>>, AppError> {
920    let embedder = get_embedder(models_dir)?;
921    embed_passages_controlled(embedder, texts, token_counts)
922}
923
924/// G42/S3: embeds `texts` through the bounded parallel fan-out and
925/// returns vectors in input order.
926pub fn embed_passages_parallel_local(
927    models_dir: &Path,
928    texts: &[String],
929    parallelism: usize,
930    batch_size: usize,
931) -> Result<Vec<Vec<f32>>, AppError> {
932    let embedder = get_embedder(models_dir)?;
933    embed_texts_parallel(embedder, texts, parallelism, batch_size)
934}
935
936/// G56: in-process cache for entity embeddings keyed by `(model, text)`.
937///
938/// Schema v13 is immutable: `entity_embeddings` does not have a `text`
939/// column, so a pure DB-side cache would require a schema bump. Instead
940/// we keep a process-wide LRU-style map that survives within one CLI
941/// invocation. The hit rate is high in `ingest` (re-embedding the same
942/// canonical entity across thousands of memories) and modest in `remember`
943/// (typical single-memory invocations).
944///
945/// Key: `blake3(model || "\0" || text)`. Value: `Arc<Vec<f32>>` so the
946/// collector can drop the map entry while a `Vec` is still in flight.
947type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;
948
949static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
950
951fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
952    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
953}
954
955fn entity_cache_key(model: &str, text: &str) -> u64 {
956    let mut hasher = blake3::Hasher::new();
957    hasher.update(model.as_bytes());
958    hasher.update(b"\0");
959    hasher.update(text.as_bytes());
960    let h = hasher.finalize();
961    let bytes = h.as_bytes();
962    u64::from_le_bytes([
963        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
964    ])
965}
966
967/// G56: embeds entity-name texts through a process-wide cache.
968///
969/// Skips any `(model, text)` pair already produced in this CLI invocation
970/// and only spawns subprocesses for the cache misses. Returns vectors in
971/// the same order as `texts`.
972///
973/// Designed for entity-name batches (short texts). For chunk embeds use
974/// [`embed_passages_parallel_local`] directly — chunks are unique per
975/// memory and cache hit rate is negligible.
976pub fn embed_entity_texts_cached(
977    models_dir: &Path,
978    texts: &[String],
979    parallelism: usize,
980) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
981    if texts.is_empty() {
982        return Ok((Vec::new(), EmbedCacheStats::default()));
983    }
984    let embedder = get_embedder(models_dir)?;
985    let model = embedder.lock().model_label();
986    let cache = entity_embed_cache();
987    let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
988    let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
989    {
990        let guard = cache.lock();
991        for (i, text) in texts.iter().enumerate() {
992            let key = entity_cache_key(&model, text);
993            if let Some(v) = guard.get(&key) {
994                hits[i] = Some(Arc::clone(v));
995            } else {
996                miss_indices.push(i);
997            }
998        }
999    }
1000    let miss_count = miss_indices.len();
1001    if miss_count > 0 {
1002        let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1003        let miss_vecs = embed_texts_parallel(
1004            embedder,
1005            &miss_texts,
1006            parallelism,
1007            entity_embed_batch_size(),
1008        )?;
1009        let mut guard = cache.lock();
1010        for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1011            let vec = Arc::new(miss_vecs[slot].clone());
1012            let key = entity_cache_key(&model, &texts[orig_idx]);
1013            guard.insert(key, Arc::clone(&vec));
1014            hits[orig_idx] = Some(vec);
1015        }
1016    }
1017    let mut out = Vec::with_capacity(texts.len());
1018    for hit in hits.into_iter() {
1019        let v = hit.ok_or_else(|| {
1020            AppError::Embedding("entity embed cache produced null result".to_string())
1021        })?;
1022        out.push((*v).clone());
1023    }
1024    Ok((
1025        out,
1026        EmbedCacheStats {
1027            requested: texts.len(),
1028            hits: texts.len() - miss_count,
1029            misses: miss_count,
1030        },
1031    ))
1032}
1033
1034/// G56: stats snapshot returned by [`embed_entity_texts_cached`].
1035#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1036pub struct EmbedCacheStats {
1037    pub requested: usize,
1038    pub hits: usize,
1039    pub misses: usize,
1040}
1041
1042impl EmbedCacheStats {
1043    /// Hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 when nothing was requested.
1044    pub fn hit_rate(&self) -> f64 {
1045        if self.requested == 0 {
1046            0.0
1047        } else {
1048            self.hits as f64 / self.requested as f64
1049        }
1050    }
1051}
1052
1053/// G42/S3 core: bounded parallel batch embedding.
1054///
1055/// - texts are grouped into batches of `batch_size` (one LLM call per
1056///   batch, G42/S2);
1057/// - at most `effective_permits(parallelism)` LLM subprocesses run
1058///   simultaneously (`Arc<Semaphore>` + `acquire_owned`, BLOCO 2);
1059/// - results stream through a BOUNDED mpsc channel so the caller-side
1060///   collector applies backpressure and can persist incrementally
1061///   (BLOCO 5);
1062/// - the global `CancellationToken` aborts in-flight work on the first
1063///   signal; subprocesses die with their futures via `kill_on_drop`
1064///   (BLOCO 6).
1065pub fn embed_texts_parallel(
1066    embedder: &Mutex<LlmEmbedding>,
1067    texts: &[String],
1068    parallelism: usize,
1069    batch_size: usize,
1070) -> Result<Vec<Vec<f32>>, AppError> {
1071    let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1072    embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1073        slots[idx] = Some(v.to_vec());
1074        Ok(())
1075    })?;
1076    let mut out = Vec::with_capacity(slots.len());
1077    for (idx, slot) in slots.into_iter().enumerate() {
1078        out.push(slot.ok_or_else(|| {
1079            AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1080        })?);
1081    }
1082    Ok(out)
1083}
1084
1085/// Like [`embed_texts_parallel`] but invokes `on_result` as soon as each
1086/// embedding arrives (BLOCO 5: incremental persistence — a kill loses at
1087/// most the in-flight batches, never the already-delivered items).
1088pub fn embed_texts_parallel_with(
1089    embedder: &Mutex<LlmEmbedding>,
1090    texts: &[String],
1091    parallelism: usize,
1092    batch_size: usize,
1093    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1094) -> Result<(), AppError> {
1095    if texts.is_empty() {
1096        return Ok(());
1097    }
1098    let dim = crate::constants::embedding_dim();
1099    if texts.len() == 1 {
1100        let v = embed_passage(embedder, &texts[0])?;
1101        return on_result(0, &v);
1102    }
1103
1104    let client = clone_client(embedder);
1105    let permits = effective_permits(parallelism);
1106    let batches = build_batches(texts, batch_size.max(1));
1107    let token = crate::cancel_token().clone();
1108
1109    let work = move |batch: Vec<(usize, String)>| {
1110        let client = client.clone();
1111        async move {
1112            client
1113                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1114                .await
1115        }
1116    };
1117
1118    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1119    match tokio::runtime::Handle::try_current() {
1120        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1121        Err(_) => shared_runtime()?.block_on(fan_out),
1122    }
1123}
1124
/// Groups `(global_index, text)` pairs into batches of at most
/// `batch_size` items, preserving input order.
///
/// `global_index` is the position of the text in `texts`, so results
/// can be written back to the right slot regardless of which batch
/// produced them. A `batch_size` of 0 is treated as 1 instead of
/// panicking. An empty `texts` yields no batches. Each text is cloned
/// exactly once.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(chunk_idx, chunk)| {
            // Every chunk except possibly the last holds exactly `size`
            // items, so the chunk's first global index is a multiple of it.
            let base = chunk_idx * size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1136
1137/// G42/S3 BLOCO 2: effective permit count.
1138///
1139/// `permits = clamp(requested, 1, 32) ∧ cpus ∧ ram_livre*0.5/RSS` — see
1140/// the module docs for the measured RSS rationale.
1141pub fn effective_permits(requested: usize) -> usize {
1142    let cpus = std::thread::available_parallelism()
1143        .map(|n| n.get())
1144        .unwrap_or(4);
1145    let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1146        / crate::constants::LLM_WORKER_RSS_MB)
1147        .max(1) as usize;
1148    requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1149}
1150
1151/// Bounded fan-out engine. Generic over the per-batch work so the
1152/// concurrency contract is testable without spawning real LLMs.
1153///
1154/// Cancel safety (BLOCO 6/10): every task races its work against
1155/// `token.cancelled()` inside `tokio::select!`; both branches are
1156/// cancel-safe (the work future owns its subprocess via `kill_on_drop`,
1157/// and `cancelled()` is pure). On collector-side errors the `JoinSet`
1158/// is shut down, which drops in-flight futures and kills their
1159/// subprocesses.
1160async fn run_bounded<F, Fut>(
1161    batches: Vec<Vec<(usize, String)>>,
1162    permits: usize,
1163    dim: usize,
1164    token: CancellationToken,
1165    work: F,
1166    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1167) -> Result<(), AppError>
1168where
1169    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
1170    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
1171{
1172    let total_batches = batches.len();
1173    let semaphore = Arc::new(Semaphore::new(permits));
1174    // BLOCO 5: bounded channel — producers block when the collector is
1175    // behind (backpressure); PROIBIDO unbounded_channel between stages.
1176    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
1177    let mut set: JoinSet<()> = JoinSet::new();
1178
1179    for (batch_idx, batch) in batches.into_iter().enumerate() {
1180        let sem = Arc::clone(&semaphore);
1181        let token = token.clone();
1182        let tx = tx.clone();
1183        let work = work.clone();
1184        set.spawn(async move {
1185            let wait_start = std::time::Instant::now();
1186            // acquire_owned: RAII permit moved into the task; returned
1187            // on every exit path INCLUDING panic (BLOCO 2).
1188            let Ok(_permit) = sem.acquire_owned().await else {
1189                let _ = tx
1190                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
1191                    .await;
1192                return;
1193            };
1194            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
1195            let work_start = std::time::Instant::now();
1196            // ADR-0034: when `SQLITE_GRAPHRAG_IGNORE_SHUTDOWN=1` is set the
1197            // cancellation arm is dropped and the batch runs to completion.
1198            // This unblocks audit/test invocations whose `SHUTDOWN` flag was
1199            // contaminated by an earlier signal handler in the same process
1200            // tree. Production code never sees this branch.
1201            let outcome = if crate::should_obey_shutdown() {
1202                tokio::select! {
1203                    res = work(batch) => res,
1204                    _ = token.cancelled() => Err(AppError::Embedding(
1205                        "embedding cancelled by shutdown signal".to_string(),
1206                    )),
1207                }
1208            } else {
1209                work(batch).await
1210            };
1211            // BLOCO 8: permit wait time logged SEPARATELY from work time.
1212            tracing::debug!(
1213                target: "embedding",
1214                batch_idx,
1215                permit_wait_ms,
1216                work_ms = work_start.elapsed().as_millis() as u64,
1217                ok = outcome.is_ok(),
1218                "embedding batch finished"
1219            );
1220            let _ = tx.send(outcome).await;
1221        });
1222    }
1223    drop(tx);
1224
1225    let mut completed = 0usize;
1226    let mut failed = 0usize;
1227    let mut cancelled = 0usize;
1228    let mut first_error: Option<AppError> = None;
1229
1230    while let Some(message) = rx.recv().await {
1231        match message {
1232            Ok(items) => {
1233                completed += 1;
1234                if first_error.is_none() {
1235                    for (idx, v) in items {
1236                        if v.len() != dim {
1237                            first_error = Some(AppError::Embedding(format!(
1238                                "LLM returned {} dims for item {idx}, expected {dim}; \
1239                                 refusing to truncate or pad silently (G42/C5)",
1240                                v.len()
1241                            )));
1242                            break;
1243                        }
1244                        if let Err(e) = on_result(idx, &v) {
1245                            first_error = Some(e);
1246                            break;
1247                        }
1248                    }
1249                    if first_error.is_some() {
1250                        // Abort remaining work: dropped futures kill
1251                        // their subprocesses via kill_on_drop (BLOCO 6).
1252                        set.shutdown().await;
1253                    }
1254                }
1255            }
1256            Err(e) => {
1257                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1258                    cancelled += 1;
1259                } else {
1260                    failed += 1;
1261                }
1262                if first_error.is_none() {
1263                    first_error = Some(e);
1264                    set.shutdown().await;
1265                }
1266            }
1267        }
1268    }
1269
1270    // Drain the JoinSet: surface panics distinctly (panic handling —
1271    // JoinError::is_panic tratado em todo join_next, BLOCO 9).
1272    while let Some(join_result) = set.join_next().await {
1273        if let Err(join_err) = join_result {
1274            if join_err.is_panic() {
1275                failed += 1;
1276                if first_error.is_none() {
1277                    first_error = Some(AppError::Embedding(format!(
1278                        "embedding task panicked: {join_err}"
1279                    )));
1280                }
1281            } else {
1282                cancelled += 1;
1283            }
1284        }
1285    }
1286
1287    // v1.0.85 (ADR-0043 hygiene): the fan-out summary event moved
1288    // from `tracing::info!` to `tracing::debug!` and the
1289    // `available_permits` field was removed — the user prohibited
1290    // pool-state telemetry (slot_pool_stats / slot_wait_ms) and
1291    // decorative `tracing::info!` events. The remaining counters
1292    // (total_batches / completed / failed / cancelled) describe the
1293    // progress of the operation itself, not the slot pool, and
1294    // remain visible to operators running with `RUST_LOG=debug` or
1295    // `-vvv`.
1296    tracing::debug!(
1297        target: "embedding",
1298        total_batches,
1299        completed,
1300        failed,
1301        cancelled,
1302        "embedding fan-out finished"
1303    );
1304
1305    match first_error {
1306        Some(e) => Err(e),
1307        None => Ok(()),
1308    }
1309}
1310
/// Serialises a slice of `f32` values into a flat byte vector, four
/// little-endian bytes per value, in input order.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1318
/// Decodes a flat byte slice into `f32` values, reading four
/// little-endian bytes per value. Trailing bytes that do not form a
/// complete group of four are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1326
1327/// Returns the dimensionality of the embedding space. Used to
1328/// validate LLM responses and to size the in-memory cache.
1329pub fn embedding_dim() -> usize {
1330    crate::constants::embedding_dim()
1331}
1332
1333/// G42/C5: a vector with a divergent dimensionality is an ERROR, never
1334/// silently truncated or zero-padded (the pre-v1.0.79 `normalise_dim`
1335/// masked malformed LLM responses).
1336fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1337    let dim = crate::constants::embedding_dim();
1338    if v.len() != dim {
1339        return Err(AppError::Embedding(format!(
1340            "embedding has {} dims, expected {dim}; \
1341             refusing to truncate or pad silently (G42/C5)",
1342            v.len()
1343        )));
1344    }
1345    Ok(v)
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350    use super::*;
1351    use std::sync::atomic::{AtomicUsize, Ordering};
1352
1353    #[test]
1354    fn f32_to_bytes_roundtrip() {
1355        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1356        let bytes = f32_to_bytes(&input);
1357        assert_eq!(bytes.len(), input.len() * 4);
1358        let out = bytes_to_f32(&bytes);
1359        assert_eq!(out, input);
1360    }
1361
1362    #[test]
1363    fn validate_dim_rejects_divergent_vectors() {
1364        // G42/C5 acceptance criterion: a divergent vector MUST fail —
1365        // never be silently normalised.
1366        let dim = crate::constants::embedding_dim();
1367        let long = vec![0.0; dim + 10];
1368        assert!(validate_dim(long).is_err(), "longer vector must error");
1369        let short = vec![0.0; dim.saturating_sub(1).max(1)];
1370        assert!(validate_dim(short).is_err(), "shorter vector must error");
1371        let exact = vec![0.0; dim];
1372        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1373    }
1374
1375    #[test]
1376    fn embedding_dim_matches_constants_source() {
1377        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1378    }
1379
1380    #[test]
1381    fn build_batches_preserves_global_indices() {
1382        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1383        let batches = build_batches(&texts, 4);
1384        assert_eq!(batches.len(), 3);
1385        assert_eq!(batches[0].len(), 4);
1386        assert_eq!(batches[2].len(), 2);
1387        assert_eq!(batches[2][1].0, 9);
1388        assert_eq!(batches[2][1].1, "t9");
1389    }
1390
1391    #[test]
1392    fn effective_permits_clamps_to_bounds() {
1393        assert!(effective_permits(0) >= 1);
1394        assert!(effective_permits(1000) <= 32);
1395    }
1396
1397    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1398        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1399    }
1400
1401    fn dummy_vec(dim: usize) -> Vec<f32> {
1402        vec![0.0; dim]
1403    }
1404
1405    /// G42 acceptance criterion: with N permits the measured peak of
1406    /// concurrent workers NEVER exceeds N, even with 10x more batches.
1407    #[test]
1408    fn concurrency_peak_never_exceeds_permits() {
1409        let permits = 4usize;
1410        let batches = test_batches(permits * 10);
1411        let dim = crate::constants::embedding_dim();
1412        let current = Arc::new(AtomicUsize::new(0));
1413        let peak = Arc::new(AtomicUsize::new(0));
1414
1415        let current_c = Arc::clone(&current);
1416        let peak_c = Arc::clone(&peak);
1417        let work = move |batch: Vec<(usize, String)>| {
1418            let current = Arc::clone(&current_c);
1419            let peak = Arc::clone(&peak_c);
1420            async move {
1421                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1422                peak.fetch_max(now, Ordering::SeqCst);
1423                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1424                current.fetch_sub(1, Ordering::SeqCst);
1425                Ok(batch
1426                    .into_iter()
1427                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1428                    .collect())
1429            }
1430        };
1431
1432        let mut delivered = 0usize;
1433        let rt = tokio::runtime::Builder::new_multi_thread()
1434            .worker_threads(4)
1435            .enable_all()
1436            .build()
1437            .expect("test runtime");
1438        rt.block_on(run_bounded(
1439            batches,
1440            permits,
1441            dim,
1442            CancellationToken::new(),
1443            work,
1444            &mut |_idx, _v| {
1445                delivered += 1;
1446                Ok(())
1447            },
1448        ))
1449        .expect("fan-out must succeed");
1450
1451        assert_eq!(delivered, permits * 10, "every item must be delivered");
1452        assert!(
1453            peak.load(Ordering::SeqCst) <= permits,
1454            "peak concurrency {} exceeded permits {permits}",
1455            peak.load(Ordering::SeqCst)
1456        );
1457    }
1458
1459    /// G42 acceptance criterion: a panicking task returns its permit via
1460    /// RAII and surfaces as JoinError::is_panic, not a hang.
1461    #[test]
1462    fn panicking_task_returns_permit_and_surfaces_error() {
1463        let permits = 2usize;
1464        let batches = test_batches(4);
1465        let dim = crate::constants::embedding_dim();
1466
1467        let work = move |batch: Vec<(usize, String)>| async move {
1468            if batch[0].0 == 1 {
1469                panic!("intentional test panic");
1470            }
1471            Ok(batch
1472                .into_iter()
1473                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1474                .collect())
1475        };
1476
1477        let rt = tokio::runtime::Builder::new_multi_thread()
1478            .worker_threads(2)
1479            .enable_all()
1480            .build()
1481            .expect("test runtime");
1482        let result = rt.block_on(run_bounded(
1483            batches,
1484            permits,
1485            dim,
1486            CancellationToken::new(),
1487            work,
1488            &mut |_idx, _v| Ok(()),
1489        ));
1490
1491        let err = result.expect_err("panic must surface as an error");
1492        assert!(
1493            err.to_string().contains("panicked"),
1494            "error must mention the panic: {err}"
1495        );
1496    }
1497
1498    /// G42 acceptance criterion: cancellation aborts in-flight work and
1499    /// the fan-out terminates within the shutdown timeout.
1500    #[test]
1501    fn cancellation_terminates_fan_out_quickly() {
1502        let permits = 2usize;
1503        let batches = test_batches(8);
1504        let dim = crate::constants::embedding_dim();
1505        let token = CancellationToken::new();
1506
1507        let work = move |batch: Vec<(usize, String)>| async move {
1508            // Long enough that only cancellation can finish the test fast.
1509            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1510            Ok(batch
1511                .into_iter()
1512                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1513                .collect())
1514        };
1515
1516        let rt = tokio::runtime::Builder::new_multi_thread()
1517            .worker_threads(2)
1518            .enable_all()
1519            .build()
1520            .expect("test runtime");
1521        let cancel = token.clone();
1522        let start = std::time::Instant::now();
1523        let result = rt.block_on(async move {
1524            tokio::spawn(async move {
1525                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1526                cancel.cancel();
1527            });
1528            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1529        });
1530
1531        assert!(result.is_err(), "cancelled fan-out must report an error");
1532        assert!(
1533            start.elapsed() < std::time::Duration::from_secs(10),
1534            "graceful shutdown must finish well under the work duration"
1535        );
1536    }
1537
1538    /// G42 acceptance criterion: a divergent dim coming out of the work
1539    /// stage fails the fan-out instead of being silently accepted.
1540    #[test]
1541    fn fan_out_rejects_divergent_dim() {
1542        let permits = 2usize;
1543        let batches = test_batches(2);
1544        let dim = crate::constants::embedding_dim();
1545
1546        let work = move |batch: Vec<(usize, String)>| async move {
1547            Ok(batch
1548                .into_iter()
1549                .map(|(i, _)| (i, vec![0.0f32; 3]))
1550                .collect::<Vec<(usize, Vec<f32>)>>())
1551        };
1552
1553        let rt = tokio::runtime::Builder::new_multi_thread()
1554            .worker_threads(2)
1555            .enable_all()
1556            .build()
1557            .expect("test runtime");
1558        let result = rt.block_on(run_bounded(
1559            batches,
1560            permits,
1561            dim,
1562            CancellationToken::new(),
1563            work,
1564            &mut |_idx, _v| Ok(()),
1565        ));
1566
1567        let err = result.expect_err("divergent dim must fail the fan-out");
1568        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1569    }
1570
1571    /// G44: the calibration bases stay intact at the calibration dim.
1572    #[test]
1573    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1574        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1575        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1576    }
1577
1578    /// G44: legacy 384-dim databases shrink to reliable batch sizes.
1579    #[test]
1580    fn adaptive_batch_dim384_shrinks() {
1581        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1582        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1583    }
1584
1585    /// G44: intermediate dims scale proportionally to the float budget.
1586    #[test]
1587    fn adaptive_batch_intermediate_dims() {
1588        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1589        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1590    }
1591
1592    /// G44: dims below the calibration dim never exceed the base.
1593    #[test]
1594    fn adaptive_batch_small_dim_clamps_to_base() {
1595        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1596    }
1597
1598    /// G44: the function is total — no division by zero, no clamp panic.
1599    #[test]
1600    fn adaptive_batch_total_function() {
1601        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1602        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1603        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1604    }
1605
1606    /// G44 end-to-end: the public wrappers follow the env-dim override.
1607    #[test]
1608    #[serial_test::serial(env)]
1609    fn adaptive_wrappers_follow_env_dim() {
1610        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1611        let chunk = chunk_embed_batch_size();
1612        let entity = entity_embed_batch_size();
1613        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1614        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1615        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1616        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1617    }
1618
    // ---------------------------------------------------------------
    // G58/S1: EmbeddingErrorKind, FallbackReason and
    // try_embed_query_with_fallback tests
    // ---------------------------------------------------------------
1622
1623    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps an OAuth
1624    /// error message to the OAuth variant regardless of case or
1625    /// surrounding text.
1626    #[test]
1627    fn embedding_error_kind_classify_oauth_message() {
1628        assert_eq!(
1629            EmbeddingErrorKind::classify("OAuth token expired for claude"),
1630            EmbeddingErrorKind::OAuth,
1631        );
1632        assert_eq!(
1633            EmbeddingErrorKind::classify("oauth authentication failed"),
1634            EmbeddingErrorKind::OAuth,
1635        );
1636    }
1637
1638    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a quota
1639    /// message to the Quota variant (without "OAuth" substring).
1640    #[test]
1641    fn embedding_error_kind_classify_quota_message() {
1642        assert_eq!(
1643            EmbeddingErrorKind::classify("quota exhausted on backend"),
1644            EmbeddingErrorKind::Quota,
1645        );
1646        assert_eq!(
1647            EmbeddingErrorKind::classify("Usage quota limit reached"),
1648            EmbeddingErrorKind::Quota,
1649        );
1650    }
1651
1652    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a slot-sema
1653    /// message to the SlotExhausted variant (matched BEFORE Quota so
1654    /// the more specific LLM-never-tried path wins).
1655    #[test]
1656    fn embedding_error_kind_classify_slot_exhausted_message() {
1657        assert_eq!(
1658            EmbeddingErrorKind::classify(
1659                "slot exhausted: failed to acquire LLM slot after backoff"
1660            ),
1661            EmbeddingErrorKind::SlotExhausted,
1662        );
1663    }
1664
1665    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a
1666    /// zero-dimensional vector error to the ZeroDimension variant.
1667    #[test]
1668    fn embedding_error_kind_classify_zero_dimension_message() {
1669        assert_eq!(
1670            EmbeddingErrorKind::classify("embedding returned dim=zero"),
1671            EmbeddingErrorKind::ZeroDimension,
1672        );
1673        assert_eq!(
1674            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1675            EmbeddingErrorKind::ZeroDimension,
1676        );
1677    }
1678
1679    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify falls back to
1680    /// the Unknown variant when no marker matches, and the code()
1681    /// accessor returns the kebab-safe discriminator string.
1682    #[test]
1683    fn embedding_error_kind_classify_unknown_fallback() {
1684        assert_eq!(
1685            EmbeddingErrorKind::classify("unrelated subprocess error"),
1686            EmbeddingErrorKind::Unknown,
1687        );
1688        assert_eq!(
1689            EmbeddingErrorKind::classify("rate limit hit"),
1690            EmbeddingErrorKind::Unknown,
1691        );
1692        // code() returns the stable discriminator string.
1693        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1694        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1695        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1696        assert_eq!(
1697            EmbeddingErrorKind::BackendMismatch.code(),
1698            "backend-mismatch"
1699        );
1700        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1701        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1702    }
1703
1704    /// Display impl covers all three variants without panicking.
1705    #[test]
1706    fn fallback_reason_display_does_not_panic() {
1707        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1708        let _ = FallbackReason::Cancelled.to_string();
1709        let _ = FallbackReason::Timeout {
1710            operation: "embed_query".into(),
1711            duration_secs: 30,
1712        }
1713        .to_string();
1714    }
1715
1716    /// FallbackReason is PartialEq — used in test assertions to verify
1717    /// the mapping rules.
1718    #[test]
1719    fn fallback_reason_is_partial_eq() {
1720        assert_eq!(
1721            FallbackReason::EmbeddingFailed("a".into()),
1722            FallbackReason::EmbeddingFailed("a".into())
1723        );
1724        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1725        assert_ne!(
1726            FallbackReason::EmbeddingFailed("a".into()),
1727            FallbackReason::EmbeddingFailed("b".into())
1728        );
1729        assert_ne!(
1730            FallbackReason::Cancelled,
1731            FallbackReason::Timeout {
1732                operation: "x".into(),
1733                duration_secs: 1
1734            }
1735        );
1736    }
1737
1738    /// Timeout variant preserves the operation name and duration from the
1739    /// original AppError::Timeout for observability.
1740    #[test]
1741    fn fallback_reason_timeout_preserves_fields() {
1742        let r = FallbackReason::Timeout {
1743            operation: "embed_query_local".into(),
1744            duration_secs: 300,
1745        };
1746        match r {
1747            FallbackReason::Timeout {
1748                operation,
1749                duration_secs,
1750            } => {
1751                assert_eq!(operation, "embed_query_local");
1752                assert_eq!(duration_secs, 300);
1753            }
1754            other => panic!("expected Timeout, got {other:?}"),
1755        }
1756    }
1757
1758    /// try_embed_query_with_fallback surfaces an EmbeddingFailed variant
1759    /// when the LLM subprocess errors. Uses a path that surely does not
1760    /// contain any embedder configuration (the binary is invoked as
1761    /// `codex` / `claude` via PATH which, in tests, defaults to nothing
1762    /// in scope, so `LlmEmbedding::detect_available()` returns Err).
1763    #[test]
1764    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1765    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1766        // Pointing at a models dir that does not exist forces the embedder
1767        // init to fail; the error is mapped to EmbeddingFailed.
1768        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1769        let result = try_embed_query_with_fallback(bogus, "hello world");
1770        match result {
1771            Err(FallbackReason::EmbeddingFailed(msg)) => {
1772                // The original error must survive in the message for ops triage.
1773                assert!(!msg.is_empty(), "fallback message must not be empty");
1774            }
1775            Err(FallbackReason::Cancelled) => {
1776                panic!("expected EmbeddingFailed, got Cancelled");
1777            }
1778            Err(FallbackReason::Timeout { .. }) => {
1779                panic!("expected EmbeddingFailed, got Timeout");
1780            }
1781            Err(FallbackReason::SlotExhausted) => {
1782                panic!("expected EmbeddingFailed, got SlotExhausted");
1783            }
1784            Err(FallbackReason::OAuthQuota { .. }) => {
1785                panic!("expected EmbeddingFailed, got OAuthQuota");
1786            }
1787            Err(FallbackReason::BackendMismatch { .. }) => {
1788                panic!("expected EmbeddingFailed, got BackendMismatch");
1789            }
1790            Err(FallbackReason::DimZero) => {
1791                panic!("expected EmbeddingFailed, got DimZero");
1792            }
1793            Ok(_) => {
1794                panic!("expected an error, got Ok — embedder must fail for bogus path");
1795            }
1796        }
1797    }
1798
1799    // G56: entity embed cache — unit tests
1800    #[test]
1801    fn g56_entity_cache_key_is_stable_and_distinct() {
1802        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1803        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1804        let k3 = entity_cache_key("codex:default", "claude-code");
1805        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1806        assert_eq!(k1, k2, "same model+text must hash identically");
1807        assert_ne!(k1, k3, "different text must hash differently");
1808        assert_ne!(k1, k4, "different model must hash differently");
1809    }
1810
1811    #[test]
1812    fn g56_entity_embed_cache_stats_hit_rate() {
1813        let zero = EmbedCacheStats::default();
1814        assert_eq!(zero.hit_rate(), 0.0);
1815        let half = EmbedCacheStats {
1816            requested: 4,
1817            hits: 2,
1818            misses: 2,
1819        };
1820        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1821        let all = EmbedCacheStats {
1822            requested: 7,
1823            hits: 7,
1824            misses: 0,
1825        };
1826        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1827    }
1828
1829    #[test]
1830    fn g56_entity_embed_cache_populates_and_hits() {
1831        // Manually populate the cache: bypasses the LLM by writing a
1832        // known vector under a chosen (model, text) key, then verifies
1833        // the cache is consulted before any LLM call would happen.
1834        let cache = entity_embed_cache();
1835        let model = "test-model";
1836        let text = "sqlite-graphrag";
1837        let key = entity_cache_key(model, text);
1838        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1839        cache.lock().insert(key, Arc::clone(&stored));
1840        let guard = cache.lock();
1841        let hit = guard.get(&key).expect("cache must return stored value");
1842        assert_eq!(hit.len(), crate::constants::embedding_dim());
1843        assert!((hit[0] - 0.42).abs() < 1e-6);
1844    }
1845
1846    #[test]
1847    fn g56_empty_texts_short_circuits_with_zero_stats() {
1848        // Cannot call embed_entity_texts_cached without an LLM on PATH,
1849        // so we only verify the empty-input contract via the stats struct.
1850        let stats = EmbedCacheStats::default();
1851        assert_eq!(stats.requested, 0);
1852        assert_eq!(stats.hits, 0);
1853        assert_eq!(stats.misses, 0);
1854        assert_eq!(stats.hit_rate(), 0.0);
1855    }
1856}
1857
1858// =============================================================================
1859// v1.0.82 (GAP-005) — embed_with_fallback tests
1860// =============================================================================
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    /// The `None` backend returns an empty vector and reports itself as
    /// the backend used, even for a models path that does not exist.
    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        // The `None` backend short-circuits to `Ok(vec![])` without
        // touching the LLM at all. This is the signal the caller uses
        // to insert a `pending_embeddings` row.
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    /// Documents the expected default chain order: codex, claude, then
    /// `None`.
    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        // Internal invariant: the default chain order is the v1.0.81
        // implicit order (codex first, then claude, then None as
        // graceful-degradation fallback).
        //
        // NOTE(review): this only measures an array built right here; it
        // does not read the chain production code uses — confirm whether
        // the real default should be asserted instead.
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];
        assert_eq!(defaults.len(), 3);
    }

    // ---------------------------------------------------------------
    // ADR-0042: as_str + reason_code unit tests
    // ---------------------------------------------------------------
    //
    // These two checks used to be declared inside the body of
    // `empty_chain_defaults_to_codex_claude_none` under
    // `#[allow(dead_code)]`. A function nested in another function is
    // never picked up by the test harness, so they are now module-level
    // `#[test]` items.

    /// `LlmBackendKind::as_str` returns the lowercase backend name.
    #[test]
    fn llm_backend_kind_as_str_is_stable() {
        assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
        assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
        assert_eq!(LlmBackendKind::None.as_str(), "none");
    }

    /// `FallbackReason::reason_code` returns the expected code for the
    /// `EmbeddingFailed`, `Cancelled` and `Timeout` variants.
    #[test]
    fn fallback_reason_reason_code_is_stable() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("any".into()).reason_code(),
            "embedding_failed"
        );
        assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
        assert_eq!(
            FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30
            }
            .reason_code(),
            "timeout"
        );
    }

    /// A chain of only `[None]` with `skip_on_failure = false` must fail
    /// with an error mentioning "no LLM backends available".
    #[test]
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        // ADR-0046 / BUG-11 v1.0.88: a fallback chain of only `[None]`
        // without `skip_on_failure=true` MUST abort with
        // `AppError::Embedding("no LLM backends available; fallback chain exhausted")`.
        //
        // Before BUG-11, the `None` tail returned `Ok((vec![], None))`
        // silently, which let `remember` persist a memory with a
        // zero-dimensional embedding (invisible to recall). The fix
        // routes the chain exhaustion through `embed_via_backend_strict`
        // so the caller can distinguish between "chain intentionally
        // degrades to skip" (skip_on_failure=true) and "chain has no
        // viable backend at all" (this test).
        let chain = vec![LlmBackendKind::None];
        let err = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = format!("{err}");
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }

    /// A chain of only `[None]` with `skip_on_failure = true` must
    /// succeed with an empty vector and report the `None` backend.
    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        // With skip_on_failure=true a chain of only `None` is expected to
        // degrade to an empty vector tagged with `LlmBackendKind::None`.
        // The same chain with skip_on_failure=false must abort instead;
        // that case is covered by
        // `embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088`.
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    // NOTE(review): this function is not registered with `#[test]`, so
    // the harness never runs it; `#[allow(dead_code)]` only silences the
    // resulting warning. Confirm whether the hint still mentions
    // `--llm-fallback`, then either re-enable the check or delete it.
    #[allow(dead_code)]
    fn llm_backend_error_no_backends_default_message() {
        // The fallback chain exhaustion error must mention
        // `--llm-fallback` in its hint so the operator knows the
        // remediation.
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    /// The rendered `NonZeroExit` error must include the binary name, the
    /// stderr tail, and either the signal or the exit code.
    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}