Skip to main content

sqlite_graphrag/
embedder.rs

1//! Embedding generation for the GraphRAG memory.
2//!
3//! v1.0.76: the default build is **LLM-only** — the binary does NOT bundle
4//! fastembed / ort / ndarray / tokenizers. All embeddings are produced
5//! by a headless invocation of `claude code` or `codex` (OAuth, no MCP,
6//! no hooks) and stored as a BLOB in `memory_embeddings(memory_id, embedding,
7//! source)`. Vector similarity is computed in pure Rust at query time.
8//!
9//! # Workload classification (G42/S3, BLOCK 1 — MANDATORY)
10//!
11//! LLM embedding is **I/O-bound + subprocess-bound**: each call waits
12//! 5-60s on a network round-trip through a headless `claude -p` /
13//! `codex exec` subprocess while the local CPU stays idle. Concurrency
14//! therefore uses **tokio** (async I/O concurrency) and NEVER rayon
15//! (reserved for CPU-bound work).
16//!
//! # Permit formula (G42/S3, BLOCK 2)
18//!
19//! ```text
20//! permits = clamp(--llm-parallelism, 1, 32)
21//!           .min(available_parallelism())
22//!           .min(available_ram_mb * 0.5 / LLM_WORKER_RSS_MB)
23//! ```
24//!
25//! `LLM_WORKER_RSS_MB = 350` (`crate::constants`): `claude -p` and
26//! `codex exec` are node processes with a typical Maximum RSS of
27//! 200-400 MB (measured via `/usr/bin/time -l` on macOS /
28//! `/usr/bin/time -v` on Linux), so the RAM bound is pertinent.
29//!
30//! # Locking contract (G42/A3 fix)
31//!
32//! The process-wide `Mutex<LlmEmbedding>` protects ONLY the cheap clone
33//! of the client configuration (flavour + binary path + model + shared
34//! schema tempfiles). It is NEVER held across network I/O — the
35//! v1.0.76-v1.0.78 `flush_group` held it for the whole sequential
36//! embedding loop, which is why `--llm-parallelism 8` measured an
37//! effective parallelism of 1.
38
39use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// ADR-0042 / GAP-002: process-wide Claude-backed LLM-embedding client
/// behind a `Mutex`. Distinct from `EMBEDDER` so the Claude path of
/// `embed_via_backend` no longer re-probes PATH via `detect_available`
/// (the v1.0.82 bug where requesting Claude could resolve to Codex).
///
/// The lock guards configuration cloning only (see module docs); the
/// actual LLM I/O happens on clones, outside the lock.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Process-wide LLM-embedding client behind a `Mutex`, populated on first
/// use by `get_embedder` from `LlmEmbedding::detect_available`.
///
/// The lock guards configuration cloning only (see module docs); the
/// actual LLM I/O happens on clones, outside the lock.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
60
/// Process-wide multi-thread tokio runtime for embedding I/O.
///
/// G42/A2 fix: v1.0.76-v1.0.78 built a current-thread runtime PER CALL.
/// One runtime per process amortises the setup and hosts the bounded
/// fan-out of `embed_texts_parallel`. Populated lazily by
/// `shared_runtime`, which builds it with two worker threads.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Calibration base: chunk (long-text) batch size per LLM call at the
/// calibration dimensionality (G42/S2). Use [`chunk_embed_batch_size`]
/// for the dim-adaptive value (G44).
///
/// At the calibration dimensionality of 64 this corresponds to a budget
/// of 8 * 64 = 512 floats per call.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Calibration base: entity-name (short-text) batch size per LLM call at
/// the calibration dimensionality (G42/S2). Use [`entity_embed_batch_size`]
/// for the dim-adaptive value (G44).
///
/// At the calibration dimensionality of 64 this corresponds to a budget
/// of 25 * 64 = 1600 floats per call.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;
77
/// Dimensionality the batch bases above were calibrated against (G44).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// G44: scales a calibration-base batch size to the active dimensionality,
/// keeping the float budget per LLM call constant (~512 floats for chunks,
/// ~1600 for entity names — the budgets empirically validated at dim 64).
/// Fixed batches of 8 at 384 dims asked for ~3072 floats per response:
/// claude returned partial coverage (3 of 8 items, caught by the G42/C5
/// check) and codex timed out at 300s.
///
/// The function is total:
/// - `base` and `dim` are both floored at 1, so there is no division by
///   zero and the result is never below 1;
/// - the product `base * EMBED_BATCH_CALIBRATION_DIM` is computed in
///   `u128`, so a very large `base` cannot overflow `usize` (which would
///   panic in debug builds and wrap in release builds).
///
/// Returns a value in `1..=base.max(1)`.
fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
    let base = base.max(1);
    let dim = dim.max(1);
    // Lossless widening: the product of two `usize` values always fits in u128.
    let scaled = (base as u128) * (EMBED_BATCH_CALIBRATION_DIM as u128) / (dim as u128);
    // Capped at `base`, so the narrowing conversion below cannot fail; the
    // fallback only exists to keep the expression panic-free.
    let capped = scaled.min(base as u128);
    usize::try_from(capped).unwrap_or(base).max(1)
}
92
93/// Dim-adaptive batch size for chunk (long-text) embedding calls (G44).
94pub fn chunk_embed_batch_size() -> usize {
95    let dim = crate::constants::embedding_dim();
96    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
97    tracing::debug!(
98        dim,
99        base = CHUNK_EMBED_BATCH_SIZE,
100        batch,
101        "adaptive chunk batch size (G44)"
102    );
103    batch
104}
105
106/// Dim-adaptive batch size for entity-name (short-text) embedding calls (G44).
107pub fn entity_embed_batch_size() -> usize {
108    let dim = crate::constants::embedding_dim();
109    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
110    tracing::debug!(
111        dim,
112        base = ENTITY_EMBED_BATCH_SIZE,
113        batch,
114        "adaptive entity batch size (G44)"
115    );
116    batch
117}
118
119/// Returns the process-wide multi-thread runtime, building it on first use.
120pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
121    if let Some(rt) = RUNTIME.get() {
122        return Ok(rt);
123    }
124    let rt = tokio::runtime::Builder::new_multi_thread()
125        .worker_threads(2)
126        .enable_all()
127        .build()
128        .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
129    let _ = RUNTIME.set(rt);
130    Ok(RUNTIME.get().expect("RUNTIME initialised above"))
131}
132
133/// Initialises the LLM-embedding client on first use and returns it.
134pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
135    if let Some(e) = EMBEDDER.get() {
136        return Ok(e);
137    }
138    let backend = LlmEmbedding::detect_available()?;
139    let _ = EMBEDDER.set(Mutex::new(backend));
140    Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
141}
142
143/// ADR-0042 / GAP-002: returns the process-wide Claude embedder, lazily
144/// initialising it on first use. Binary and model overrides come from
145/// the explicit arguments; `None` falls back to PATH/env defaults via
146/// the builder.
147pub fn get_claude_embedder(
148    claude_binary: Option<&Path>,
149    claude_model: Option<&str>,
150) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
151    if let Some(e) = CLAUDE_EMBEDDER.get() {
152        return Ok(e);
153    }
154    let mut builder = LlmEmbedding::with_claude_builder();
155    if let Some(b) = claude_binary {
156        builder = builder.override_binary(b.to_path_buf());
157    }
158    if let Some(m) = claude_model {
159        builder = builder.override_model(m.to_string());
160    }
161    let backend = builder.build()?;
162    let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
163    Ok(CLAUDE_EMBEDDER
164        .get()
165        .expect("CLAUDE_EMBEDDER initialised above"))
166}
167
168/// ADR-0042 / GAP-002: route a single passage through the Claude
169/// embedder. Used by the Claude arm of `embed_via_backend` so the
170/// fallback chain stops treating Claude as a synonym for codex.
171pub fn embed_via_claude_local(
172    _models_dir: &Path,
173    text: &str,
174    claude_binary: Option<&Path>,
175    claude_model: Option<&str>,
176) -> Result<Vec<f32>, AppError> {
177    let _slot_guard = acquire_llm_slot_for_embedding()?;
178    let embedder = get_claude_embedder(claude_binary, claude_model)?;
179    embed_passage(embedder, text)
180}
181
182/// BUG-003 / v1.0.85: split of  that also
183/// reports the resolved []. Always  because
184/// this path constructs a Claude-flavoured embedder via
185///  (no PATH probe, no silent substitution).
186pub fn embed_via_claude_local_resolved(
187    _models_dir: &Path,
188    text: &str,
189    claude_binary: Option<&Path>,
190    claude_model: Option<&str>,
191) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
192    let _slot_guard = acquire_llm_slot_for_embedding()?;
193    let embedder = get_claude_embedder(claude_binary, claude_model)?;
194    let v = embed_passage(embedder, text)?;
195    Ok((v, LlmBackendKind::Claude))
196}
197/// Clones the embedding-client configuration. The lock is held only for
198/// the duration of the clone — NEVER across I/O (G42/A3).
199fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
200    embedder.lock().clone()
201}
202
203/// Embeds a single passage for storage. Delegates to the configured LLM
204/// headless (claude code / codex). Returns a vector of the active
205/// dimensionality.
206pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
207    let client = clone_client(embedder);
208    let result = client.embed_passage(text)?;
209    validate_dim(result)
210}
211
212/// Embeds a single query for similarity search. Same model and dim as
213/// `embed_passage`; the only difference is the LLM-side prompt prefix
214/// that the headless invocation uses to disambiguate.
215pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
216    let client = clone_client(embedder);
217    let result = client.embed_query(text)?;
218    validate_dim(result)
219}
220
221/// Embeds a batch of passages with token-count-aware batching.
222///
223/// Kept for API compatibility; since v1.0.79 it routes through the
224/// bounded parallel fan-out with conservative defaults.
225pub fn embed_passages_controlled(
226    embedder: &Mutex<LlmEmbedding>,
227    texts: &[&str],
228    _token_counts: &[usize],
229) -> Result<Vec<Vec<f32>>, AppError> {
230    if texts.is_empty() {
231        return Ok(Vec::new());
232    }
233    let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
234    embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
235}
236
237pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
238    let _slot_guard = acquire_llm_slot_for_embedding()?;
239    let embedder = get_embedder(models_dir)?;
240    embed_passage(embedder, text)
241}
242
243/// BUG-003 / v1.0.85: split of `embed_passage_local` that reports the
244/// resolved [`LlmBackendKind`] based on the ACTUAL
245/// [`LlmEmbedding::flavour`] of the embedder constructed. When
246/// `LlmEmbedding::detect_available` substitutes claude for a missing
247/// codex, the operator sees the truth in `envelope.backend_invoked`.
248pub fn embed_passage_local_resolved(
249    models_dir: &Path,
250    text: &str,
251) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
252    let _slot_guard = acquire_llm_slot_for_embedding()?;
253    let embedder = get_embedder(models_dir)?;
254    let v = embed_passage(embedder, text)?;
255    let kind = match embedder.lock().flavour() {
256        crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
257        crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
258    };
259    Ok((v, kind))
260}
261
262pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
263    let _slot_guard = acquire_llm_slot_for_embedding()?;
264    let embedder = get_embedder(models_dir)?;
265    embed_query(embedder, text)
266}
267
// =============================================================================
// v1.0.82 (GAP-003): wrappers that accept the CLI choice
// (`crate::cli::LlmBackendChoice`) and translate it into a chain for
// `embed_with_fallback`. They centralise propagation of the `--llm-backend`
// flag across the 6 commands that produce embeddings (`remember`, `edit`,
// `ingest`, `enrich`, `recall`, `hybrid-search`).
// =============================================================================
275
276/// Embed a single passage using the LLM backend selected by the user via
277/// `--llm-backend`. Routes to `embed_with_fallback` so failures fall
278/// through to the next backend in the chain before giving up.
279///
280/// When `choice` is `None` (e.g. a sub-command that does not yet
281/// expose the flag), behaviour matches `embed_passage_local` — the
282/// active embedder from `LlmEmbedding::detect_available` decides the
283/// backend.
284pub fn embed_passage_with_choice(
285    models_dir: &Path,
286    text: &str,
287    choice: Option<crate::cli::LlmBackendChoice>,
288) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
289    let _slot_guard = acquire_llm_slot_for_embedding()?;
290    match choice {
291        None => {
292            let embedder = get_embedder(models_dir)?;
293            embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
294        }
295        Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
296    }
297}
298/// failure, returns a structured `FallbackReason` so the caller can
299/// surface `vec_degraded` instead of a hard exit 11.
300///
301/// `None` matches the legacy `try_embed_query_with_fallback` path
302/// (uses the active embedder without an explicit chain).
303pub fn try_embed_query_with_choice(
304    models_dir: &Path,
305    text: &str,
306    choice: Option<crate::cli::LlmBackendChoice>,
307) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
308    match embed_passage_with_choice(models_dir, text, choice) {
309        // GAP-004 / v1.0.85.1: when the chain terminates on
310        //  (i.e. user passed
311        // or every preceding backend failed),  returns
312        //  instead of an error. Without this guard the
313        // empty vector would propagate to  which
314        // aborts with exit 11 ("embedding has 0 dims, expected 64").
315        // The caller's contract is to surface a typed
316        // so  and  can route to FTS5-puro via
317        // the existing  /  envelope.
318        // Intercept the empty-vector success path and surface it as
319        //  (introduced at v1.0.85 / ADR-0043
320        // for the symmetric LLM-returned-zero-dim case).
321        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
322        Ok((v, backend)) => Ok((v, backend)),
323        Err(e) => Err(classify_embedding_error(e)),
324    }
325}
326/// call. Reads the max-concurrency from
327/// `SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY` (default derived from
328/// `LLM_WORKER_RSS_MB` and available memory), and the wait timeout
329/// from `SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS` (default 30s).
330///
331/// Returns `Ok(guard)` for happy path, `AppError::LockBusy` (exit 75)
332/// when no slot is available within the wait window, and
333/// `AppError::Validation` when the concurrency is 0.
334///
335/// The `LLM_SLOT_NO_WAIT` env var (or its CLI flag equivalent) sets
336/// `wait_secs = 0` to fail fast in tests.
337fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
338    use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
339    let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
340        .ok()
341        .and_then(|s| s.parse::<u32>().ok())
342        .filter(|n| *n >= 1)
343        .unwrap_or_else(crate::llm_slots::default_max_concurrency);
344    let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
345        0
346    } else {
347        std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
348            .ok()
349            .and_then(|s| s.parse::<u64>().ok())
350            .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
351    };
352    let _ = LLM_WORKER_RSS_MB; // silence the unused import (used in default_max_concurrency)
353                               // GAP-003 / ADR-0043: when the slot semaphore is contended beyond the
354                               // backoff window (50 + 100 + 200 + 400 = 750ms total), return a
355                               // marker message that `classify_embedding_error` maps to
356                               // `FallbackReason::SlotExhausted` (discriminator `slot_exhausted`).
357                               // The window is shorter than the legacy 30s timeout, so the operator
358                               // observes FTS5-puro fallback quickly instead of after 30s of silence.
359    match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
360        Ok(guard) => Ok(guard),
361        Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
362            "slot exhausted: {e} (fall back to FTS5)"
363        ))),
364        Err(e) => Err(e),
365    }
366}
/// G58/S1: why an embedding call could not be completed, meaning the
/// caller must fall back to a non-vector retrieval path (FTS5 prefix +
/// LIKE).
///
/// Returned by [`try_embed_query_with_fallback`] so the `recall` and
/// `hybrid-search` handlers can surface a structured `vec_degraded` /
/// `warning` envelope instead of a hard `AppError::Embedding` exit 11.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// The LLM subprocess failed (rate limit, OAuth contention, quota
    /// exhausted, unparsable model response, divergent dim, etc.).
    /// Carries the original error message for observability.
    EmbeddingFailed(String),
    /// The LLM slot semaphore was exhausted: 8+ concurrent LLM
    /// subprocesses blocked the acquire beyond the backoff window
    /// (50ms + 100ms + 200ms + 400ms = 750ms total). Resolved at v1.0.85
    /// (GAP-003 / ADR-0043).
    SlotExhausted,
    /// OAuth usage quota exhausted on the named backend. The caller
    /// should retry with an alternative backend (codex ↔ claude)
    /// before falling back to FTS5-puro.
    OAuthQuota { backend: &'static str },
    /// The user requested a backend that differs from the one that
    /// actually executed the embedding (legacy "synonym for codex"
    /// bug from v1.0.83). Resolved at v1.0.84 (GAP-002).
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding came back as a zero-dimensional vector, signalling a
    /// structural bug (the LLM did not produce any floats). Distinct
    /// from OAuthQuota (quota exhausted) and EmbeddingFailed
    /// (subprocess error).
    DimZero,
    /// The embedding was cancelled by an external signal (SIGTERM, etc.).
    Cancelled,
    /// The embedding exceeded its time budget. Carries the operation name
    /// and the elapsed seconds for diagnostic logging.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable, machine-friendly reason code used by JSON envelopes
    /// (`vec_degraded_reason`). Mirrors the v1.0.84 contract extended
    /// at v1.0.85 with 4 new variants (GAP-003 / ADR-0043).
    pub fn reason_code(&self) -> &'static str {
        match self {
            FallbackReason::Timeout { .. } => "timeout",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    /// Human-readable description of the reason, one fixed sentence per
    /// variant with the variant's payload interpolated where present.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Payload-free variants: emit the fixed text directly.
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            // Variants carrying data.
            Self::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            Self::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            Self::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
463
464/// G58/S1: try to embed a query, mapping any failure to a structured
465/// [`FallbackReason`] so callers can route to FTS5 + LIKE fallback instead
466/// of returning exit 11 to the user.
467///
468/// This is the bridge between the hard-fail `embed_query_local` (used by
469/// write paths where embedding failure aborts the operation) and the
470/// graceful-degradation contract of `recall` / `hybrid-search` in v1.0.80.
471pub fn try_embed_query_with_fallback(
472    models_dir: &Path,
473    query: &str,
474) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
475    match embed_query_local(models_dir, query) {
476        Ok(v) => Ok((v, LlmBackendKind::None)),
477        Err(e) => Err(classify_embedding_error(e)),
478    }
479}
480
481/// G58 / ADR-0043 (v1.0.85): deterministic fallback for `recall` and
482/// `hybrid-search`.
483///
484/// - On `OAuthQuota { backend }`, retry once with the alternative backend
485///   (codex ↔ claude) before giving up.
486/// - On `SlotExhausted`, sleep 750ms and retry once (gives the slot
487///   semaphore time to release a permit from a sibling subprocess).
488/// - On any other `FallbackReason`, return immediately (deterministic).
489pub fn try_embed_query_with_deterministic_fallback(
490    models_dir: &Path,
491    query: &str,
492    choice: Option<crate::cli::LlmBackendChoice>,
493) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
494    match try_embed_query_with_choice(models_dir, query, choice) {
495        Ok(t) => Ok(t),
496        Err(reason @ FallbackReason::OAuthQuota { backend }) => {
497            let alt = match backend {
498                "codex" => Some(crate::cli::LlmBackendChoice::Claude),
499                "claude" => Some(crate::cli::LlmBackendChoice::Codex),
500                _ => None,
501            };
502            if let Some(alt_choice) = alt {
503                try_embed_query_with_choice(models_dir, query, Some(alt_choice))
504            } else {
505                Err(reason)
506            }
507        }
508        Err(reason @ FallbackReason::SlotExhausted) => {
509            std::thread::sleep(std::time::Duration::from_millis(750));
510            try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
511        }
512        Err(other) => Err(other),
513    }
514}
515
516/// Classify an embedding [`AppError`] into a typed [`FallbackReason`].
517///
518/// v1.0.85 (ADR-0043): discriminates the 4 new causes (SlotExhausted,
519/// OAuthQuota, BackendMismatch, DimZero) from the legacy generic
520/// EmbeddingFailed bucket. The classification is purely lexical
521/// (substring match on the message) — no I/O, no retries, no
522/// telemetry, deterministic and `#[serial_test::serial(env)]`-safe.
523pub fn classify_embedding_error(err: AppError) -> FallbackReason {
524    match err {
525        AppError::Embedding(msg) if msg.contains("cancelled") => FallbackReason::Cancelled,
526        AppError::Embedding(msg) if msg.contains("slot exhausted") => FallbackReason::SlotExhausted,
527        AppError::Embedding(msg) if msg.contains("OAuth") || msg.contains("quota") => {
528            let backend = if msg.contains("codex") {
529                "codex"
530            } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
531                // G45-CR5: anthropic-ratelimit-* headers are emitted only by
532                // the Claude CLI subprocess; treat them as claude quota
533                // signals even when the message text omits the word
534                // "claude" explicitly.
535                "claude"
536            } else {
537                "unknown"
538            };
539            FallbackReason::OAuthQuota { backend }
540        }
541        AppError::Embedding(msg) if msg.contains("backend mismatch") => {
542            // The `msg.contains("claude")` arm is intentionally
543            // placed BEFORE the `msg.contains("OAuth")` arm so that
544            // a backend-mismatch message that mentions both
545            // "claude" and "codex" maps to BackendMismatch (the more
546            // specific failure mode).
547            let (requested, resolved) =
548                if msg.contains("requested claude") && msg.contains("but codex") {
549                    ("claude", "codex")
550                } else if msg.contains("requested codex") && msg.contains("but claude") {
551                    ("codex", "claude")
552                } else if msg.contains("requested claude") {
553                    ("claude", "unknown")
554                } else if msg.contains("requested codex") {
555                    ("codex", "unknown")
556                } else {
557                    ("unknown", "unknown")
558                };
559            FallbackReason::BackendMismatch {
560                requested,
561                resolved,
562            }
563        }
564        AppError::Embedding(msg) if msg.contains("dim") && msg.contains("zero") => {
565            FallbackReason::DimZero
566        }
567        AppError::Timeout {
568            operation,
569            duration_secs,
570        } => FallbackReason::Timeout {
571            operation,
572            duration_secs,
573        },
574        AppError::Embedding(msg) => FallbackReason::EmbeddingFailed(msg),
575        e => FallbackReason::EmbeddingFailed(e.to_string()),
576    }
577}
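// =============================================================================
// Fallback chain: each configured LLM backend is tried in turn, moving on to
// the remaining backends before giving up. The chain order matches the
// user-supplied `--llm-fallback` list (default: codex, claude, none).
// =============================================================================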
581
582/// Tries each LLM backend in `chain` in order, returning the first
583/// successful embedding. On failure, the diagnostic tail of the last
584/// error is preserved in the returned `AppError::Embedding` so the
585/// operator can see WHY every backend failed.
586///
587/// If `skip_on_failure` is `true` AND every backend fails, the function
588/// returns `Ok(Vec::new())` (an empty vector) to signal "persist
589/// without embedding" — the call site is then responsible for writing
590/// a `pending_embeddings` row that can be retried later by the
591/// `embedding retry` subcommand.
592///
593/// Defaults the chain to `[codex, claude, none]` when `chain` is
594/// empty, matching the v1.0.81 behaviour where codex was the
595/// implicit default and claude was the implicit fallback.
596pub fn embed_with_fallback(
597    models_dir: &Path,
598    text: &str,
599    chain: &[LlmBackendKind],
600    skip_on_failure: bool,
601) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
602    use crate::llm::exit_code_hints::LlmBackendError;
603    let effective: Vec<LlmBackendKind> = if chain.is_empty() {
604        vec![
605            LlmBackendKind::Codex,
606            LlmBackendKind::Claude,
607            LlmBackendKind::None,
608        ]
609    } else {
610        chain.to_vec()
611    };
612
613    let mut last_err: Option<AppError> = None;
614    for backend in &effective {
615        // BUG-003 / v1.0.85: propagar o backend REAL retornado por
616        // embed_via_backend (que pode diferir do chain position quando
617        // LlmEmbedding::detect_available substitui codex por claude).
618        // O tuple `(_, requested_kind)` é descartado — só queremos o
619        // backend resolvido na primeira posição.
620        match embed_via_backend(models_dir, text, backend) {
621            Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
622            Err(e) => {
623                tracing::warn!(
624                    target: "embedding",
625                    backend = ?backend,
626                    error = %e,
627                    "embed_with_fallback: backend failed, trying next"
628                );
629                last_err = Some(e);
630            }
631        }
632    }
633    if skip_on_failure {
634        // Signal "persist with no embedding" via an empty vector paired
635        // with `None` so callers know the chain exhausted without a hit.
636        return Ok((Vec::new(), LlmBackendKind::None));
637    }
638    Err(last_err
639        .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
640}
641
/// Kind of LLM backend used as an entry in the fallback chain. Mirrors
/// the CLI `--llm-backend` enum so users can pass the same value to
/// `--llm-fallback` without translation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// `codex exec` (default for v1.0.76+).
    Codex,
    /// `claude -p` (fallback for ChatGPT Pro OAuth unavailability).
    Claude,
    /// No embedding — empty vector returned.
    None,
}

impl LlmBackendKind {
    /// Lower-case label for this backend, used in tracing and JSON
    /// envelopes. The exact strings are part of the public contract for
    /// `envelope.backend_invoked`.
    pub fn as_str(self) -> &'static str {
        let label = match self {
            LlmBackendKind::None => "none",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Codex => "codex",
        };
        label
    }
}
666
667/// Embeds a single text via the given backend. Used by
668/// `embed_with_fallback` and exposed to allow direct one-shot
669/// selection without a chain.
670/// Embeds a single text via the given backend. Used by
671/// `embed_with_fallback` and exposed to allow direct one-shot
672/// selection without a chain.
673///
674/// BUG-003 / v1.0.85: returns `(Vec<f32>, LlmBackendKind)`. The
675/// second element reports the backend that ACTUALLY executed the
676/// embedding, not the chain position requested by the caller. When
677/// `LlmBackendKind::Codex` is requested but `codex` is absent from
678/// PATH, `LlmEmbedding::detect_available` substitutes claude and the
679/// tuple carries `LlmBackendKind::Claude` so the operator sees the
680/// truth in `envelope.backend_invoked`.
681pub fn embed_via_backend(
682    models_dir: &Path,
683    text: &str,
684    backend: &LlmBackendKind,
685) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
686    match backend {
687        LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
688        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
689        LlmBackendKind::Claude => {
690            // ADR-0042 / GAP-002: route Claude through its own static
691            // embedder instead of re-using the Codex path (which used
692            // to silently pick Codex if PATH ordered it first).
693            tracing::debug!(
694                target: "embedder",
695                backend = "claude",
696                "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
697            );
698            embed_via_claude_local_resolved(models_dir, text, None, None)
699        }
700    }
701}
702
703/// Legacy one-shot wrapper around `embed_via_backend` that discards
704/// the resolved backend. Kept for call sites that only care about
705/// the vector and ignore the executed-backend signal. New code
706/// should prefer `embed_via_backend` directly.
707pub fn embed_via_backend_legacy(
708    models_dir: &Path,
709    text: &str,
710    backend: &LlmBackendKind,
711) -> Result<Vec<f32>, AppError> {
712    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
713}
714
715pub fn embed_passages_controlled_local(
716    models_dir: &Path,
717    texts: &[&str],
718    token_counts: &[usize],
719) -> Result<Vec<Vec<f32>>, AppError> {
720    let embedder = get_embedder(models_dir)?;
721    embed_passages_controlled(embedder, texts, token_counts)
722}
723
724/// G42/S3: embeds `texts` through the bounded parallel fan-out and
725/// returns vectors in input order.
726pub fn embed_passages_parallel_local(
727    models_dir: &Path,
728    texts: &[String],
729    parallelism: usize,
730    batch_size: usize,
731) -> Result<Vec<Vec<f32>>, AppError> {
732    let embedder = get_embedder(models_dir)?;
733    embed_texts_parallel(embedder, texts, parallelism, batch_size)
734}
735
736/// G56: in-process cache for entity embeddings keyed by `(model, text)`.
737///
738/// Schema v13 is immutable: `entity_embeddings` does not have a `text`
739/// column, so a pure DB-side cache would require a schema bump. Instead
740/// we keep a process-wide LRU-style map that survives within one CLI
741/// invocation. The hit rate is high in `ingest` (re-embedding the same
742/// canonical entity across thousands of memories) and modest in `remember`
743/// (typical single-memory invocations).
744///
745/// Key: `blake3(model || "\0" || text)`. Value: `Arc<Vec<f32>>` so the
746/// collector can drop the map entry while a `Vec` is still in flight.
747type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;
748
749static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
750
751fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
752    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
753}
754
755fn entity_cache_key(model: &str, text: &str) -> u64 {
756    let mut hasher = blake3::Hasher::new();
757    hasher.update(model.as_bytes());
758    hasher.update(b"\0");
759    hasher.update(text.as_bytes());
760    let h = hasher.finalize();
761    let bytes = h.as_bytes();
762    u64::from_le_bytes([
763        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
764    ])
765}
766
767/// G56: embeds entity-name texts through a process-wide cache.
768///
769/// Skips any `(model, text)` pair already produced in this CLI invocation
770/// and only spawns subprocesses for the cache misses. Returns vectors in
771/// the same order as `texts`.
772///
773/// Designed for entity-name batches (short texts). For chunk embeds use
774/// [`embed_passages_parallel_local`] directly — chunks are unique per
775/// memory and cache hit rate is negligible.
776pub fn embed_entity_texts_cached(
777    models_dir: &Path,
778    texts: &[String],
779    parallelism: usize,
780) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
781    if texts.is_empty() {
782        return Ok((Vec::new(), EmbedCacheStats::default()));
783    }
784    let embedder = get_embedder(models_dir)?;
785    let model = embedder.lock().model_label();
786    let cache = entity_embed_cache();
787    let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
788    let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
789    {
790        let guard = cache.lock();
791        for (i, text) in texts.iter().enumerate() {
792            let key = entity_cache_key(&model, text);
793            if let Some(v) = guard.get(&key) {
794                hits[i] = Some(Arc::clone(v));
795            } else {
796                miss_indices.push(i);
797            }
798        }
799    }
800    let miss_count = miss_indices.len();
801    if miss_count > 0 {
802        let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
803        let miss_vecs = embed_texts_parallel(
804            embedder,
805            &miss_texts,
806            parallelism,
807            entity_embed_batch_size(),
808        )?;
809        let mut guard = cache.lock();
810        for (slot, &orig_idx) in miss_indices.iter().enumerate() {
811            let vec = Arc::new(miss_vecs[slot].clone());
812            let key = entity_cache_key(&model, &texts[orig_idx]);
813            guard.insert(key, Arc::clone(&vec));
814            hits[orig_idx] = Some(vec);
815        }
816    }
817    let mut out = Vec::with_capacity(texts.len());
818    for hit in hits.into_iter() {
819        let v = hit.ok_or_else(|| {
820            AppError::Embedding("entity embed cache produced null result".to_string())
821        })?;
822        out.push((*v).clone());
823    }
824    Ok((
825        out,
826        EmbedCacheStats {
827            requested: texts.len(),
828            hits: texts.len() - miss_count,
829            misses: miss_count,
830        },
831    ))
832}
833
834/// G56: stats snapshot returned by [`embed_entity_texts_cached`].
835#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
836pub struct EmbedCacheStats {
837    pub requested: usize,
838    pub hits: usize,
839    pub misses: usize,
840}
841
842impl EmbedCacheStats {
843    /// Hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 when nothing was requested.
844    pub fn hit_rate(&self) -> f64 {
845        if self.requested == 0 {
846            0.0
847        } else {
848            self.hits as f64 / self.requested as f64
849        }
850    }
851}
852
853/// G42/S3 core: bounded parallel batch embedding.
854///
855/// - texts are grouped into batches of `batch_size` (one LLM call per
856///   batch, G42/S2);
857/// - at most `effective_permits(parallelism)` LLM subprocesses run
858///   simultaneously (`Arc<Semaphore>` + `acquire_owned`, BLOCO 2);
859/// - results stream through a BOUNDED mpsc channel so the caller-side
860///   collector applies backpressure and can persist incrementally
861///   (BLOCO 5);
862/// - the global `CancellationToken` aborts in-flight work on the first
863///   signal; subprocesses die with their futures via `kill_on_drop`
864///   (BLOCO 6).
865pub fn embed_texts_parallel(
866    embedder: &Mutex<LlmEmbedding>,
867    texts: &[String],
868    parallelism: usize,
869    batch_size: usize,
870) -> Result<Vec<Vec<f32>>, AppError> {
871    let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
872    embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
873        slots[idx] = Some(v.to_vec());
874        Ok(())
875    })?;
876    let mut out = Vec::with_capacity(slots.len());
877    for (idx, slot) in slots.into_iter().enumerate() {
878        out.push(slot.ok_or_else(|| {
879            AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
880        })?);
881    }
882    Ok(out)
883}
884
885/// Like [`embed_texts_parallel`] but invokes `on_result` as soon as each
886/// embedding arrives (BLOCO 5: incremental persistence — a kill loses at
887/// most the in-flight batches, never the already-delivered items).
888pub fn embed_texts_parallel_with(
889    embedder: &Mutex<LlmEmbedding>,
890    texts: &[String],
891    parallelism: usize,
892    batch_size: usize,
893    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
894) -> Result<(), AppError> {
895    if texts.is_empty() {
896        return Ok(());
897    }
898    let dim = crate::constants::embedding_dim();
899    if texts.len() == 1 {
900        let v = embed_passage(embedder, &texts[0])?;
901        return on_result(0, &v);
902    }
903
904    let client = clone_client(embedder);
905    let permits = effective_permits(parallelism);
906    let batches = build_batches(texts, batch_size.max(1));
907    let token = crate::cancel_token().clone();
908
909    let work = move |batch: Vec<(usize, String)>| {
910        let client = client.clone();
911        async move {
912            client
913                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
914                .await
915        }
916    };
917
918    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
919    match tokio::runtime::Handle::try_current() {
920        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
921        Err(_) => shared_runtime()?.block_on(fan_out),
922    }
923}
924
/// Groups `(global_index, text)` pairs into batches of `batch_size`.
///
/// The global index is the position of the text in `texts`, so results can
/// be mapped back to their input regardless of which batch carried them.
/// The last batch may be shorter. A `batch_size` of zero is treated as one
/// instead of panicking in `chunks`. Empty input yields no batches.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let batch_size = batch_size.max(1);
    texts
        .chunks(batch_size)
        .enumerate()
        .map(|(batch_no, chunk)| {
            // Every chunk before this one is full, so this is the global
            // index of the chunk's first element.
            let base = batch_no * batch_size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
936
937/// G42/S3 BLOCO 2: effective permit count.
938///
939/// `permits = clamp(requested, 1, 32) ∧ cpus ∧ ram_livre*0.5/RSS` — see
940/// the module docs for the measured RSS rationale.
941pub fn effective_permits(requested: usize) -> usize {
942    let cpus = std::thread::available_parallelism()
943        .map(|n| n.get())
944        .unwrap_or(4);
945    let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
946        / crate::constants::LLM_WORKER_RSS_MB)
947        .max(1) as usize;
948    requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
949}
950
951/// Bounded fan-out engine. Generic over the per-batch work so the
952/// concurrency contract is testable without spawning real LLMs.
953///
954/// Cancel safety (BLOCO 6/10): every task races its work against
955/// `token.cancelled()` inside `tokio::select!`; both branches are
956/// cancel-safe (the work future owns its subprocess via `kill_on_drop`,
957/// and `cancelled()` is pure). On collector-side errors the `JoinSet`
958/// is shut down, which drops in-flight futures and kills their
959/// subprocesses.
960async fn run_bounded<F, Fut>(
961    batches: Vec<Vec<(usize, String)>>,
962    permits: usize,
963    dim: usize,
964    token: CancellationToken,
965    work: F,
966    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
967) -> Result<(), AppError>
968where
969    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
970    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
971{
972    let total_batches = batches.len();
973    let semaphore = Arc::new(Semaphore::new(permits));
974    // BLOCO 5: bounded channel — producers block when the collector is
975    // behind (backpressure); PROIBIDO unbounded_channel between stages.
976    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
977    let mut set: JoinSet<()> = JoinSet::new();
978
979    for (batch_idx, batch) in batches.into_iter().enumerate() {
980        let sem = Arc::clone(&semaphore);
981        let token = token.clone();
982        let tx = tx.clone();
983        let work = work.clone();
984        set.spawn(async move {
985            let wait_start = std::time::Instant::now();
986            // acquire_owned: RAII permit moved into the task; returned
987            // on every exit path INCLUDING panic (BLOCO 2).
988            let Ok(_permit) = sem.acquire_owned().await else {
989                let _ = tx
990                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
991                    .await;
992                return;
993            };
994            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
995            let work_start = std::time::Instant::now();
996            // ADR-0034: when `SQLITE_GRAPHRAG_IGNORE_SHUTDOWN=1` is set the
997            // cancellation arm is dropped and the batch runs to completion.
998            // This unblocks audit/test invocations whose `SHUTDOWN` flag was
999            // contaminated by an earlier signal handler in the same process
1000            // tree. Production code never sees this branch.
1001            let outcome = if crate::should_obey_shutdown() {
1002                tokio::select! {
1003                    res = work(batch) => res,
1004                    _ = token.cancelled() => Err(AppError::Embedding(
1005                        "embedding cancelled by shutdown signal".to_string(),
1006                    )),
1007                }
1008            } else {
1009                work(batch).await
1010            };
1011            // BLOCO 8: permit wait time logged SEPARATELY from work time.
1012            tracing::debug!(
1013                target: "embedding",
1014                batch_idx,
1015                permit_wait_ms,
1016                work_ms = work_start.elapsed().as_millis() as u64,
1017                ok = outcome.is_ok(),
1018                "embedding batch finished"
1019            );
1020            let _ = tx.send(outcome).await;
1021        });
1022    }
1023    drop(tx);
1024
1025    let mut completed = 0usize;
1026    let mut failed = 0usize;
1027    let mut cancelled = 0usize;
1028    let mut first_error: Option<AppError> = None;
1029
1030    while let Some(message) = rx.recv().await {
1031        match message {
1032            Ok(items) => {
1033                completed += 1;
1034                if first_error.is_none() {
1035                    for (idx, v) in items {
1036                        if v.len() != dim {
1037                            first_error = Some(AppError::Embedding(format!(
1038                                "LLM returned {} dims for item {idx}, expected {dim}; \
1039                                 refusing to truncate or pad silently (G42/C5)",
1040                                v.len()
1041                            )));
1042                            break;
1043                        }
1044                        if let Err(e) = on_result(idx, &v) {
1045                            first_error = Some(e);
1046                            break;
1047                        }
1048                    }
1049                    if first_error.is_some() {
1050                        // Abort remaining work: dropped futures kill
1051                        // their subprocesses via kill_on_drop (BLOCO 6).
1052                        set.shutdown().await;
1053                    }
1054                }
1055            }
1056            Err(e) => {
1057                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1058                    cancelled += 1;
1059                } else {
1060                    failed += 1;
1061                }
1062                if first_error.is_none() {
1063                    first_error = Some(e);
1064                    set.shutdown().await;
1065                }
1066            }
1067        }
1068    }
1069
1070    // Drain the JoinSet: surface panics distinctly (panic handling —
1071    // JoinError::is_panic tratado em todo join_next, BLOCO 9).
1072    while let Some(join_result) = set.join_next().await {
1073        if let Err(join_err) = join_result {
1074            if join_err.is_panic() {
1075                failed += 1;
1076                if first_error.is_none() {
1077                    first_error = Some(AppError::Embedding(format!(
1078                        "embedding task panicked: {join_err}"
1079                    )));
1080                }
1081            } else {
1082                cancelled += 1;
1083            }
1084        }
1085    }
1086
1087    // v1.0.85 (ADR-0043 hygiene): the fan-out summary event moved
1088    // from `tracing::info!` to `tracing::debug!` and the
1089    // `available_permits` field was removed — the user prohibited
1090    // pool-state telemetry (slot_pool_stats / slot_wait_ms) and
1091    // decorative `tracing::info!` events. The remaining counters
1092    // (total_batches / completed / failed / cancelled) describe the
1093    // progress of the operation itself, not the slot pool, and
1094    // remain visible to operators running with `RUST_LOG=debug` or
1095    // `-vvv`.
1096    tracing::debug!(
1097        target: "embedding",
1098        total_batches,
1099        completed,
1100        failed,
1101        cancelled,
1102        "embedding fan-out finished"
1103    );
1104
1105    match first_error {
1106        Some(e) => Err(e),
1107        None => Ok(()),
1108    }
1109}
1110
/// Serialises a slice of `f32` values into their little-endian bytes,
/// four bytes per value, in input order.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    v.iter().flat_map(|value| value.to_le_bytes()).collect()
}
1118
/// Decodes little-endian bytes back into `f32` values, four bytes per
/// value.
///
/// Trailing bytes that do not form a complete four-byte group are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1126
1127/// Returns the dimensionality of the embedding space. Used to
1128/// validate LLM responses and to size the in-memory cache.
1129pub fn embedding_dim() -> usize {
1130    crate::constants::embedding_dim()
1131}
1132
1133/// G42/C5: a vector with a divergent dimensionality is an ERROR, never
1134/// silently truncated or zero-padded (the pre-v1.0.79 `normalise_dim`
1135/// masked malformed LLM responses).
1136fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1137    let dim = crate::constants::embedding_dim();
1138    if v.len() != dim {
1139        return Err(AppError::Embedding(format!(
1140            "embedding has {} dims, expected {dim}; \
1141             refusing to truncate or pad silently (G42/C5)",
1142            v.len()
1143        )));
1144    }
1145    Ok(v)
1146}
1147
1148#[cfg(test)]
1149mod tests {
1150    use super::*;
1151    use std::sync::atomic::{AtomicUsize, Ordering};
1152
/// Encoding then decoding yields the original values, at four bytes each.
#[test]
fn f32_to_bytes_roundtrip() {
    let original = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
    let encoded = f32_to_bytes(&original);
    assert_eq!(encoded.len(), original.len() * 4);
    assert_eq!(bytes_to_f32(&encoded), original);
}
1161
/// G42/C5 acceptance criterion: a divergent vector MUST fail and is never
/// silently normalised; an exact-length vector passes unchanged in size.
#[test]
fn validate_dim_rejects_divergent_vectors() {
    let dim = crate::constants::embedding_dim();

    let too_long = validate_dim(vec![0.0; dim + 10]);
    assert!(too_long.is_err(), "longer vector must error");

    let too_short = validate_dim(vec![0.0; dim.saturating_sub(1).max(1)]);
    assert!(too_short.is_err(), "shorter vector must error");

    let accepted = validate_dim(vec![0.0; dim]).expect("exact dim must pass");
    assert_eq!(accepted.len(), dim);
}
1174
/// The public wrapper reports the same value as the constants module.
#[test]
fn embedding_dim_matches_constants_source() {
    let reported = embedding_dim();
    let expected = crate::constants::embedding_dim();
    assert_eq!(reported, expected);
}
1179
/// Ten texts in batches of four give 4 + 4 + 2, and the final item keeps
/// its global index and text.
#[test]
fn build_batches_preserves_global_indices() {
    let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
    let batches = build_batches(&texts, 4);
    assert_eq!(batches.len(), 3);

    let first = &batches[0];
    let last = &batches[2];
    assert_eq!(first.len(), 4);
    assert_eq!(last.len(), 2);

    let (index, text) = &last[1];
    assert_eq!(*index, 9);
    assert_eq!(text.as_str(), "t9");
}
1190
/// The permit count stays within `1..=32` for extreme requests.
#[test]
fn effective_permits_clamps_to_bounds() {
    let floor = effective_permits(0);
    assert!(floor >= 1);
    let ceiling = effective_permits(1000);
    assert!(ceiling <= 32);
}
1196
/// Builds `n` single-item batches; batch `i` holds `(i, "t<i>")`.
fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
    let mut batches = Vec::with_capacity(n);
    for i in 0..n {
        batches.push(vec![(i, format!("t{i}"))]);
    }
    batches
}
1200
/// Returns a zero-filled vector of length `dim`.
fn dummy_vec(dim: usize) -> Vec<f32> {
    std::iter::repeat(0.0).take(dim).collect()
}
1204
/// G42 acceptance criterion: with N permits the measured peak of
/// concurrent workers NEVER exceeds N, even with 10x more batches.
///
/// Each worker bumps a shared in-flight counter, records the high-water
/// mark, sleeps briefly and decrements the counter again.
#[test]
fn concurrency_peak_never_exceeds_permits() {
    let permits = 4usize;
    let batches = test_batches(permits * 10);
    let dim = crate::constants::embedding_dim();
    let in_flight = Arc::new(AtomicUsize::new(0));
    let high_water = Arc::new(AtomicUsize::new(0));

    let work = {
        let in_flight = Arc::clone(&in_flight);
        let high_water = Arc::clone(&high_water);
        move |batch: Vec<(usize, String)>| {
            let in_flight = Arc::clone(&in_flight);
            let high_water = Arc::clone(&high_water);
            async move {
                let running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                high_water.fetch_max(running, Ordering::SeqCst);
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                in_flight.fetch_sub(1, Ordering::SeqCst);
                Ok(batch
                    .into_iter()
                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
                    .collect())
            }
        }
    };

    let mut delivered = 0usize;
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)
        .enable_all()
        .build()
        .expect("test runtime");
    let fan_out = run_bounded(
        batches,
        permits,
        dim,
        CancellationToken::new(),
        work,
        &mut |_idx, _v| {
            delivered += 1;
            Ok(())
        },
    );
    runtime.block_on(fan_out).expect("fan-out must succeed");

    assert_eq!(delivered, permits * 10, "every item must be delivered");
    let observed_peak = high_water.load(Ordering::SeqCst);
    assert!(
        observed_peak <= permits,
        "peak concurrency {} exceeded permits {permits}",
        observed_peak
    );
}
1258
/// G42 acceptance criterion: a panicking task returns its permit via
/// RAII and surfaces as JoinError::is_panic, not a hang.
///
/// The worker for item index 1 panics on purpose; the others succeed.
#[test]
fn panicking_task_returns_permit_and_surfaces_error() {
    let permits = 2usize;
    let dim = crate::constants::embedding_dim();
    let batches = test_batches(4);

    let work = move |batch: Vec<(usize, String)>| async move {
        let first_index = batch[0].0;
        if first_index == 1 {
            panic!("intentional test panic");
        }
        Ok(batch
            .into_iter()
            .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
            .collect())
    };

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("test runtime");
    let fan_out = run_bounded(
        batches,
        permits,
        dim,
        CancellationToken::new(),
        work,
        &mut |_idx, _v| Ok(()),
    );
    let outcome = runtime.block_on(fan_out);

    let err = outcome.expect_err("panic must surface as an error");
    let rendered = err.to_string();
    assert!(
        rendered.contains("panicked"),
        "error must mention the panic: {err}"
    );
}
1297
/// G42 acceptance criterion: cancellation aborts in-flight work and
/// the fan-out terminates within the shutdown timeout.
///
/// Every worker sleeps for 30 s, so the test can only finish inside its
/// 10 s budget if the token fired after 50 ms actually stops the work.
#[test]
fn cancellation_terminates_fan_out_quickly() {
    let permits = 2usize;
    let batches = test_batches(8);
    let dim = crate::constants::embedding_dim();
    let token = CancellationToken::new();

    let slow_work = move |batch: Vec<(usize, String)>| async move {
        // Long enough that only cancellation can finish the test fast.
        tokio::time::sleep(std::time::Duration::from_secs(30)).await;
        Ok(batch
            .into_iter()
            .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
            .collect())
    };

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("test runtime");
    let trigger = token.clone();
    let started = std::time::Instant::now();
    let outcome = runtime.block_on(async move {
        // Fire the cancellation from a separate task shortly after start.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            trigger.cancel();
        });
        run_bounded(batches, permits, dim, token, slow_work, &mut |_idx, _v| Ok(())).await
    });

    assert!(outcome.is_err(), "cancelled fan-out must report an error");
    let budget = std::time::Duration::from_secs(10);
    assert!(
        started.elapsed() < budget,
        "graceful shutdown must finish well under the work duration"
    );
}
1337
/// G42 acceptance criterion: a divergent dim coming out of the work
/// stage fails the fan-out instead of being silently accepted.
///
/// The worker always returns three-element vectors.
#[test]
fn fan_out_rejects_divergent_dim() {
    let permits = 2usize;
    let dim = crate::constants::embedding_dim();
    let batches = test_batches(2);

    let short_vectors = move |batch: Vec<(usize, String)>| async move {
        let items: Vec<(usize, Vec<f32>)> = batch
            .into_iter()
            .map(|(i, _)| (i, vec![0.0f32; 3]))
            .collect();
        Ok(items)
    };

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .enable_all()
        .build()
        .expect("test runtime");
    let fan_out = run_bounded(
        batches,
        permits,
        dim,
        CancellationToken::new(),
        short_vectors,
        &mut |_idx, _v| Ok(()),
    );
    let outcome = runtime.block_on(fan_out);

    let err = outcome.expect_err("divergent dim must fail the fan-out");
    let rendered = err.to_string();
    assert!(rendered.contains("G42/C5"), "error cites C5: {err}");
}
1370
/// G44: at the calibration dim (64) both base batch sizes are returned
/// unchanged.
#[test]
fn adaptive_batch_dim64_keeps_calibrated_sizes() {
    let chunk = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64);
    assert_eq!(chunk, 8);
    let entity = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64);
    assert_eq!(entity, 25);
}
1377
/// G44: legacy 384-dim databases shrink to reliable batch sizes.
#[test]
fn adaptive_batch_dim384_shrinks() {
    let chunk = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384);
    assert_eq!(chunk, 1);
    let entity = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384);
    assert_eq!(entity, 4);
}
1384
/// G44: intermediate dims scale proportionally to the float budget.
#[test]
fn adaptive_batch_intermediate_dims() {
    for (dim, expected) in [(128, 4), (256, 2)] {
        assert_eq!(adaptive_batch_for_dim(8, dim), expected);
    }
}
1391
/// G44: dims below the calibration dim never exceed the base.
#[test]
fn adaptive_batch_small_dim_clamps_to_base() {
    let batch = adaptive_batch_for_dim(8, 8);
    assert_eq!(batch, 8);
}
1397
/// G44: the function is total — no division by zero, no clamp panic.
///
/// Covers a very large dim, a zero dim and a zero base.
#[test]
fn adaptive_batch_total_function() {
    let huge_dim = adaptive_batch_for_dim(8, 4096);
    assert_eq!(huge_dim, 1);
    let zero_dim = adaptive_batch_for_dim(8, 0);
    assert_eq!(zero_dim, 8);
    let zero_base = adaptive_batch_for_dim(0, 64);
    assert_eq!(zero_base, 1);
}
1405
/// G44 end-to-end: the public wrappers follow the env-dim override.
///
/// Both sizes are captured while the override is set; the environment and
/// the active dim are restored before any assertion can fail.
#[test]
#[serial_test::serial(env)]
fn adaptive_wrappers_follow_env_dim() {
    const DIM_VAR: &str = "SQLITE_GRAPHRAG_EMBEDDING_DIM";

    std::env::set_var(DIM_VAR, "384");
    let observed = (chunk_embed_batch_size(), entity_embed_batch_size());
    std::env::remove_var(DIM_VAR);
    crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);

    let (chunk, entity) = observed;
    assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
    assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
}
1418
1419    // ---------------------------------------------------------------
1420    // G58/S1: FallbackReason + try_embed_query_with_fallback tests
1421    // ---------------------------------------------------------------
1422
/// Formatting `EmbeddingFailed`, `Cancelled` and `Timeout` through
/// `Display` does not panic. The other variants are not exercised here.
#[test]
fn fallback_reason_display_does_not_panic() {
    let samples = [
        FallbackReason::EmbeddingFailed("rate limit".into()),
        FallbackReason::Cancelled,
        FallbackReason::Timeout {
            operation: "embed_query".into(),
            duration_secs: 30,
        },
    ];
    for reason in &samples {
        let _ = reason.to_string();
    }
}
1434
/// FallbackReason is PartialEq — used in test assertions to verify
/// the mapping rules. Equal payloads compare equal; different payloads
/// and different variants do not.
#[test]
fn fallback_reason_is_partial_eq() {
    let failed = |msg: &'static str| FallbackReason::EmbeddingFailed(msg.into());

    assert_eq!(failed("a"), failed("a"));
    assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
    assert_ne!(failed("a"), failed("b"));

    let timeout = FallbackReason::Timeout {
        operation: "x".into(),
        duration_secs: 1,
    };
    assert_ne!(FallbackReason::Cancelled, timeout);
}
1456
/// Timeout variant preserves the operation name and duration from the
/// original AppError::Timeout for observability.
#[test]
fn fallback_reason_timeout_preserves_fields() {
    let reason = FallbackReason::Timeout {
        operation: "embed_query_local".into(),
        duration_secs: 300,
    };
    match &reason {
        FallbackReason::Timeout {
            operation,
            duration_secs,
        } => {
            assert_eq!(operation, "embed_query_local");
            assert_eq!(*duration_secs, 300);
        }
        other => panic!("expected Timeout, got {other:?}"),
    }
}
1476
/// try_embed_query_with_fallback surfaces an EmbeddingFailed variant
/// when the LLM subprocess errors.
///
/// Ignored by default: it needs an environment where neither `codex` nor
/// `claude` is on PATH, so that `LlmEmbedding::detect_available()` fails.
/// Every other outcome is listed explicitly so a new `FallbackReason`
/// variant breaks the build here instead of being swallowed.
#[test]
#[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
    // A models dir that does not exist forces the embedder init to fail;
    // the error is mapped to EmbeddingFailed.
    let missing_dir = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
    let outcome = try_embed_query_with_fallback(missing_dir, "hello world");
    match outcome {
        Ok(_) => {
            panic!("expected an error, got Ok — embedder must fail for bogus path");
        }
        Err(FallbackReason::EmbeddingFailed(msg)) => {
            // The original error must survive in the message for ops triage.
            assert!(!msg.is_empty(), "fallback message must not be empty");
        }
        Err(FallbackReason::Cancelled) => {
            panic!("expected EmbeddingFailed, got Cancelled");
        }
        Err(FallbackReason::Timeout { .. }) => {
            panic!("expected EmbeddingFailed, got Timeout");
        }
        Err(FallbackReason::SlotExhausted) => {
            panic!("expected EmbeddingFailed, got SlotExhausted");
        }
        Err(FallbackReason::OAuthQuota { .. }) => {
            panic!("expected EmbeddingFailed, got OAuthQuota");
        }
        Err(FallbackReason::BackendMismatch { .. }) => {
            panic!("expected EmbeddingFailed, got BackendMismatch");
        }
        Err(FallbackReason::DimZero) => {
            panic!("expected EmbeddingFailed, got DimZero");
        }
    }
}
1517
1518    // G56: entity embed cache — unit tests
1519    #[test]
1520    fn g56_entity_cache_key_is_stable_and_distinct() {
1521        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1522        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1523        let k3 = entity_cache_key("codex:default", "claude-code");
1524        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1525        assert_eq!(k1, k2, "same model+text must hash identically");
1526        assert_ne!(k1, k3, "different text must hash differently");
1527        assert_ne!(k1, k4, "different model must hash differently");
1528    }
1529
1530    #[test]
1531    fn g56_entity_embed_cache_stats_hit_rate() {
1532        let zero = EmbedCacheStats::default();
1533        assert_eq!(zero.hit_rate(), 0.0);
1534        let half = EmbedCacheStats {
1535            requested: 4,
1536            hits: 2,
1537            misses: 2,
1538        };
1539        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1540        let all = EmbedCacheStats {
1541            requested: 7,
1542            hits: 7,
1543            misses: 0,
1544        };
1545        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1546    }
1547
1548    #[test]
1549    fn g56_entity_embed_cache_populates_and_hits() {
1550        // Manually populate the cache: bypasses the LLM by writing a
1551        // known vector under a chosen (model, text) key, then verifies
1552        // the cache is consulted before any LLM call would happen.
1553        let cache = entity_embed_cache();
1554        let model = "test-model";
1555        let text = "sqlite-graphrag";
1556        let key = entity_cache_key(model, text);
1557        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1558        cache.lock().insert(key, Arc::clone(&stored));
1559        let guard = cache.lock();
1560        let hit = guard.get(&key).expect("cache must return stored value");
1561        assert_eq!(hit.len(), crate::constants::embedding_dim());
1562        assert!((hit[0] - 0.42).abs() < 1e-6);
1563    }
1564
1565    #[test]
1566    fn g56_empty_texts_short_circuits_with_zero_stats() {
1567        // Cannot call embed_entity_texts_cached without an LLM on PATH,
1568        // so we only verify the empty-input contract via the stats struct.
1569        let stats = EmbedCacheStats::default();
1570        assert_eq!(stats.requested, 0);
1571        assert_eq!(stats.hits, 0);
1572        assert_eq!(stats.misses, 0);
1573        assert_eq!(stats.hit_rate(), 0.0);
1574    }
1575}
1576
1577// =============================================================================
1578// v1.0.82 (GAP-005) — embed_with_fallback tests
1579// =============================================================================
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    /// The `None` backend returns an empty vector and reports itself as
    /// the backend used.
    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        // The `None` backend short-circuits to `Ok(vec![])` without
        // touching the LLM at all. This is the signal the caller uses
        // to insert a `pending_embeddings` row.
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    /// Records the intended default chain order. This only checks a local
    /// array, not the production default.
    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        // Internal invariant: the default chain order is the v1.0.81
        // implicit order (codex first, then claude, then None as
        // graceful-degradation fallback).
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];
        assert_eq!(defaults.len(), 3);
    }

    // ---------------------------------------------------------------
    // ADR-0042: as_str + reason_code unit tests
    // ---------------------------------------------------------------
    //
    // These two tests used to be nested inside the body of
    // `empty_chain_defaults_to_codex_claude_none` under
    // `#[allow(dead_code)]`, where the test harness never ran them.
    // They now live at module level as real `#[test]` functions.

    /// `LlmBackendKind::as_str` must keep returning the stable lowercase
    /// identifiers.
    #[test]
    fn llm_backend_kind_as_str_is_stable() {
        assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
        assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
        assert_eq!(LlmBackendKind::None.as_str(), "none");
    }

    /// `FallbackReason::reason_code` must keep returning the stable
    /// snake_case codes for the variants covered here.
    #[test]
    fn fallback_reason_reason_code_is_stable() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("any".into()).reason_code(),
            "embedding_failed"
        );
        assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
        assert_eq!(
            FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30
            }
            .reason_code(),
            "timeout"
        );
    }

    /// A chain consisting only of `None` succeeds with an empty vector
    /// and reports `None` as the backend used.
    #[test]
    fn embed_with_fallback_succeeds_via_none_when_chain_exhausts() {
        // The chain [codex, claude, none] always succeeds via the
        // `None` graceful-degradation tail: when codex+claude fail,
        // None returns Ok(vec![]) so the caller can persist with a
        // pending_embeddings row. This is the v1.0.81-implicit
        // behaviour and the default for `--llm-fallback` chains.
        //
        // The test cannot easily simulate "codex and claude both fail"
        // because `embed_passage_local` succeeds in the CI environment
        // (mock LLM is on PATH). Instead we verify the chain-exhaustion
        // contract: when the chain reaches `None`, the function
        // returns Ok(empty). This is exercised in production by the
        // `embedding` retry subcommand and by the
        // `--skip-embedding-on-failure` flag.
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect("chain ending in None must always succeed");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    /// Same chain as above but with `skip_on_failure = true`; the result
    /// must be identical.
    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        // skip_on_failure=true + a chain of only `None` returns Ok(vec![])
        // because the None short-circuit always succeeds. This is the
        // canonical contract: skip_on_failure is a no-op when None is
        // the tail because None already provides graceful degradation.
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    // NOTE(review): this function carries `#[allow(dead_code)]` rather
    // than `#[test]`, so the harness never runs it. Confirm whether it was
    // disabled on purpose before registering it as a test.
    #[allow(dead_code)]
    fn llm_backend_error_no_backends_default_message() {
        // The fallback chain exhaustion error must mention
        // `--llm-fallback` in its hint so the operator knows the
        // remediation.
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    /// The rendered `NonZeroExit` error must include the binary name, the
    /// stderr tail, and either the signal or the exit code.
    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}