Skip to main content

sqlite_graphrag/
embedder.rs

1//! Embedding generation for the GraphRAG memory.
2//!
3//! v1.0.76: the default build is **LLM-only** — the binary does NOT bundle
4//! fastembed / ort / ndarray / tokenizers. All embeddings are produced
5//! by a headless invocation of `claude code` or `codex` (OAuth, no MCP,
6//! no hooks) and stored as a BLOB in `memory_embeddings(memory_id, embedding,
7//! source)`. Vector similarity is computed in pure Rust at query time.
8//!
9//! # Workload classification (G42/S3, BLOCK 1 — MANDATORY)
10//!
11//! LLM embedding is **I/O-bound + subprocess-bound**: each call waits
12//! 5-60s on a network round-trip through a headless `claude -p` /
13//! `codex exec` subprocess while the local CPU stays idle. Concurrency
14//! therefore uses **tokio** (async I/O concurrency) and NEVER rayon
15//! (reserved for CPU-bound work).
16//!
//! # Permit formula (G42/S3, BLOCK 2)
18//!
19//! ```text
20//! permits = clamp(--llm-parallelism, 1, 32)
21//!           .min(available_parallelism())
22//!           .min(available_ram_mb * 0.5 / LLM_WORKER_RSS_MB)
23//! ```
24//!
25//! `LLM_WORKER_RSS_MB = 350` (`crate::constants`): `claude -p` and
26//! `codex exec` are node processes with a typical Maximum RSS of
27//! 200-400 MB (measured via `/usr/bin/time -l` on macOS /
28//! `/usr/bin/time -v` on Linux), so the RAM bound is pertinent.
29//!
30//! # Locking contract (G42/A3 fix)
31//!
32//! The process-wide `Mutex<LlmEmbedding>` protects ONLY the cheap clone
33//! of the client configuration (flavour + binary path + model + shared
34//! schema tempfiles). It is NEVER held across network I/O — the
35//! v1.0.76-v1.0.78 `flush_group` held it for the whole sequential
36//! embedding loop, which is why `--llm-parallelism 8` measured an
37//! effective parallelism of 1.
38
39use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide Claude-backed LLM-embedding client behind a [`Mutex`].
///
/// The lock guards configuration cloning only (see module docs); the
/// actual LLM I/O happens on clones, outside the lock.
///
/// ADR-0042 / GAP-002: distinct from `EMBEDDER` so the Claude path of
/// `embed_via_backend` no longer re-probes PATH via `detect_available`
/// (the v1.0.82 bug where requesting Claude could resolve to Codex).
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenCode-backed LLM-embedding client behind a [`Mutex`];
/// populated on first use by `get_opencode_embedder`.
static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenRouter embeddings client; populated on first use by
/// `get_openrouter_embedder`.
static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();

/// v1.0.95 (ADR-0054): process-wide OpenRouter chat-completions client for
/// the `enrich` JUDGE. Distinct from `OPENROUTER_CLIENT` (embeddings) because
/// the chat client binds a text model, not an embedding model.
static OPENROUTER_CHAT_CLIENT: OnceLock<crate::chat_api::OpenRouterChatClient> = OnceLock::new();
66
/// v1.0.93: check whether the OpenRouter client has been initialised.
///
/// Returns `true` once `OPENROUTER_CLIENT` holds a value and `false`
/// otherwise. This is a pure read: it never constructs the client.
pub fn is_openrouter_initialized() -> bool {
    OPENROUTER_CLIENT.get().is_some()
}
/// Process-wide default LLM-embedding client behind a [`Mutex`].
///
/// Populated on first use by `get_embedder` with whichever backend
/// `LlmEmbedding::detect_available` resolves.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Process-wide multi-thread tokio runtime for embedding I/O.
///
/// G42/A2 fix: v1.0.76-v1.0.78 built a current-thread runtime PER CALL.
/// One runtime per process amortises the setup and hosts the bounded
/// fan-out of `embed_texts_parallel`.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
79
/// Calibration base: chunk (long-text) batch size per LLM call at the
/// calibration dimensionality (G42/S2). Use [`chunk_embed_batch_size`]
/// for the dim-adaptive value (G44).
///
/// At the 64-dim calibration point this is 8 × 64 = 512 floats per call.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Calibration base: entity-name (short-text) batch size per LLM call at
/// the calibration dimensionality (G42/S2). Use [`entity_embed_batch_size`]
/// for the dim-adaptive value (G44).
///
/// At the 64-dim calibration point this is 25 × 64 = 1600 floats per call.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;
89
/// Dimensionality the batch bases above were calibrated against (G44).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// G44: scales a calibration-base batch size to the active dimensionality,
/// keeping the float budget per LLM call constant (~512 floats for chunks,
/// ~1600 for entity names — the budgets empirically validated at dim 64).
/// Fixed batches of 8 at 384 dims asked for ~3072 floats per response:
/// claude returned partial coverage (3 of 8 items, caught by the G42/C5
/// check) and codex timed out at 300s.
///
/// # Arguments
///
/// * `base` — batch size at [`EMBED_BATCH_CALIBRATION_DIM`]; `0` is treated
///   as `1` so the function stays total (`clamp` panics when the upper
///   bound is below the lower one).
/// * `dim` — active embedding dimensionality; `0` is treated as `1` to
///   avoid a division by zero.
///
/// # Returns
///
/// `base * EMBED_BATCH_CALIBRATION_DIM / dim`, clamped to `1..=base`.
fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
    let base = base.max(1);
    let dim = dim.max(1);
    // Widen to u128 so `base * EMBED_BATCH_CALIBRATION_DIM` cannot overflow
    // `usize` (a debug-build panic / release-build wrap in the plain form).
    let scaled = (base as u128) * (EMBED_BATCH_CALIBRATION_DIM as u128) / (dim as u128);
    let bounded = scaled.clamp(1, base as u128);
    // `bounded <= base`, so the conversion back to `usize` always succeeds.
    usize::try_from(bounded).unwrap_or(base)
}
104
105/// Dim-adaptive batch size for chunk (long-text) embedding calls (G44).
106pub fn chunk_embed_batch_size() -> usize {
107    let dim = crate::constants::embedding_dim();
108    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
109    tracing::debug!(
110        dim,
111        base = CHUNK_EMBED_BATCH_SIZE,
112        batch,
113        "adaptive chunk batch size (G44)"
114    );
115    batch
116}
117
118/// Dim-adaptive batch size for entity-name (short-text) embedding calls (G44).
119pub fn entity_embed_batch_size() -> usize {
120    let dim = crate::constants::embedding_dim();
121    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
122    tracing::debug!(
123        dim,
124        base = ENTITY_EMBED_BATCH_SIZE,
125        batch,
126        "adaptive entity batch size (G44)"
127    );
128    batch
129}
130
131/// Returns the process-wide multi-thread runtime, building it on first use.
132pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
133    if let Some(rt) = RUNTIME.get() {
134        return Ok(rt);
135    }
136    let rt = tokio::runtime::Builder::new_multi_thread()
137        .worker_threads(2)
138        .enable_all()
139        .build()
140        .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
141    let _ = RUNTIME.set(rt);
142    Ok(RUNTIME.get().expect("RUNTIME initialised above"))
143}
144
145/// Initialises the LLM-embedding client on first use and returns it.
146pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
147    if let Some(e) = EMBEDDER.get() {
148        return Ok(e);
149    }
150    let backend = LlmEmbedding::detect_available()?;
151    let _ = EMBEDDER.set(Mutex::new(backend));
152    Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
153}
154
155/// ADR-0042 / GAP-002: returns the process-wide Claude embedder, lazily
156/// initialising it on first use. Binary and model overrides come from
157/// the explicit arguments; `None` falls back to PATH/env defaults via
158/// the builder.
159pub fn get_claude_embedder(
160    claude_binary: Option<&Path>,
161    claude_model: Option<&str>,
162) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
163    if let Some(e) = CLAUDE_EMBEDDER.get() {
164        return Ok(e);
165    }
166    let mut builder = LlmEmbedding::with_claude_builder();
167    if let Some(b) = claude_binary {
168        builder = builder.override_binary(b.to_path_buf());
169    }
170    if let Some(m) = claude_model {
171        builder = builder.override_model(m.to_string());
172    }
173    let backend = builder.build()?;
174    let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
175    Ok(CLAUDE_EMBEDDER
176        .get()
177        .expect("CLAUDE_EMBEDDER initialised above"))
178}
179
180/// GAP-OPENCODE-001 / v1.0.90: returns the process-wide OpenCode embedder,
181/// lazily initialising it on first use. Binary and model overrides come
182/// from the explicit arguments; `None` falls back to PATH/env defaults via
183/// the builder.
184pub fn get_opencode_embedder(
185    opencode_binary: Option<&Path>,
186    opencode_model: Option<&str>,
187) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
188    if let Some(e) = OPENCODE_EMBEDDER.get() {
189        return Ok(e);
190    }
191    let mut builder = LlmEmbedding::with_opencode_builder();
192    if let Some(b) = opencode_binary {
193        builder = builder.override_binary(b.to_path_buf());
194    }
195    if let Some(m) = opencode_model {
196        builder = builder.override_model(m.to_string());
197    }
198    let backend = builder.build()?;
199    let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
200    Ok(OPENCODE_EMBEDDER
201        .get()
202        .expect("OPENCODE_EMBEDDER initialised above"))
203}
204
205pub fn get_openrouter_embedder(
206    api_key: secrecy::SecretBox<String>,
207    model: &str,
208    dim: usize,
209) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
210    if let Some(c) = OPENROUTER_CLIENT.get() {
211        return Ok(c);
212    }
213    let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
214    let _ = OPENROUTER_CLIENT.set(client);
215    Ok(OPENROUTER_CLIENT
216        .get()
217        .expect("OPENROUTER_CLIENT initialised above"))
218}
219
220/// v1.0.95 (ADR-0054): initialises the process-wide OpenRouter chat client on
221/// first use and returns it. `model` is the text model the enrich JUDGE will
222/// call (no default; the caller validates presence upfront).
223pub fn get_openrouter_chat_client(
224    api_key: secrecy::SecretBox<String>,
225    model: &str,
226    timeout_secs: u64,
227) -> Result<&'static crate::chat_api::OpenRouterChatClient, AppError> {
228    if let Some(c) = OPENROUTER_CHAT_CLIENT.get() {
229        return Ok(c);
230    }
231    let client =
232        crate::chat_api::OpenRouterChatClient::new(api_key, model.to_string(), timeout_secs)?;
233    let _ = OPENROUTER_CHAT_CLIENT.set(client);
234    Ok(OPENROUTER_CHAT_CLIENT
235        .get()
236        .expect("OPENROUTER_CHAT_CLIENT initialised above"))
237}
238
/// v1.0.95: returns the process-wide OpenRouter chat client if it has already
/// been initialised via [`get_openrouter_chat_client`]. Used by the enrich
/// JUDGE dispatch, which initialises the singleton once at startup and then
/// fetches it per item without re-threading the API key.
///
/// Returns `None` when the singleton has not been set yet; this accessor
/// never constructs the client itself.
pub fn openrouter_chat_client() -> Option<&'static crate::chat_api::OpenRouterChatClient> {
    OPENROUTER_CHAT_CLIENT.get()
}
246
247/// ADR-0042 / GAP-002: route a single passage through the Claude
248/// embedder. Used by the Claude arm of `embed_via_backend` so the
249/// fallback chain stops treating Claude as a synonym for codex.
250pub fn embed_via_claude_local(
251    _models_dir: &Path,
252    text: &str,
253    claude_binary: Option<&Path>,
254    claude_model: Option<&str>,
255) -> Result<Vec<f32>, AppError> {
256    let _slot_guard = acquire_llm_slot_for_embedding()?;
257    let embedder = get_claude_embedder(claude_binary, claude_model)?;
258    embed_passage(embedder, text)
259}
260
261/// BUG-003 / v1.0.85: split of  that also
262/// reports the resolved []. Always  because
263/// this path constructs a Claude-flavoured embedder via
264///  (no PATH probe, no silent substitution).
265pub fn embed_via_claude_local_resolved(
266    _models_dir: &Path,
267    text: &str,
268    claude_binary: Option<&Path>,
269    claude_model: Option<&str>,
270) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
271    let _slot_guard = acquire_llm_slot_for_embedding()?;
272    let embedder = get_claude_embedder(claude_binary, claude_model)?;
273    let v = embed_passage(embedder, text)?;
274    Ok((v, LlmBackendKind::Claude))
275}
276
277/// GAP-OPENCODE-001 / v1.0.90: route a single passage through the OpenCode
278/// embedder, reporting the resolved [`LlmBackendKind::Opencode`]. Constructs
279/// an OpenCode-flavoured embedder via `with_opencode_builder` (no PATH probe,
280/// no silent substitution).
281pub fn embed_via_opencode_local_resolved(
282    _models_dir: &Path,
283    text: &str,
284    opencode_binary: Option<&Path>,
285    opencode_model: Option<&str>,
286) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
287    let _slot_guard = acquire_llm_slot_for_embedding()?;
288    let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
289    let v = embed_passage(embedder, text)?;
290    Ok((v, LlmBackendKind::Opencode))
291}
292/// Clones the embedding-client configuration. The lock is held only for
293/// the duration of the clone — NEVER across I/O (G42/A3).
294fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
295    embedder.lock().clone()
296}
297
298/// Embeds a single passage for storage. Delegates to the configured LLM
299/// headless (claude code / codex). Returns a vector of the active
300/// dimensionality.
301pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
302    let client = clone_client(embedder);
303    let result = client.embed_passage(text)?;
304    validate_dim(result)
305}
306
307/// Embeds a single query for similarity search. Same model and dim as
308/// `embed_passage`; the only difference is the LLM-side prompt prefix
309/// that the headless invocation uses to disambiguate.
310pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
311    let client = clone_client(embedder);
312    let result = client.embed_query(text)?;
313    validate_dim(result)
314}
315
316/// Embeds a batch of passages with token-count-aware batching.
317///
318/// Kept for API compatibility; since v1.0.79 it routes through the
319/// bounded parallel fan-out with conservative defaults.
320pub fn embed_passages_controlled(
321    embedder: &Mutex<LlmEmbedding>,
322    texts: &[&str],
323    _token_counts: &[usize],
324) -> Result<Vec<Vec<f32>>, AppError> {
325    if texts.is_empty() {
326        return Ok(Vec::new());
327    }
328    let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
329    embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
330}
331
332pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
333    let _slot_guard = acquire_llm_slot_for_embedding()?;
334    let embedder = get_embedder(models_dir)?;
335    embed_passage(embedder, text)
336}
337
/// v1.0.89 (BUG-SKIP-EMBED): reports whether the user opted to persist with
/// a NULL embedding when embedding fails.
///
/// Reads the `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` env var (set by
/// `--skip-embedding-on-failure` via main.rs propagation). Only the exact
/// values `1` and `true` enable the opt-in; an unset, non-Unicode or any
/// other value yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE")
        .map(|value| value == "1" || value == "true")
        .unwrap_or(false)
}
347
348/// v1.0.89 (BUG-SKIP-EMBED + GAP-EMBED-PROPAGATION): embed a passage
349/// honouring both `--llm-backend` and `--skip-embedding-on-failure`.
350///
351/// On success returns `Ok(Some(vec))`. On failure:
352/// - if `--skip-embedding-on-failure` is active, logs a warning and returns `Ok(None)`
353/// - otherwise propagates the error (exit 11)
354pub fn embed_passage_or_skip(
355    models_dir: &Path,
356    text: &str,
357    choice: Option<crate::cli::LlmBackendChoice>,
358) -> Result<Option<Vec<f32>>, AppError> {
359    match embed_passage_with_choice(models_dir, text, choice) {
360        Ok((v, _backend)) => Ok(Some(v)),
361        Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
362        Err(e) => {
363            if should_skip_embedding_on_failure() {
364                tracing::warn!(
365                    error = %e,
366                    "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
367                );
368                Ok(None)
369            } else {
370                Err(e)
371            }
372        }
373    }
374}
375
376/// BUG-003 / v1.0.85: split of `embed_passage_local` that reports the
377/// resolved [`LlmBackendKind`] based on the ACTUAL
378/// [`LlmEmbedding::flavour`] of the embedder constructed. When
379/// `LlmEmbedding::detect_available` substitutes claude for a missing
380/// codex, the operator sees the truth in `envelope.backend_invoked`.
381pub fn embed_passage_local_resolved(
382    models_dir: &Path,
383    text: &str,
384) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
385    let _slot_guard = acquire_llm_slot_for_embedding()?;
386    let embedder = get_embedder(models_dir)?;
387    let v = embed_passage(embedder, text)?;
388    let kind = match embedder.lock().flavour() {
389        crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
390        crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
391        crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
392    };
393    Ok((v, kind))
394}
395
396pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
397    let _slot_guard = acquire_llm_slot_for_embedding()?;
398    let embedder = get_embedder(models_dir)?;
399    embed_query(embedder, text)
400}
401
// =============================================================================
// v1.0.82 (GAP-003): wrappers that accept the CLI choice
// (`crate::cli::LlmBackendChoice`) and translate it into a chain for
// `embed_with_fallback`. They centralise the propagation of the
// `--llm-backend` flag across the 6 commands that produce embeddings
// (`remember`, `edit`, `ingest`, `enrich`, `recall`, `hybrid-search`).
// =============================================================================
409
410/// Embed a single passage using the LLM backend selected by the user via
411/// `--llm-backend`. Routes to `embed_with_fallback` so failures fall
412/// through to the next backend in the chain before giving up.
413///
414/// When `choice` is `None` (e.g. a sub-command that does not yet
415/// expose the flag), behaviour matches `embed_passage_local` — the
416/// active embedder from `LlmEmbedding::detect_available` decides the
417/// backend.
418pub fn embed_passage_with_choice(
419    models_dir: &Path,
420    text: &str,
421    choice: Option<crate::cli::LlmBackendChoice>,
422) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
423    let _slot_guard = acquire_llm_slot_for_embedding()?;
424    match choice {
425        None => {
426            let embedder = get_embedder(models_dir)?;
427            embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
428        }
429        Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
430    }
431}
432
433/// v1.0.93: embedding with `EmbeddingBackendChoice` awareness. When the
434/// embedding backend is `Openrouter` or `Auto` with a live client, the
435/// chain prepends `OpenRouter` before the LLM subprocess backends.
436pub fn embed_passage_with_embedding_choice(
437    models_dir: &Path,
438    text: &str,
439    embedding_backend: crate::cli::EmbeddingBackendChoice,
440    llm_backend: crate::cli::LlmBackendChoice,
441) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
442    let _slot_guard = acquire_llm_slot_for_embedding()?;
443    let chain = embedding_backend.to_chain(llm_backend);
444    embed_with_fallback(models_dir, text, &chain, false)
445}
446
447/// failure, returns a structured `FallbackReason` so the caller can
448/// surface `vec_degraded` instead of a hard exit 11.
449///
450/// `None` matches the legacy `try_embed_query_with_fallback` path
451/// (uses the active embedder without an explicit chain).
452pub fn try_embed_query_with_choice(
453    models_dir: &Path,
454    text: &str,
455    choice: Option<crate::cli::LlmBackendChoice>,
456) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
457    match embed_passage_with_choice(models_dir, text, choice) {
458        // GAP-004 / v1.0.85.1: when the chain terminates on
459        //  (i.e. user passed
460        // or every preceding backend failed),  returns
461        //  instead of an error. Without this guard the
462        // empty vector would propagate to  which
463        // aborts with exit 11 ("embedding has 0 dims, expected 64").
464        // The caller's contract is to surface a typed
465        // so  and  can route to FTS5-puro via
466        // the existing  /  envelope.
467        // Intercept the empty-vector success path and surface it as
468        //  (introduced at v1.0.85 / ADR-0043
469        // for the symmetric LLM-returned-zero-dim case).
470        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
471        Ok((v, backend)) => Ok((v, backend)),
472        Err(e) => Err(classify_embedding_error(e)),
473    }
474}
475/// v1.0.93 (GAP-OR-INGEST): query embedding with `EmbeddingBackendChoice`
476/// awareness. Mirrors `try_embed_query_with_choice` but routes through
477/// `embed_passage_with_embedding_choice` so OpenRouter API is used when
478/// configured.
479pub fn try_embed_query_with_embedding_choice(
480    models_dir: &Path,
481    text: &str,
482    embedding_backend: crate::cli::EmbeddingBackendChoice,
483    llm_backend: crate::cli::LlmBackendChoice,
484) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
485    match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
486        Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
487        Ok((v, backend)) => Ok((v, backend)),
488        Err(e) => Err(classify_embedding_error(e)),
489    }
490}
491
492/// call. Reads the max-concurrency from
493/// `SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY` (default derived from
494/// `LLM_WORKER_RSS_MB` and available memory), and the wait timeout
495/// from `SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS` (default 30s).
496///
497/// Returns `Ok(guard)` for happy path, `AppError::LockBusy` (exit 75)
498/// when no slot is available within the wait window, and
499/// `AppError::Validation` when the concurrency is 0.
500///
501/// The `LLM_SLOT_NO_WAIT` env var (or its CLI flag equivalent) sets
502/// `wait_secs = 0` to fail fast in tests.
503fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
504    use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
505    let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
506        .ok()
507        .and_then(|s| s.parse::<u32>().ok())
508        .filter(|n| *n >= 1)
509        .unwrap_or_else(crate::llm_slots::default_max_concurrency);
510    let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
511        0
512    } else {
513        std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
514            .ok()
515            .and_then(|s| s.parse::<u64>().ok())
516            .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
517    };
518    let _ = LLM_WORKER_RSS_MB; // silence the unused import (used in default_max_concurrency)
519                               // GAP-003 / ADR-0043: when the slot semaphore is contended beyond the
520                               // backoff window (50 + 100 + 200 + 400 = 750ms total), return a
521                               // marker message that `classify_embedding_error` maps to
522                               // `FallbackReason::SlotExhausted` (discriminator `slot_exhausted`).
523                               // The window is shorter than the legacy 30s timeout, so the operator
524                               // observes FTS5-puro fallback quickly instead of after 30s of silence.
525    match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
526        Ok(guard) => Ok(guard),
527        Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
528            "slot exhausted: {e} (fall back to FTS5)"
529        ))),
530        Err(e) => Err(e),
531    }
532}
/// GAP-004 (v1.0.88): typed classifier for embedding error messages.
///
/// Decomposes the legacy `AppError::Embedding(String)` payload into a
/// small enum so the call sites can branch on the cause instead of
/// repeating `msg.contains(...)` literals. The classification is purely
/// lexical (case-insensitive substring match on the error message) — no
/// I/O, no retries, no telemetry, deterministic and safe under
/// `#[serial_test::serial(env)]`.
///
/// 6 variants cover the 5 known discriminators from v1.0.85 (ADR-0043)
/// plus an `Unknown` fallback for messages that do not match any marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// OAuth token expired or absent; no backend can authenticate.
    OAuth,
    /// OAuth usage quota exhausted on the named backend.
    Quota,
    /// LLM slot semaphore exhausted after the backoff window.
    SlotExhausted,
    /// User-requested backend differs from the one that actually executed.
    BackendMismatch,
    /// Embedding returned a zero-dimensional vector (structural bug).
    ZeroDimension,
    /// Message did not match any of the 5 markers above.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classify an embedding error message into a typed kind.
    ///
    /// The checks are case-insensitive substring matches, evaluated in this
    /// order (first match wins):
    ///
    /// 1. `oauth` → [`Self::OAuth`] — before `Quota` because both
    ///    substrings can co-occur in the same message.
    /// 2. `slot exhausted` → [`Self::SlotExhausted`] — before `Quota`
    ///    because the slot-semaphore path is more specific (the LLM never
    ///    even tried to authenticate).
    /// 3. `quota` → [`Self::Quota`].
    /// 4. `backend mismatch` → [`Self::BackendMismatch`].
    /// 5. both `dim` and `zero` → [`Self::ZeroDimension`].
    /// 6. anything else → [`Self::Unknown`].
    pub fn classify(msg: &str) -> Self {
        let m = msg.to_lowercase();
        if m.contains("oauth") {
            Self::OAuth
        } else if m.contains("slot exhausted") {
            // Must precede the `quota` check: a slot-exhaustion message that
            // also mentions a quota is still a slot problem.
            Self::SlotExhausted
        } else if m.contains("quota") {
            Self::Quota
        } else if m.contains("backend mismatch") {
            Self::BackendMismatch
        } else if m.contains("dim") && m.contains("zero") {
            Self::ZeroDimension
        } else {
            Self::Unknown
        }
    }

    /// Stable, machine-friendly discriminator code (lowercase, kebab-safe).
    pub fn code(&self) -> &'static str {
        match self {
            Self::OAuth => "oauth",
            Self::Quota => "quota",
            Self::SlotExhausted => "slot-exhausted",
            Self::BackendMismatch => "backend-mismatch",
            Self::ZeroDimension => "zero-dimension",
            Self::Unknown => "unknown",
        }
    }
}

/// Renders the kind as its [`EmbeddingErrorKind::code`].
impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.code())
    }
}
604
/// G58/S1: why an embedding call could not be completed, forcing the caller
/// onto a non-vector retrieval path (FTS5 prefix + LIKE).
///
/// Produced by [`try_embed_query_with_fallback`] so the `recall` and
/// `hybrid-search` handlers can emit a structured `vec_degraded` /
/// `warning` envelope rather than a hard `AppError::Embedding` exit 11.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// The LLM subprocess failed (rate limit, OAuth contention, quota
    /// exhausted, unparsable model response, divergent dim, etc.). Holds
    /// the original error message for observability.
    EmbeddingFailed(String),
    /// The LLM slot semaphore was exhausted: 8+ concurrent LLM subprocesses
    /// blocked the acquire beyond the backoff window
    /// (50ms + 100ms + 200ms + 400ms = 750ms total). Resolved at v1.0.85
    /// (GAP-003 / ADR-0043).
    SlotExhausted,
    /// OAuth usage quota exhausted on the named backend. The caller should
    /// retry with an alternative backend (codex ↔ claude) before falling
    /// back to FTS5 only.
    OAuthQuota { backend: &'static str },
    /// The backend that actually executed the embedding differs from the
    /// one the user requested (legacy "synonym for codex" bug from
    /// v1.0.83). Resolved at v1.0.84 (GAP-002).
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding came back as a zero-dimensional vector — a structural
    /// bug (the LLM produced no floats). Distinct from `OAuthQuota` (quota
    /// exhausted) and `EmbeddingFailed` (subprocess error).
    DimZero,
    /// The embedding was cancelled by an external signal (SIGTERM, etc.).
    Cancelled,
    /// The embedding ran past its time budget. Holds the operation name and
    /// the elapsed seconds for diagnostic logging.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable, machine-friendly reason code used by JSON envelopes
    /// (`vec_degraded_reason`). Mirrors the v1.0.84 contract, extended at
    /// v1.0.85 with 4 new variants (GAP-003 / ADR-0043).
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

/// Human-readable rendering of each reason; fixed messages are written
/// verbatim, the others interpolate the variant's payload.
impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FallbackReason::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            FallbackReason::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            FallbackReason::Cancelled => f.write_str("embedding cancelled by external signal"),
            FallbackReason::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            FallbackReason::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            FallbackReason::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
701
702/// G58/S1: try to embed a query, mapping any failure to a structured
703/// [`FallbackReason`] so callers can route to FTS5 + LIKE fallback instead
704/// of returning exit 11 to the user.
705///
706/// This is the bridge between the hard-fail `embed_query_local` (used by
707/// write paths where embedding failure aborts the operation) and the
708/// graceful-degradation contract of `recall` / `hybrid-search` in v1.0.80.
709pub fn try_embed_query_with_fallback(
710    models_dir: &Path,
711    query: &str,
712) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
713    match embed_query_local(models_dir, query) {
714        Ok(v) => Ok((v, LlmBackendKind::None)),
715        Err(e) => Err(classify_embedding_error(e)),
716    }
717}
718
719/// G58 / ADR-0043 (v1.0.85): deterministic fallback for `recall` and
720/// `hybrid-search`.
721///
722/// - On `OAuthQuota { backend }`, retry once with the alternative backend
723///   (codex ↔ claude) before giving up.
724/// - On `SlotExhausted`, sleep 750ms and retry once (gives the slot
725///   semaphore time to release a permit from a sibling subprocess).
726/// - On any other `FallbackReason`, return immediately (deterministic).
727pub fn try_embed_query_with_deterministic_fallback(
728    models_dir: &Path,
729    query: &str,
730    choice: Option<crate::cli::LlmBackendChoice>,
731) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
732    match try_embed_query_with_choice(models_dir, query, choice) {
733        Ok(t) => Ok(t),
734        Err(reason @ FallbackReason::OAuthQuota { backend }) => {
735            let alt = match backend {
736                "codex" => Some(crate::cli::LlmBackendChoice::Claude),
737                "claude" => Some(crate::cli::LlmBackendChoice::Codex),
738                "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
739                "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
740                _ => None,
741            };
742            if let Some(alt_choice) = alt {
743                try_embed_query_with_choice(models_dir, query, Some(alt_choice))
744            } else {
745                Err(reason)
746            }
747        }
748        Err(reason @ FallbackReason::SlotExhausted) => {
749            std::thread::sleep(std::time::Duration::from_millis(750));
750            try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
751        }
752        Err(other) => Err(other),
753    }
754}
755
756/// Classify an embedding [`AppError`] into a typed [`FallbackReason`].
757///
758/// v1.0.85 (ADR-0043): discriminates the 4 new causes (SlotExhausted,
759/// OAuthQuota, BackendMismatch, DimZero) from the legacy generic
760/// EmbeddingFailed bucket. The classification is purely lexical
761/// (substring match on the message) — no I/O, no retries, no
762/// telemetry, deterministic and `#[serial_test::serial(env)]`-safe.
763pub fn classify_embedding_error(err: AppError) -> FallbackReason {
764    match err {
765        AppError::Timeout {
766            operation,
767            duration_secs,
768        } => FallbackReason::Timeout {
769            operation,
770            duration_secs,
771        },
772        AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
773            // GAP-004 (v1.0.88): typed-discriminator dispatch.
774            // The lexical classifier picks the discriminator; the arms below
775            // enrich the result with the backend name and the
776            // requested/resolved pair that the JSON envelope needs.
777            //
778            // Note: `Cancelled` and `EmbeddingFailed(msg)` are not in the
779            // 6-variant enum (they have no lexical marker) so we keep them
780            // as explicit guards at the head of the match.
781            EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
782            EmbeddingErrorKind::OAuth => {
783                let backend = if msg.contains("codex") {
784                    "codex"
785                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
786                    // G45-CR5: anthropic-ratelimit-* headers are emitted only by
787                    // the Claude CLI subprocess; treat them as claude quota
788                    // signals even when the message text omits the word
789                    // "claude" explicitly.
790                    "claude"
791                } else if msg.contains("opencode") {
792                    "opencode"
793                } else {
794                    "unknown"
795                };
796                FallbackReason::OAuthQuota { backend }
797            }
798            EmbeddingErrorKind::Quota => {
799                let backend = if msg.contains("codex") {
800                    "codex"
801                } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
802                    "claude"
803                } else if msg.contains("opencode") {
804                    "opencode"
805                } else {
806                    "unknown"
807                };
808                FallbackReason::OAuthQuota { backend }
809            }
810            EmbeddingErrorKind::BackendMismatch => {
811                // The `msg.contains("claude")` arm is intentionally
812                // placed BEFORE the OAuth arm so that a backend-mismatch
813                // message that mentions both "claude" and "codex" maps to
814                // BackendMismatch (the more specific failure mode).
815                let (requested, resolved) =
816                    if msg.contains("requested claude") && msg.contains("but codex") {
817                        ("claude", "codex")
818                    } else if msg.contains("requested codex") && msg.contains("but claude") {
819                        ("codex", "claude")
820                    } else if msg.contains("requested claude") {
821                        ("claude", "unknown")
822                    } else if msg.contains("requested codex") {
823                        ("codex", "unknown")
824                    } else {
825                        ("unknown", "unknown")
826                    };
827                FallbackReason::BackendMismatch {
828                    requested,
829                    resolved,
830                }
831            }
832            EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
833            EmbeddingErrorKind::Unknown => {
834                if msg.contains("cancelled") {
835                    FallbackReason::Cancelled
836                } else {
837                    FallbackReason::EmbeddingFailed(msg)
838                }
839            }
840        },
841        e => FallbackReason::EmbeddingFailed(e.to_string()),
842    }
843}
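// =============================================================================
// LLM fallback chain: tries each configured backend in order, moving on to
// the next backends before giving up. The chain order matches the user-supplied
// `--llm-fallback` list (default: codex, claude, none).
// =============================================================================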
847
848/// Tries each LLM backend in `chain` in order, returning the first
849/// successful embedding. On failure, the diagnostic tail of the last
850/// error is preserved in the returned `AppError::Embedding` so the
851/// operator can see WHY every backend failed.
852///
853/// If `skip_on_failure` is `true` AND every backend fails, the function
854/// returns `Ok(Vec::new())` (an empty vector) to signal "persist
855/// without embedding" — the call site is then responsible for writing
856/// a `pending_embeddings` row that can be retried later by the
857/// `embedding retry` subcommand.
858///
859/// Defaults the chain to `[codex, claude, none]` when `chain` is
860/// empty, matching the v1.0.81 behaviour where codex was the
861/// implicit default and claude was the implicit fallback.
862pub fn embed_with_fallback(
863    models_dir: &Path,
864    text: &str,
865    chain: &[LlmBackendKind],
866    skip_on_failure: bool,
867) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
868    use crate::llm::exit_code_hints::LlmBackendError;
869    let effective: Vec<LlmBackendKind> = if chain.is_empty() {
870        vec![
871            LlmBackendKind::Codex,
872            LlmBackendKind::Claude,
873            LlmBackendKind::Opencode,
874            LlmBackendKind::None,
875        ]
876    } else {
877        chain.to_vec()
878    };
879
880    let mut last_err: Option<AppError> = None;
881    for backend in &effective {
882        // BUG-003 / v1.0.85: propagar o backend REAL retornado por
883        // embed_via_backend (que pode diferir do chain position quando
884        // LlmEmbedding::detect_available substitui codex por claude).
885        // O tuple `(_, requested_kind)` é descartado — só queremos o
886        // backend resolvido na primeira posição.
887        // ADR-0046 / BUG-11 v1.0.88: use `embed_via_backend_strict` so the
888        // sentinel `None` backend propagates the last real error instead
889        // of silently degrading to `Ok((Vec::new(), None))`. This is the
890        // path that caused preflight rejections to be swallowed by the
891        // chain's default trailing `None`.
892        match embed_via_backend_strict(
893            models_dir,
894            text,
895            backend,
896            last_err.as_ref(),
897            skip_on_failure,
898        ) {
899            Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
900            Err(e) => {
901                // ADR-0011: Validation errors (OAuth-only enforcement) are
902                // FATAL — propagate immediately without trying the next
903                // backend. This prevents the fallback chain from swallowing
904                // OAuth violations via the trailing `None` sentinel.
905                if matches!(e, AppError::Validation(_)) {
906                    return Err(e);
907                }
908                tracing::warn!(
909                    target: "embedding",
910                    backend = ?backend,
911                    error = %e,
912                    "embed_with_fallback: backend failed, trying next"
913                );
914                last_err = Some(e);
915            }
916        }
917    }
918    if skip_on_failure {
919        // Signal "persist with no embedding" via an empty vector paired
920        // with `None` so callers know the chain exhausted without a hit.
921        // Caller is responsible for writing a `pending_embeddings` row
922        // that can be retried later by the `embedding retry` subcommand.
923        return Ok((Vec::new(), LlmBackendKind::None));
924    }
925    Err(last_err
926        .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
927}
928
/// Backend selector for the embedding fallback chain.
///
/// Mirrors the CLI `--llm-backend` enum, so the same values can be given
/// to `--llm-fallback` without translation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// The `codex exec` subprocess (default for v1.0.76+).
    Codex,
    /// The `claude -p` subprocess (fallback when ChatGPT Pro OAuth is
    /// unavailable).
    Claude,
    /// The `opencode run` subprocess (v1.0.90).
    Opencode,
    /// The OpenRouter HTTP API (v1.0.93).
    OpenRouter,
    /// No embedding at all — an empty vector is returned.
    None,
}

impl LlmBackendKind {
    /// Stable lowercase label used in tracing and JSON envelopes.
    ///
    /// These strings are part of the public contract for
    /// `envelope.backend_invoked`; do not change them.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::OpenRouter => "openrouter",
            LlmBackendKind::None => "none",
        }
    }
}
959
960/// Embeds a single text via the given backend. Used by
961/// `embed_with_fallback` and exposed to allow direct one-shot
962/// selection without a chain.
963/// Embeds a single text via the given backend. Used by
964/// `embed_with_fallback` and exposed to allow direct one-shot
965/// selection without a chain.
966///
967/// BUG-003 / v1.0.85: returns `(Vec<f32>, LlmBackendKind)`. The
968/// second element reports the backend that ACTUALLY executed the
969/// embedding, not the chain position requested by the caller. When
970/// `LlmBackendKind::Codex` is requested but `codex` is absent from
971/// PATH, `LlmEmbedding::detect_available` substitutes claude and the
972/// tuple carries `LlmBackendKind::Claude` so the operator sees the
973/// truth in `envelope.backend_invoked`.
974pub fn embed_via_backend(
975    models_dir: &Path,
976    text: &str,
977    backend: &LlmBackendKind,
978) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
979    match backend {
980        LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
981        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
982        LlmBackendKind::Claude => {
983            // ADR-0042 / GAP-002: route Claude through its own static
984            // embedder instead of re-using the Codex path (which used
985            // to silently pick Codex if PATH ordered it first).
986            tracing::debug!(
987                target: "embedder",
988                backend = "claude",
989                "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
990            );
991            embed_via_claude_local_resolved(models_dir, text, None, None)
992        }
993        LlmBackendKind::Opencode => {
994            tracing::debug!(
995                target: "embedder",
996                backend = "opencode",
997                "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
998            );
999            embed_via_opencode_local_resolved(models_dir, text, None, None)
1000        }
1001        LlmBackendKind::OpenRouter => {
1002            tracing::debug!(
1003                target: "embedder",
1004                backend = "openrouter",
1005                "embed_via_backend: using OpenRouter API (v1.0.93)"
1006            );
1007            let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1008                AppError::Embedding(
1009                    "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1010                )
1011            })?;
1012            let rt = shared_runtime()?;
1013            let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
1014            Ok((vec, LlmBackendKind::OpenRouter))
1015        }
1016    }
1017}
1018
1019// ADR-0046 / BUG-11 v1.0.88: specialisation of `embed_via_backend` that
1020// refuses to SILENTLY DEGRADE to `LlmBackendKind::None` after all real
1021// backends (Codex, Claude) have failed. The previous behaviour
1022// (`Ok((Vec::new(), None))`) caused the `remember` write path to persist
1023// memories with zero-dimensional embeddings — breaking `recall` and
1024// `hybrid-search` while returning exit 0 (BUG-11 CRITICAL).
1025//
1026// When `--llm-backend none` is explicitly requested (i.e. `last_err` is
1027// None AND the chain was a single-element `[None]`), pass
1028// `skip_on_failure = true` to `embed_with_fallback` to consume the empty
1029// vector via the pending-embeddings retry queue instead of persisting
1030// directly. This helper is the right hook for `remember`/`edit`/`ingest`.
1031pub fn embed_via_backend_strict(
1032    models_dir: &Path,
1033    text: &str,
1034    backend: &LlmBackendKind,
1035    last_err: Option<&AppError>,
1036    skip_on_failure: bool,
1037) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1038    use crate::llm::exit_code_hints::LlmBackendError;
1039    match backend {
1040        LlmBackendKind::None => {
1041            // If the caller opted into skip_on_failure AND no prior
1042            // backend has recorded an error, the empty vector is
1043            // intentional (chain of only [None]).
1044            if skip_on_failure && last_err.is_none() {
1045                Ok((Vec::new(), LlmBackendKind::None))
1046            } else if last_err.is_some() {
1047                // The chain reached `None` after Codex/Claude failed.
1048                // Propagate the most recent error so `remember` aborts
1049                // instead of persisting a memory without an embedding.
1050                Err(match last_err {
1051                    Some(e) => AppError::Embedding(format!("{e}")),
1052                    None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1053                })
1054            } else {
1055                // Empty chain with no skip_on_failure — treat as a
1056                // configuration error (no backends available).
1057                Err(AppError::Embedding(
1058                    LlmBackendError::NoBackendsAvailable.to_string(),
1059                ))
1060            }
1061        }
1062        LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1063        LlmBackendKind::Claude => {
1064            tracing::debug!(
1065                target: "embedder",
1066                backend = "claude",
1067                "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1068            );
1069            embed_via_claude_local_resolved(models_dir, text, None, None)
1070        }
1071        LlmBackendKind::Opencode => {
1072            tracing::debug!(
1073                target: "embedder",
1074                backend = "opencode",
1075                "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1076            );
1077            embed_via_opencode_local_resolved(models_dir, text, None, None)
1078        }
1079        LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1080    }
1081}
1082
1083/// Legacy one-shot wrapper around `embed_via_backend` that discards
1084/// the resolved backend. Kept for call sites that only care about
1085/// the vector and ignore the executed-backend signal. New code
1086/// should prefer `embed_via_backend` directly.
1087pub fn embed_via_backend_legacy(
1088    models_dir: &Path,
1089    text: &str,
1090    backend: &LlmBackendKind,
1091) -> Result<Vec<f32>, AppError> {
1092    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
1093}
1094
1095pub fn embed_passages_controlled_local(
1096    models_dir: &Path,
1097    texts: &[&str],
1098    token_counts: &[usize],
1099) -> Result<Vec<Vec<f32>>, AppError> {
1100    let embedder = get_embedder(models_dir)?;
1101    embed_passages_controlled(embedder, texts, token_counts)
1102}
1103
1104/// G42/S3: embeds `texts` through the bounded parallel fan-out and
1105/// returns vectors in input order.
1106pub fn embed_passages_parallel_local(
1107    models_dir: &Path,
1108    texts: &[String],
1109    parallelism: usize,
1110    batch_size: usize,
1111) -> Result<Vec<Vec<f32>>, AppError> {
1112    let embedder = get_embedder(models_dir)?;
1113    embed_texts_parallel(embedder, texts, parallelism, batch_size)
1114}
1115
1116/// v1.0.93 (GAP-OR-INGEST): embeds multiple passages with
1117/// `EmbeddingBackendChoice` awareness. When the resolved chain starts
1118/// with `OpenRouter` and the client is initialised, uses the HTTP batch
1119/// API (`embed_batch`) instead of subprocess fan-out — no LLM slot
1120/// consumed, ~200ms per batch vs ~15s per subprocess cold-start.
1121/// Falls back to `embed_passages_parallel_local` for LLM backends.
1122pub fn embed_passages_parallel_with_embedding_choice(
1123    models_dir: &Path,
1124    texts: &[String],
1125    parallelism: usize,
1126    batch_size: usize,
1127    embedding_backend: crate::cli::EmbeddingBackendChoice,
1128    llm_backend: crate::cli::LlmBackendChoice,
1129) -> Result<Vec<Vec<f32>>, AppError> {
1130    let chain = embedding_backend.to_chain(llm_backend);
1131    if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1132        let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1133            AppError::Embedding(
1134                "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1135            )
1136        })?;
1137        let rt = shared_runtime()?;
1138        let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1139        let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1140        Ok(vecs)
1141    } else {
1142        embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1143    }
1144}
1145
/// G56: in-process cache for entity embeddings keyed by `(model, text)`.
///
/// Schema v13 is immutable: `entity_embeddings` does not have a `text`
/// column, so a pure DB-side cache would require a schema bump. Instead
/// we keep a process-wide map that survives within one CLI invocation.
/// The hit rate is high in `ingest` (re-embedding the same canonical
/// entity across thousands of memories) and modest in `remember`
/// (typical single-memory invocations).
///
/// NOTE(review): earlier wording called this map "LRU-style", but it is a
/// plain `HashMap` and no eviction is visible in this part of the file —
/// confirm whether entries are ever removed.
///
/// Key: the `u64` produced by `entity_cache_key`, i.e. the first 8 bytes
/// (little-endian) of `blake3(model || "\0" || text)`, not the full
/// digest. Value: `Arc<Vec<f32>>` so the collector can drop the map entry
/// while a `Vec` is still in flight.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Lazily-initialised storage for the entity-embedding cache; access it
/// through `entity_embed_cache()`.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1160
1161fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1162    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1163}
1164
1165fn entity_cache_key(model: &str, text: &str) -> u64 {
1166    let mut hasher = blake3::Hasher::new();
1167    hasher.update(model.as_bytes());
1168    hasher.update(b"\0");
1169    hasher.update(text.as_bytes());
1170    let h = hasher.finalize();
1171    let bytes = h.as_bytes();
1172    u64::from_le_bytes([
1173        bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1174    ])
1175}
1176
1177/// G56: embeds entity-name texts through a process-wide cache.
1178///
1179/// Skips any `(model, text)` pair already produced in this CLI invocation
1180/// and only spawns subprocesses for the cache misses. Returns vectors in
1181/// the same order as `texts`.
1182///
1183/// Designed for entity-name batches (short texts). For chunk embeds use
1184/// [`embed_passages_parallel_local`] directly — chunks are unique per
1185/// memory and cache hit rate is negligible.
1186pub fn embed_entity_texts_cached(
1187    models_dir: &Path,
1188    texts: &[String],
1189    parallelism: usize,
1190    embedding_backend: crate::cli::EmbeddingBackendChoice,
1191    llm_backend: crate::cli::LlmBackendChoice,
1192) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1193    if texts.is_empty() {
1194        return Ok((Vec::new(), EmbedCacheStats::default()));
1195    }
1196    // GAP-OR-ENTITY-EMBED: resolve the SAME chain the chunk path uses so the
1197    // entity embedding honours `--embedding-backend`/`--llm-backend` instead
1198    // of always forcing the codex subprocess (the old G56 code path).
1199    let chain = embedding_backend.to_chain(llm_backend);
1200
1201    // `none` short-circuit: when the resolved chain is exactly `[None]`
1202    // (`--embedding-backend llm --llm-backend none`) skip every backend and
1203    // return empty vectors WITHOUT spawning a subprocess. Empties are never
1204    // cached so a later call with a real backend in the same process is not
1205    // poisoned; they count as misses for stats parity with the chunk path.
1206    if chain.as_slice() == [LlmBackendKind::None] {
1207        let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1208        return Ok((
1209            out,
1210            EmbedCacheStats {
1211                requested: texts.len(),
1212                hits: 0,
1213                misses: texts.len(),
1214            },
1215        ));
1216    }
1217
1218    // Cache model label reflects the EFFECTIVE embedding backend. When the
1219    // chain actually routes through OpenRouter, vectors carry that model's
1220    // dim/MRL profile and must never collide with codex-produced vectors;
1221    // for the local path we keep the prior `model_label()` so the in-process
1222    // cache key is unchanged (no regression — this cache is process-local).
1223    let routed_openrouter =
1224        chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1225    let model = if routed_openrouter {
1226        format!("openrouter:{}", crate::constants::embedding_dim())
1227    } else {
1228        get_embedder(models_dir)?.lock().model_label()
1229    };
1230    let cache = entity_embed_cache();
1231    let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1232    let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1233    {
1234        let guard = cache.lock();
1235        for (i, text) in texts.iter().enumerate() {
1236            let key = entity_cache_key(&model, text);
1237            if let Some(v) = guard.get(&key) {
1238                hits[i] = Some(Arc::clone(v));
1239            } else {
1240                miss_indices.push(i);
1241            }
1242        }
1243    }
1244    let miss_count = miss_indices.len();
1245    if miss_count > 0 {
1246        let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1247        // GAP-OR-ENTITY-EMBED: route misses through the backend-aware batch
1248        // helper (same one the chunk path uses). With OpenRouter this hits the
1249        // REST `embed_batch` (~200ms) instead of the codex subprocess (~120s).
1250        let miss_vecs = embed_passages_parallel_with_embedding_choice(
1251            models_dir,
1252            &miss_texts,
1253            parallelism,
1254            entity_embed_batch_size(),
1255            embedding_backend,
1256            llm_backend,
1257        )?;
1258        let mut guard = cache.lock();
1259        for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1260            let vec = Arc::new(miss_vecs[slot].clone());
1261            let key = entity_cache_key(&model, &texts[orig_idx]);
1262            guard.insert(key, Arc::clone(&vec));
1263            hits[orig_idx] = Some(vec);
1264        }
1265    }
1266    let mut out = Vec::with_capacity(texts.len());
1267    for hit in hits.into_iter() {
1268        let v = hit.ok_or_else(|| {
1269            AppError::Embedding("entity embed cache produced null result".to_string())
1270        })?;
1271        out.push((*v).clone());
1272    }
1273    Ok((
1274        out,
1275        EmbedCacheStats {
1276            requested: texts.len(),
1277            hits: texts.len() - miss_count,
1278            misses: miss_count,
1279        },
1280    ))
1281}
1282
/// G56: stats snapshot returned by [`embed_entity_texts_cached`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub struct EmbedCacheStats {
    /// Number of texts passed to the call.
    pub requested: usize,
    /// Texts answered from the in-process cache (`requested - misses`).
    pub hits: usize,
    /// Texts that were not in the cache. In the `[None]`-chain
    /// short-circuit every text is counted here.
    pub misses: usize,
}
1290
1291impl EmbedCacheStats {
1292    /// Hit rate as a fraction in `[0.0, 1.0]`. Returns 0.0 when nothing was requested.
1293    pub fn hit_rate(&self) -> f64 {
1294        if self.requested == 0 {
1295            0.0
1296        } else {
1297            self.hits as f64 / self.requested as f64
1298        }
1299    }
1300}
1301
1302/// G42/S3 core: bounded parallel batch embedding.
1303///
1304/// - texts are grouped into batches of `batch_size` (one LLM call per
1305///   batch, G42/S2);
1306/// - at most `effective_permits(parallelism)` LLM subprocesses run
1307///   simultaneously (`Arc<Semaphore>` + `acquire_owned`, BLOCO 2);
1308/// - results stream through a BOUNDED mpsc channel so the caller-side
1309///   collector applies backpressure and can persist incrementally
1310///   (BLOCO 5);
1311/// - the global `CancellationToken` aborts in-flight work on the first
1312///   signal; subprocesses die with their futures via `kill_on_drop`
1313///   (BLOCO 6).
1314pub fn embed_texts_parallel(
1315    embedder: &Mutex<LlmEmbedding>,
1316    texts: &[String],
1317    parallelism: usize,
1318    batch_size: usize,
1319) -> Result<Vec<Vec<f32>>, AppError> {
1320    let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1321    embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1322        slots[idx] = Some(v.to_vec());
1323        Ok(())
1324    })?;
1325    let mut out = Vec::with_capacity(slots.len());
1326    for (idx, slot) in slots.into_iter().enumerate() {
1327        out.push(slot.ok_or_else(|| {
1328            AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1329        })?);
1330    }
1331    Ok(out)
1332}
1333
1334/// Like [`embed_texts_parallel`] but invokes `on_result` as soon as each
1335/// embedding arrives (BLOCO 5: incremental persistence — a kill loses at
1336/// most the in-flight batches, never the already-delivered items).
1337pub fn embed_texts_parallel_with(
1338    embedder: &Mutex<LlmEmbedding>,
1339    texts: &[String],
1340    parallelism: usize,
1341    batch_size: usize,
1342    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1343) -> Result<(), AppError> {
1344    if texts.is_empty() {
1345        return Ok(());
1346    }
1347    let dim = crate::constants::embedding_dim();
1348    if texts.len() == 1 {
1349        let v = embed_passage(embedder, &texts[0])?;
1350        return on_result(0, &v);
1351    }
1352
1353    let client = clone_client(embedder);
1354    let permits = effective_permits(parallelism);
1355    let batches = build_batches(texts, batch_size.max(1));
1356    let token = crate::cancel_token().clone();
1357
1358    let work = move |batch: Vec<(usize, String)>| {
1359        let client = client.clone();
1360        async move {
1361            client
1362                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1363                .await
1364        }
1365    };
1366
1367    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1368    match tokio::runtime::Handle::try_current() {
1369        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1370        Err(_) => shared_runtime()?.block_on(fan_out),
1371    }
1372}
1373
/// Groups `(global_index, text)` pairs into batches of at most
/// `batch_size` items, preserving input order.
///
/// `global_index` is the position of the text in `texts`, so results can
/// be matched back to their inputs whichever batch they travelled in. The
/// last batch may be shorter than the others.
///
/// A `batch_size` of zero is treated as 1 (slice chunking panics on a
/// zero chunk size). Empty `texts` yields no batches. Each text is cloned
/// exactly once.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(chunk_no, chunk)| {
            // Index of this chunk's first element in the original slice.
            let base = chunk_no * size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1385
1386/// G42/S3 BLOCO 2: effective permit count.
1387///
1388/// `permits = clamp(requested, 1, 32) ∧ cpus ∧ ram_livre*0.5/RSS` — see
1389/// the module docs for the measured RSS rationale.
1390pub fn effective_permits(requested: usize) -> usize {
1391    let cpus = std::thread::available_parallelism()
1392        .map(|n| n.get())
1393        .unwrap_or(4);
1394    let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1395        / crate::constants::LLM_WORKER_RSS_MB)
1396        .max(1) as usize;
1397    requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1398}
1399
1400/// Bounded fan-out engine. Generic over the per-batch work so the
1401/// concurrency contract is testable without spawning real LLMs.
1402///
1403/// Cancel safety (BLOCO 6/10): every task races its work against
1404/// `token.cancelled()` inside `tokio::select!`; both branches are
1405/// cancel-safe (the work future owns its subprocess via `kill_on_drop`,
1406/// and `cancelled()` is pure). On collector-side errors the `JoinSet`
1407/// is shut down, which drops in-flight futures and kills their
1408/// subprocesses.
1409async fn run_bounded<F, Fut>(
1410    batches: Vec<Vec<(usize, String)>>,
1411    permits: usize,
1412    dim: usize,
1413    token: CancellationToken,
1414    work: F,
1415    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1416) -> Result<(), AppError>
1417where
1418    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
1419    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
1420{
1421    let total_batches = batches.len();
1422    let semaphore = Arc::new(Semaphore::new(permits));
1423    // BLOCO 5: bounded channel — producers block when the collector is
1424    // behind (backpressure); PROIBIDO unbounded_channel between stages.
1425    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
1426    let mut set: JoinSet<()> = JoinSet::new();
1427
1428    for (batch_idx, batch) in batches.into_iter().enumerate() {
1429        let sem = Arc::clone(&semaphore);
1430        let token = token.clone();
1431        let tx = tx.clone();
1432        let work = work.clone();
1433        set.spawn(async move {
1434            let wait_start = std::time::Instant::now();
1435            // acquire_owned: RAII permit moved into the task; returned
1436            // on every exit path INCLUDING panic (BLOCO 2).
1437            let Ok(_permit) = sem.acquire_owned().await else {
1438                let _ = tx
1439                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
1440                    .await;
1441                return;
1442            };
1443            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
1444            let work_start = std::time::Instant::now();
1445            // ADR-0034: when `SQLITE_GRAPHRAG_IGNORE_SHUTDOWN=1` is set the
1446            // cancellation arm is dropped and the batch runs to completion.
1447            // This unblocks audit/test invocations whose `SHUTDOWN` flag was
1448            // contaminated by an earlier signal handler in the same process
1449            // tree. Production code never sees this branch.
1450            let outcome = if crate::should_obey_shutdown() {
1451                tokio::select! {
1452                    res = work(batch) => res,
1453                    _ = token.cancelled() => Err(AppError::Embedding(
1454                        "embedding cancelled by shutdown signal".to_string(),
1455                    )),
1456                }
1457            } else {
1458                work(batch).await
1459            };
1460            // BLOCO 8: permit wait time logged SEPARATELY from work time.
1461            tracing::debug!(
1462                target: "embedding",
1463                batch_idx,
1464                permit_wait_ms,
1465                work_ms = work_start.elapsed().as_millis() as u64,
1466                ok = outcome.is_ok(),
1467                "embedding batch finished"
1468            );
1469            let _ = tx.send(outcome).await;
1470        });
1471    }
1472    drop(tx);
1473
1474    let mut completed = 0usize;
1475    let mut failed = 0usize;
1476    let mut cancelled = 0usize;
1477    let mut first_error: Option<AppError> = None;
1478
1479    while let Some(message) = rx.recv().await {
1480        match message {
1481            Ok(items) => {
1482                completed += 1;
1483                if first_error.is_none() {
1484                    for (idx, v) in items {
1485                        if v.len() != dim {
1486                            first_error = Some(AppError::Embedding(format!(
1487                                "LLM returned {} dims for item {idx}, expected {dim}; \
1488                                 refusing to truncate or pad silently (G42/C5)",
1489                                v.len()
1490                            )));
1491                            break;
1492                        }
1493                        if let Err(e) = on_result(idx, &v) {
1494                            first_error = Some(e);
1495                            break;
1496                        }
1497                    }
1498                    if first_error.is_some() {
1499                        // Abort remaining work: dropped futures kill
1500                        // their subprocesses via kill_on_drop (BLOCO 6).
1501                        set.shutdown().await;
1502                    }
1503                }
1504            }
1505            Err(e) => {
1506                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1507                    cancelled += 1;
1508                } else {
1509                    failed += 1;
1510                }
1511                if first_error.is_none() {
1512                    first_error = Some(e);
1513                    set.shutdown().await;
1514                }
1515            }
1516        }
1517    }
1518
1519    // Drain the JoinSet: surface panics distinctly (panic handling —
1520    // JoinError::is_panic tratado em todo join_next, BLOCO 9).
1521    while let Some(join_result) = set.join_next().await {
1522        if let Err(join_err) = join_result {
1523            if join_err.is_panic() {
1524                failed += 1;
1525                if first_error.is_none() {
1526                    first_error = Some(AppError::Embedding(format!(
1527                        "embedding task panicked: {join_err}"
1528                    )));
1529                }
1530            } else {
1531                cancelled += 1;
1532            }
1533        }
1534    }
1535
1536    // v1.0.85 (ADR-0043 hygiene): the fan-out summary event moved
1537    // from `tracing::info!` to `tracing::debug!` and the
1538    // `available_permits` field was removed — the user prohibited
1539    // pool-state telemetry (slot_pool_stats / slot_wait_ms) and
1540    // decorative `tracing::info!` events. The remaining counters
1541    // (total_batches / completed / failed / cancelled) describe the
1542    // progress of the operation itself, not the slot pool, and
1543    // remain visible to operators running with `RUST_LOG=debug` or
1544    // `-vvv`.
1545    tracing::debug!(
1546        target: "embedding",
1547        total_batches,
1548        completed,
1549        failed,
1550        cancelled,
1551        "embedding fan-out finished"
1552    );
1553
1554    match first_error {
1555        Some(e) => Err(e),
1556        None => Ok(()),
1557    }
1558}
1559
/// Serialises a slice of `f32` values into a byte vector, four
/// little-endian bytes per value, in input order.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1567
/// Decodes a byte slice into `f32` values, reading consecutive 4-byte
/// little-endian groups. Trailing bytes that do not fill a whole group
/// are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
1575
1576/// Returns the dimensionality of the embedding space. Used to
1577/// validate LLM responses and to size the in-memory cache.
1578pub fn embedding_dim() -> usize {
1579    crate::constants::embedding_dim()
1580}
1581
1582/// G42/C5: a vector with a divergent dimensionality is an ERROR, never
1583/// silently truncated or zero-padded (the pre-v1.0.79 `normalise_dim`
1584/// masked malformed LLM responses).
1585fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1586    let dim = crate::constants::embedding_dim();
1587    if v.len() != dim {
1588        return Err(AppError::Embedding(format!(
1589            "embedding has {} dims, expected {dim}; \
1590             refusing to truncate or pad silently (G42/C5)",
1591            v.len()
1592        )));
1593    }
1594    Ok(v)
1595}
1596
1597#[cfg(test)]
1598mod tests {
1599    use super::*;
1600    use std::sync::atomic::{AtomicUsize, Ordering};
1601
1602    #[test]
1603    fn f32_to_bytes_roundtrip() {
1604        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1605        let bytes = f32_to_bytes(&input);
1606        assert_eq!(bytes.len(), input.len() * 4);
1607        let out = bytes_to_f32(&bytes);
1608        assert_eq!(out, input);
1609    }
1610
1611    #[test]
1612    fn validate_dim_rejects_divergent_vectors() {
1613        // G42/C5 acceptance criterion: a divergent vector MUST fail —
1614        // never be silently normalised.
1615        let dim = crate::constants::embedding_dim();
1616        let long = vec![0.0; dim + 10];
1617        assert!(validate_dim(long).is_err(), "longer vector must error");
1618        let short = vec![0.0; dim.saturating_sub(1).max(1)];
1619        assert!(validate_dim(short).is_err(), "shorter vector must error");
1620        let exact = vec![0.0; dim];
1621        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1622    }
1623
1624    #[test]
1625    fn embedding_dim_matches_constants_source() {
1626        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1627    }
1628
1629    #[test]
1630    fn build_batches_preserves_global_indices() {
1631        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1632        let batches = build_batches(&texts, 4);
1633        assert_eq!(batches.len(), 3);
1634        assert_eq!(batches[0].len(), 4);
1635        assert_eq!(batches[2].len(), 2);
1636        assert_eq!(batches[2][1].0, 9);
1637        assert_eq!(batches[2][1].1, "t9");
1638    }
1639
1640    #[test]
1641    fn effective_permits_clamps_to_bounds() {
1642        assert!(effective_permits(0) >= 1);
1643        assert!(effective_permits(1000) <= 32);
1644    }
1645
1646    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1647        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1648    }
1649
1650    fn dummy_vec(dim: usize) -> Vec<f32> {
1651        vec![0.0; dim]
1652    }
1653
1654    /// G42 acceptance criterion: with N permits the measured peak of
1655    /// concurrent workers NEVER exceeds N, even with 10x more batches.
1656    #[test]
1657    fn concurrency_peak_never_exceeds_permits() {
1658        let permits = 4usize;
1659        let batches = test_batches(permits * 10);
1660        let dim = crate::constants::embedding_dim();
1661        let current = Arc::new(AtomicUsize::new(0));
1662        let peak = Arc::new(AtomicUsize::new(0));
1663
1664        let current_c = Arc::clone(&current);
1665        let peak_c = Arc::clone(&peak);
1666        let work = move |batch: Vec<(usize, String)>| {
1667            let current = Arc::clone(&current_c);
1668            let peak = Arc::clone(&peak_c);
1669            async move {
1670                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1671                peak.fetch_max(now, Ordering::SeqCst);
1672                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1673                current.fetch_sub(1, Ordering::SeqCst);
1674                Ok(batch
1675                    .into_iter()
1676                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1677                    .collect())
1678            }
1679        };
1680
1681        let mut delivered = 0usize;
1682        let rt = tokio::runtime::Builder::new_multi_thread()
1683            .worker_threads(4)
1684            .enable_all()
1685            .build()
1686            .expect("test runtime");
1687        rt.block_on(run_bounded(
1688            batches,
1689            permits,
1690            dim,
1691            CancellationToken::new(),
1692            work,
1693            &mut |_idx, _v| {
1694                delivered += 1;
1695                Ok(())
1696            },
1697        ))
1698        .expect("fan-out must succeed");
1699
1700        assert_eq!(delivered, permits * 10, "every item must be delivered");
1701        assert!(
1702            peak.load(Ordering::SeqCst) <= permits,
1703            "peak concurrency {} exceeded permits {permits}",
1704            peak.load(Ordering::SeqCst)
1705        );
1706    }
1707
1708    /// G42 acceptance criterion: a panicking task returns its permit via
1709    /// RAII and surfaces as JoinError::is_panic, not a hang.
1710    #[test]
1711    fn panicking_task_returns_permit_and_surfaces_error() {
1712        let permits = 2usize;
1713        let batches = test_batches(4);
1714        let dim = crate::constants::embedding_dim();
1715
1716        let work = move |batch: Vec<(usize, String)>| async move {
1717            if batch[0].0 == 1 {
1718                panic!("intentional test panic");
1719            }
1720            Ok(batch
1721                .into_iter()
1722                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1723                .collect())
1724        };
1725
1726        let rt = tokio::runtime::Builder::new_multi_thread()
1727            .worker_threads(2)
1728            .enable_all()
1729            .build()
1730            .expect("test runtime");
1731        let result = rt.block_on(run_bounded(
1732            batches,
1733            permits,
1734            dim,
1735            CancellationToken::new(),
1736            work,
1737            &mut |_idx, _v| Ok(()),
1738        ));
1739
1740        let err = result.expect_err("panic must surface as an error");
1741        assert!(
1742            err.to_string().contains("panicked"),
1743            "error must mention the panic: {err}"
1744        );
1745    }
1746
1747    /// G42 acceptance criterion: cancellation aborts in-flight work and
1748    /// the fan-out terminates within the shutdown timeout.
1749    #[test]
1750    fn cancellation_terminates_fan_out_quickly() {
1751        let permits = 2usize;
1752        let batches = test_batches(8);
1753        let dim = crate::constants::embedding_dim();
1754        let token = CancellationToken::new();
1755
1756        let work = move |batch: Vec<(usize, String)>| async move {
1757            // Long enough that only cancellation can finish the test fast.
1758            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1759            Ok(batch
1760                .into_iter()
1761                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1762                .collect())
1763        };
1764
1765        let rt = tokio::runtime::Builder::new_multi_thread()
1766            .worker_threads(2)
1767            .enable_all()
1768            .build()
1769            .expect("test runtime");
1770        let cancel = token.clone();
1771        let start = std::time::Instant::now();
1772        let result = rt.block_on(async move {
1773            tokio::spawn(async move {
1774                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1775                cancel.cancel();
1776            });
1777            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1778        });
1779
1780        assert!(result.is_err(), "cancelled fan-out must report an error");
1781        assert!(
1782            start.elapsed() < std::time::Duration::from_secs(10),
1783            "graceful shutdown must finish well under the work duration"
1784        );
1785    }
1786
1787    /// G42 acceptance criterion: a divergent dim coming out of the work
1788    /// stage fails the fan-out instead of being silently accepted.
1789    #[test]
1790    fn fan_out_rejects_divergent_dim() {
1791        let permits = 2usize;
1792        let batches = test_batches(2);
1793        let dim = crate::constants::embedding_dim();
1794
1795        let work = move |batch: Vec<(usize, String)>| async move {
1796            Ok(batch
1797                .into_iter()
1798                .map(|(i, _)| (i, vec![0.0f32; 3]))
1799                .collect::<Vec<(usize, Vec<f32>)>>())
1800        };
1801
1802        let rt = tokio::runtime::Builder::new_multi_thread()
1803            .worker_threads(2)
1804            .enable_all()
1805            .build()
1806            .expect("test runtime");
1807        let result = rt.block_on(run_bounded(
1808            batches,
1809            permits,
1810            dim,
1811            CancellationToken::new(),
1812            work,
1813            &mut |_idx, _v| Ok(()),
1814        ));
1815
1816        let err = result.expect_err("divergent dim must fail the fan-out");
1817        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1818    }
1819
1820    /// G44: the calibration bases stay intact at the calibration dim.
1821    #[test]
1822    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1823        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1824        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1825    }
1826
1827    /// G44: legacy 384-dim databases shrink to reliable batch sizes.
1828    #[test]
1829    fn adaptive_batch_dim384_shrinks() {
1830        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1831        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1832    }
1833
1834    /// G44: intermediate dims scale proportionally to the float budget.
1835    #[test]
1836    fn adaptive_batch_intermediate_dims() {
1837        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1838        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1839    }
1840
1841    /// G44: dims below the calibration dim never exceed the base.
1842    #[test]
1843    fn adaptive_batch_small_dim_clamps_to_base() {
1844        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1845    }
1846
1847    /// G44: the function is total — no division by zero, no clamp panic.
1848    #[test]
1849    fn adaptive_batch_total_function() {
1850        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1851        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1852        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1853    }
1854
1855    /// G44 end-to-end: the public wrappers follow the env-dim override.
1856    #[test]
1857    #[serial_test::serial(env)]
1858    fn adaptive_wrappers_follow_env_dim() {
1859        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1860        let chunk = chunk_embed_batch_size();
1861        let entity = entity_embed_batch_size();
1862        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1863        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1864        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1865        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1866    }
1867
1868    // ---------------------------------------------------------------
1869    // G58/S1: FallbackReason + try_embed_query_with_fallback tests
1870    // ---------------------------------------------------------------
1871
1872    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps an OAuth
1873    /// error message to the OAuth variant regardless of case or
1874    /// surrounding text.
1875    #[test]
1876    fn embedding_error_kind_classify_oauth_message() {
1877        assert_eq!(
1878            EmbeddingErrorKind::classify("OAuth token expired for claude"),
1879            EmbeddingErrorKind::OAuth,
1880        );
1881        assert_eq!(
1882            EmbeddingErrorKind::classify("oauth authentication failed"),
1883            EmbeddingErrorKind::OAuth,
1884        );
1885    }
1886
1887    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a quota
1888    /// message to the Quota variant (without "OAuth" substring).
1889    #[test]
1890    fn embedding_error_kind_classify_quota_message() {
1891        assert_eq!(
1892            EmbeddingErrorKind::classify("quota exhausted on backend"),
1893            EmbeddingErrorKind::Quota,
1894        );
1895        assert_eq!(
1896            EmbeddingErrorKind::classify("Usage quota limit reached"),
1897            EmbeddingErrorKind::Quota,
1898        );
1899    }
1900
1901    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a slot-sema
1902    /// message to the SlotExhausted variant (matched BEFORE Quota so
1903    /// the more specific LLM-never-tried path wins).
1904    #[test]
1905    fn embedding_error_kind_classify_slot_exhausted_message() {
1906        assert_eq!(
1907            EmbeddingErrorKind::classify(
1908                "slot exhausted: failed to acquire LLM slot after backoff"
1909            ),
1910            EmbeddingErrorKind::SlotExhausted,
1911        );
1912    }
1913
1914    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify maps a
1915    /// zero-dimensional vector error to the ZeroDimension variant.
1916    #[test]
1917    fn embedding_error_kind_classify_zero_dimension_message() {
1918        assert_eq!(
1919            EmbeddingErrorKind::classify("embedding returned dim=zero"),
1920            EmbeddingErrorKind::ZeroDimension,
1921        );
1922        assert_eq!(
1923            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1924            EmbeddingErrorKind::ZeroDimension,
1925        );
1926    }
1927
1928    /// GAP-004 (v1.0.88): EmbeddingErrorKind::classify falls back to
1929    /// the Unknown variant when no marker matches, and the code()
1930    /// accessor returns the kebab-safe discriminator string.
1931    #[test]
1932    fn embedding_error_kind_classify_unknown_fallback() {
1933        assert_eq!(
1934            EmbeddingErrorKind::classify("unrelated subprocess error"),
1935            EmbeddingErrorKind::Unknown,
1936        );
1937        assert_eq!(
1938            EmbeddingErrorKind::classify("rate limit hit"),
1939            EmbeddingErrorKind::Unknown,
1940        );
1941        // code() returns the stable discriminator string.
1942        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1943        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1944        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1945        assert_eq!(
1946            EmbeddingErrorKind::BackendMismatch.code(),
1947            "backend-mismatch"
1948        );
1949        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1950        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1951    }
1952
1953    /// Display impl covers all three variants without panicking.
1954    #[test]
1955    fn fallback_reason_display_does_not_panic() {
1956        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1957        let _ = FallbackReason::Cancelled.to_string();
1958        let _ = FallbackReason::Timeout {
1959            operation: "embed_query".into(),
1960            duration_secs: 30,
1961        }
1962        .to_string();
1963    }
1964
1965    /// FallbackReason is PartialEq — used in test assertions to verify
1966    /// the mapping rules.
1967    #[test]
1968    fn fallback_reason_is_partial_eq() {
1969        assert_eq!(
1970            FallbackReason::EmbeddingFailed("a".into()),
1971            FallbackReason::EmbeddingFailed("a".into())
1972        );
1973        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1974        assert_ne!(
1975            FallbackReason::EmbeddingFailed("a".into()),
1976            FallbackReason::EmbeddingFailed("b".into())
1977        );
1978        assert_ne!(
1979            FallbackReason::Cancelled,
1980            FallbackReason::Timeout {
1981                operation: "x".into(),
1982                duration_secs: 1
1983            }
1984        );
1985    }
1986
1987    /// Timeout variant preserves the operation name and duration from the
1988    /// original AppError::Timeout for observability.
1989    #[test]
1990    fn fallback_reason_timeout_preserves_fields() {
1991        let r = FallbackReason::Timeout {
1992            operation: "embed_query_local".into(),
1993            duration_secs: 300,
1994        };
1995        match r {
1996            FallbackReason::Timeout {
1997                operation,
1998                duration_secs,
1999            } => {
2000                assert_eq!(operation, "embed_query_local");
2001                assert_eq!(duration_secs, 300);
2002            }
2003            other => panic!("expected Timeout, got {other:?}"),
2004        }
2005    }
2006
2007    /// try_embed_query_with_fallback surfaces an EmbeddingFailed variant
2008    /// when the LLM subprocess errors. Uses a path that surely does not
2009    /// contain any embedder configuration (the binary is invoked as
2010    /// `codex` / `claude` via PATH which, in tests, defaults to nothing
2011    /// in scope, so `LlmEmbedding::detect_available()` returns Err).
2012    #[test]
2013    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
2014    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
2015        // Pointing at a models dir that does not exist forces the embedder
2016        // init to fail; the error is mapped to EmbeddingFailed.
2017        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
2018        let result = try_embed_query_with_fallback(bogus, "hello world");
2019        match result {
2020            Err(FallbackReason::EmbeddingFailed(msg)) => {
2021                // The original error must survive in the message for ops triage.
2022                assert!(!msg.is_empty(), "fallback message must not be empty");
2023            }
2024            Err(FallbackReason::Cancelled) => {
2025                panic!("expected EmbeddingFailed, got Cancelled");
2026            }
2027            Err(FallbackReason::Timeout { .. }) => {
2028                panic!("expected EmbeddingFailed, got Timeout");
2029            }
2030            Err(FallbackReason::SlotExhausted) => {
2031                panic!("expected EmbeddingFailed, got SlotExhausted");
2032            }
2033            Err(FallbackReason::OAuthQuota { .. }) => {
2034                panic!("expected EmbeddingFailed, got OAuthQuota");
2035            }
2036            Err(FallbackReason::BackendMismatch { .. }) => {
2037                panic!("expected EmbeddingFailed, got BackendMismatch");
2038            }
2039            Err(FallbackReason::DimZero) => {
2040                panic!("expected EmbeddingFailed, got DimZero");
2041            }
2042            Ok(_) => {
2043                panic!("expected an error, got Ok — embedder must fail for bogus path");
2044            }
2045        }
2046    }
2047
2048    // G56: entity embed cache — unit tests
2049    #[test]
2050    fn g56_entity_cache_key_is_stable_and_distinct() {
2051        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
2052        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
2053        let k3 = entity_cache_key("codex:default", "claude-code");
2054        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
2055        assert_eq!(k1, k2, "same model+text must hash identically");
2056        assert_ne!(k1, k3, "different text must hash differently");
2057        assert_ne!(k1, k4, "different model must hash differently");
2058    }
2059
2060    #[test]
2061    fn g56_entity_embed_cache_stats_hit_rate() {
2062        let zero = EmbedCacheStats::default();
2063        assert_eq!(zero.hit_rate(), 0.0);
2064        let half = EmbedCacheStats {
2065            requested: 4,
2066            hits: 2,
2067            misses: 2,
2068        };
2069        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
2070        let all = EmbedCacheStats {
2071            requested: 7,
2072            hits: 7,
2073            misses: 0,
2074        };
2075        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
2076    }
2077
2078    #[test]
2079    fn g56_entity_embed_cache_populates_and_hits() {
2080        // Manually populate the cache: bypasses the LLM by writing a
2081        // known vector under a chosen (model, text) key, then verifies
2082        // the cache is consulted before any LLM call would happen.
2083        let cache = entity_embed_cache();
2084        let model = "test-model";
2085        let text = "sqlite-graphrag";
2086        let key = entity_cache_key(model, text);
2087        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
2088        cache.lock().insert(key, Arc::clone(&stored));
2089        let guard = cache.lock();
2090        let hit = guard.get(&key).expect("cache must return stored value");
2091        assert_eq!(hit.len(), crate::constants::embedding_dim());
2092        assert!((hit[0] - 0.42).abs() < 1e-6);
2093    }
2094
2095    #[test]
2096    fn g56_empty_texts_short_circuits_with_zero_stats() {
2097        // Cannot call embed_entity_texts_cached without an LLM on PATH,
2098        // so we only verify the empty-input contract via the stats struct.
2099        let stats = EmbedCacheStats::default();
2100        assert_eq!(stats.requested, 0);
2101        assert_eq!(stats.hits, 0);
2102        assert_eq!(stats.misses, 0);
2103        assert_eq!(stats.hit_rate(), 0.0);
2104    }
2105}
2106
2107// =============================================================================
2108// v1.0.82 (GAP-005) — embed_with_fallback tests
2109// =============================================================================
2110#[cfg(test)]
2111mod embed_with_fallback_tests {
2112    use super::*;
2113    use crate::llm::exit_code_hints::LlmBackendError;
2114
2115    #[test]
2116    fn none_backend_returns_empty_vector_without_calling_llm() {
2117        // The `None` backend short-circuits to `Ok(vec![])` without
2118        // touching the LLM at all. This is the signal the caller uses
2119        // to insert a `pending_embeddings` row.
2120        let (v, kind) = embed_via_backend(
2121            std::path::Path::new("/nonexistent"),
2122            "any text",
2123            &LlmBackendKind::None,
2124        )
2125        .expect("None backend never fails");
2126        assert!(v.is_empty());
2127        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
2128    }
2129
2130    #[test]
2131    fn empty_chain_defaults_to_codex_claude_none() {
2132        // Internal invariant: the default chain order is the v1.0.81
2133        // implicit order (codex first, then claude, then None as
2134        // graceful-degradation fallback).
2135        let defaults = [
2136            LlmBackendKind::Codex,
2137            LlmBackendKind::Claude,
2138            LlmBackendKind::None,
2139        ];
2140
2141        // ---------------------------------------------------------------
2142        // ADR-0042: as_str + reason_code unit tests
2143        // ---------------------------------------------------------------
2144
2145        #[allow(dead_code)]
2146        fn llm_backend_kind_as_str_is_stable() {
2147            assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
2148            assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
2149            assert_eq!(LlmBackendKind::None.as_str(), "none");
2150        }
2151
2152        #[allow(dead_code)]
2153        fn fallback_reason_reason_code_is_stable() {
2154            assert_eq!(
2155                FallbackReason::EmbeddingFailed("any".into()).reason_code(),
2156                "embedding_failed"
2157            );
2158            assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
2159            assert_eq!(
2160                FallbackReason::Timeout {
2161                    operation: "embed_query".into(),
2162                    duration_secs: 30
2163                }
2164                .reason_code(),
2165                "timeout"
2166            );
2167        }
2168        assert_eq!(defaults.len(), 3);
2169    }
2170
#[test]
fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
    // Regression guard for ADR-0046 / BUG-11 (v1.0.88).
    //
    // When the fallback chain is just `[None]` and `skip_on_failure` is
    // false, the call MUST fail with
    // `AppError::Embedding("no LLM backends available; fallback chain exhausted")`.
    //
    // Prior to BUG-11 the `None` tail silently produced
    // `Ok((vec![], None))`, so `remember` could persist a memory whose
    // embedding had zero dimensions (invisible to recall). The fix sends
    // chain exhaustion through `embed_via_backend_strict`, letting the
    // caller tell "chain intentionally degrades to skip"
    // (skip_on_failure=true) apart from "chain has no viable backend at
    // all" (the case exercised here).
    let models_dir = std::path::Path::new("/nonexistent-models-dir-for-gap005-test");
    let chain = vec![LlmBackendKind::None];
    let skip_on_failure = false;

    let outcome = embed_with_fallback(models_dir, "hello", &chain, skip_on_failure);
    let err = outcome.expect_err("chain of only [None] without skip_on_failure MUST abort");

    let msg = err.to_string();
    assert!(
        msg.contains("no LLM backends available"),
        "error must mention exhausted chain, got: {msg}"
    );
}
#[test]
fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
    // With skip_on_failure=true, a chain holding only `None` comes back
    // as Ok with an empty vector: the None short-circuit always
    // succeeds. Canonical contract: skip_on_failure changes nothing when
    // None is the tail, since None already provides graceful
    // degradation.
    let models_dir = std::path::Path::new("/nonexistent-models-dir-for-gap005-test");
    let chain = vec![LlmBackendKind::None];

    let outcome =
        embed_with_fallback(models_dir, "hello", &chain, true).expect("None chain is always Ok");

    assert!(outcome.0.is_empty(), "vector must be empty");
    assert_eq!(outcome.1, LlmBackendKind::None);
}
#[test]
fn llm_backend_error_no_backends_default_message() {
    // The fallback chain exhaustion error must mention `--llm-fallback`
    // in its hint so the operator knows the remediation.
    //
    // This is registered as a real `#[test]`: with only
    // `#[allow(dead_code)]` the function was never executed by the test
    // harness, so the assertion below guarded nothing.
    let e = LlmBackendError::NoBackendsAvailable;
    let h = e.hint();
    assert!(
        h.contains("--llm-fallback"),
        "hint must mention --llm-fallback, got: {h}"
    );
}
2223
#[test]
fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
    // The rendered `NonZeroExit` error must include the binary name and
    // the stderr tail, plus at least one of the signal / exit-code
    // markers.
    let failure = LlmBackendError::NonZeroExit {
        exit_code: Some(137),
        signal: Some(9),
        stdout_tail: "out".into(),
        stderr_tail: "OOM killed".into(),
        binary: "codex".into(),
        hint: "OOM".into(),
    };
    let rendered = failure.to_string();

    // Both of these substrings are mandatory.
    for needle in ["codex", "OOM killed"] {
        assert!(rendered.contains(needle));
    }

    // Either marker is sufficient; the signal form is checked first.
    let names_cause = ["signal 9", "exit 137"]
        .iter()
        .any(|marker| rendered.contains(*marker));
    assert!(names_cause);
}
2239}