1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide embedder pinned to the Claude backend; populated lazily by
/// `get_claude_embedder`.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide embedder pinned to the opencode backend; populated lazily by
/// `get_opencode_embedder`.
static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide auto-detected embedder (`LlmEmbedding::detect_available`);
/// populated lazily by `get_embedder`.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Fallback multi-thread Tokio runtime (2 worker threads) used by
/// `shared_runtime` when embedding is driven from outside any runtime.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Base batch size for chunk (passage) embedding at
/// `EMBED_BATCH_CALIBRATION_DIM`; scaled down for larger dimensions by
/// `chunk_embed_batch_size`.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base batch size for entity embedding at `EMBED_BATCH_CALIBRATION_DIM`;
/// scaled down for larger dimensions by `entity_embed_batch_size`.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Embedding dimension at which the base batch sizes apply unscaled
/// (see `adaptive_batch_for_dim`).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
81
82fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
90 let base = base.max(1);
91 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
92}
93
94pub fn chunk_embed_batch_size() -> usize {
96 let dim = crate::constants::embedding_dim();
97 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
98 tracing::debug!(
99 dim,
100 base = CHUNK_EMBED_BATCH_SIZE,
101 batch,
102 "adaptive chunk batch size (G44)"
103 );
104 batch
105}
106
107pub fn entity_embed_batch_size() -> usize {
109 let dim = crate::constants::embedding_dim();
110 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
111 tracing::debug!(
112 dim,
113 base = ENTITY_EMBED_BATCH_SIZE,
114 batch,
115 "adaptive entity batch size (G44)"
116 );
117 batch
118}
119
120pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
122 if let Some(rt) = RUNTIME.get() {
123 return Ok(rt);
124 }
125 let rt = tokio::runtime::Builder::new_multi_thread()
126 .worker_threads(2)
127 .enable_all()
128 .build()
129 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
130 let _ = RUNTIME.set(rt);
131 Ok(RUNTIME.get().expect("RUNTIME initialised above"))
132}
133
134pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
136 if let Some(e) = EMBEDDER.get() {
137 return Ok(e);
138 }
139 let backend = LlmEmbedding::detect_available()?;
140 let _ = EMBEDDER.set(Mutex::new(backend));
141 Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
142}
143
144pub fn get_claude_embedder(
149 claude_binary: Option<&Path>,
150 claude_model: Option<&str>,
151) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
152 if let Some(e) = CLAUDE_EMBEDDER.get() {
153 return Ok(e);
154 }
155 let mut builder = LlmEmbedding::with_claude_builder();
156 if let Some(b) = claude_binary {
157 builder = builder.override_binary(b.to_path_buf());
158 }
159 if let Some(m) = claude_model {
160 builder = builder.override_model(m.to_string());
161 }
162 let backend = builder.build()?;
163 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
164 Ok(CLAUDE_EMBEDDER
165 .get()
166 .expect("CLAUDE_EMBEDDER initialised above"))
167}
168
169pub fn get_opencode_embedder(
174 opencode_binary: Option<&Path>,
175 opencode_model: Option<&str>,
176) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
177 if let Some(e) = OPENCODE_EMBEDDER.get() {
178 return Ok(e);
179 }
180 let mut builder = LlmEmbedding::with_opencode_builder();
181 if let Some(b) = opencode_binary {
182 builder = builder.override_binary(b.to_path_buf());
183 }
184 if let Some(m) = opencode_model {
185 builder = builder.override_model(m.to_string());
186 }
187 let backend = builder.build()?;
188 let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
189 Ok(OPENCODE_EMBEDDER
190 .get()
191 .expect("OPENCODE_EMBEDDER initialised above"))
192}
193
194pub fn embed_via_claude_local(
198 _models_dir: &Path,
199 text: &str,
200 claude_binary: Option<&Path>,
201 claude_model: Option<&str>,
202) -> Result<Vec<f32>, AppError> {
203 let _slot_guard = acquire_llm_slot_for_embedding()?;
204 let embedder = get_claude_embedder(claude_binary, claude_model)?;
205 embed_passage(embedder, text)
206}
207
208pub fn embed_via_claude_local_resolved(
213 _models_dir: &Path,
214 text: &str,
215 claude_binary: Option<&Path>,
216 claude_model: Option<&str>,
217) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
218 let _slot_guard = acquire_llm_slot_for_embedding()?;
219 let embedder = get_claude_embedder(claude_binary, claude_model)?;
220 let v = embed_passage(embedder, text)?;
221 Ok((v, LlmBackendKind::Claude))
222}
223
224pub fn embed_via_opencode_local_resolved(
229 _models_dir: &Path,
230 text: &str,
231 opencode_binary: Option<&Path>,
232 opencode_model: Option<&str>,
233) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
234 let _slot_guard = acquire_llm_slot_for_embedding()?;
235 let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
236 let v = embed_passage(embedder, text)?;
237 Ok((v, LlmBackendKind::Opencode))
238}
239fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
242 embedder.lock().clone()
243}
244
245pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
249 let client = clone_client(embedder);
250 let result = client.embed_passage(text)?;
251 validate_dim(result)
252}
253
254pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
258 let client = clone_client(embedder);
259 let result = client.embed_query(text)?;
260 validate_dim(result)
261}
262
263pub fn embed_passages_controlled(
268 embedder: &Mutex<LlmEmbedding>,
269 texts: &[&str],
270 _token_counts: &[usize],
271) -> Result<Vec<Vec<f32>>, AppError> {
272 if texts.is_empty() {
273 return Ok(Vec::new());
274 }
275 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
276 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
277}
278
279pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
280 let _slot_guard = acquire_llm_slot_for_embedding()?;
281 let embedder = get_embedder(models_dir)?;
282 embed_passage(embedder, text)
283}
284
/// Whether embedding failures should be tolerated (persist with no embedding).
///
/// True only when `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` is exactly `"1"`
/// or `"true"` (case-sensitive); unset or any other value means false.
pub fn should_skip_embedding_on_failure() -> bool {
    std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE")
        .map(|value| value == "1" || value == "true")
        .unwrap_or(false)
}
294
295pub fn embed_passage_or_skip(
302 models_dir: &Path,
303 text: &str,
304 choice: Option<crate::cli::LlmBackendChoice>,
305) -> Result<Option<Vec<f32>>, AppError> {
306 match embed_passage_with_choice(models_dir, text, choice) {
307 Ok((v, _backend)) => Ok(Some(v)),
308 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
309 Err(e) => {
310 if should_skip_embedding_on_failure() {
311 tracing::warn!(
312 error = %e,
313 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
314 );
315 Ok(None)
316 } else {
317 Err(e)
318 }
319 }
320 }
321}
322
323pub fn embed_passage_local_resolved(
329 models_dir: &Path,
330 text: &str,
331) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
332 let _slot_guard = acquire_llm_slot_for_embedding()?;
333 let embedder = get_embedder(models_dir)?;
334 let v = embed_passage(embedder, text)?;
335 let kind = match embedder.lock().flavour() {
336 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
337 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
338 crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
339 };
340 Ok((v, kind))
341}
342
343pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
344 let _slot_guard = acquire_llm_slot_for_embedding()?;
345 let embedder = get_embedder(models_dir)?;
346 embed_query(embedder, text)
347}
348
349pub fn embed_passage_with_choice(
366 models_dir: &Path,
367 text: &str,
368 choice: Option<crate::cli::LlmBackendChoice>,
369) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
370 let _slot_guard = acquire_llm_slot_for_embedding()?;
371 match choice {
372 None => {
373 let embedder = get_embedder(models_dir)?;
374 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
375 }
376 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
377 }
378}
379pub fn try_embed_query_with_choice(
385 models_dir: &Path,
386 text: &str,
387 choice: Option<crate::cli::LlmBackendChoice>,
388) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
389 match embed_passage_with_choice(models_dir, text, choice) {
390 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
403 Ok((v, backend)) => Ok((v, backend)),
404 Err(e) => Err(classify_embedding_error(e)),
405 }
406}
407fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
419 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
420 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
421 .ok()
422 .and_then(|s| s.parse::<u32>().ok())
423 .filter(|n| *n >= 1)
424 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
425 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
426 0
427 } else {
428 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
429 .ok()
430 .and_then(|s| s.parse::<u64>().ok())
431 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
432 };
433 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
441 Ok(guard) => Ok(guard),
442 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
443 "slot exhausted: {e} (fall back to FTS5)"
444 ))),
445 Err(e) => Err(e),
446 }
447}
/// Coarse category of an embedding failure, derived from its error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// Message mentions OAuth.
    OAuth,
    /// Message mentions a quota (and not OAuth).
    Quota,
    /// Message reports that no LLM slot could be acquired.
    SlotExhausted,
    /// Message reports a backend mismatch.
    BackendMismatch,
    /// Message mentions both "dim" and "zero".
    ZeroDimension,
    /// No rule matched.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classify `msg` by case-insensitive substring search.
    ///
    /// Rules are tried in order and the first whose needles all occur wins:
    /// `"oauth"`, `"quota"`, `"slot exhausted"`, `"backend mismatch"`, then
    /// `"dim"` together with `"zero"`. Otherwise `Unknown`.
    pub fn classify(msg: &str) -> Self {
        const RULES: [(&[&str], EmbeddingErrorKind); 5] = [
            (&["oauth"], EmbeddingErrorKind::OAuth),
            (&["quota"], EmbeddingErrorKind::Quota),
            (&["slot exhausted"], EmbeddingErrorKind::SlotExhausted),
            (&["backend mismatch"], EmbeddingErrorKind::BackendMismatch),
            (&["dim", "zero"], EmbeddingErrorKind::ZeroDimension),
        ];
        let lowered = msg.to_lowercase();
        RULES
            .iter()
            .find(|(needles, _)| needles.iter().all(|needle| lowered.contains(*needle)))
            .map_or(EmbeddingErrorKind::Unknown, |&(_, kind)| kind)
    }

    /// Stable kebab-case identifier for this kind.
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    /// Writes `self.code()` verbatim (formatter width/fill flags are not applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = self.code();
        f.write_str(code)
    }
}
519
/// Why an embedding attempt failed, in a form callers can branch on.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Any failure not covered by a more specific variant; carries the message.
    EmbeddingFailed(String),
    /// No LLM slot could be acquired.
    SlotExhausted,
    /// OAuth or quota failure on the named backend (`"unknown"` if undetected).
    OAuthQuota { backend: &'static str },
    /// A different backend was invoked than the one requested.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The backend returned an empty vector.
    DimZero,
    /// The attempt was cancelled.
    Cancelled,
    /// The operation exceeded its time budget.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case identifier for this reason.
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FallbackReason::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            // NOTE(review): "max=8" and "750ms" are fixed text; the real slot
            // limit comes from SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY or
            // default_max_concurrency(), so this message may not match.
            FallbackReason::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            FallbackReason::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            FallbackReason::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            FallbackReason::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            FallbackReason::Cancelled => f.write_str("embedding cancelled by external signal"),
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
616
617pub fn try_embed_query_with_fallback(
625 models_dir: &Path,
626 query: &str,
627) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
628 match embed_query_local(models_dir, query) {
629 Ok(v) => Ok((v, LlmBackendKind::None)),
630 Err(e) => Err(classify_embedding_error(e)),
631 }
632}
633
634pub fn try_embed_query_with_deterministic_fallback(
643 models_dir: &Path,
644 query: &str,
645 choice: Option<crate::cli::LlmBackendChoice>,
646) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
647 match try_embed_query_with_choice(models_dir, query, choice) {
648 Ok(t) => Ok(t),
649 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
650 let alt = match backend {
651 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
652 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
653 "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
654 _ => None,
655 };
656 if let Some(alt_choice) = alt {
657 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
658 } else {
659 Err(reason)
660 }
661 }
662 Err(reason @ FallbackReason::SlotExhausted) => {
663 std::thread::sleep(std::time::Duration::from_millis(750));
664 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
665 }
666 Err(other) => Err(other),
667 }
668}
669
670pub fn classify_embedding_error(err: AppError) -> FallbackReason {
678 match err {
679 AppError::Timeout {
680 operation,
681 duration_secs,
682 } => FallbackReason::Timeout {
683 operation,
684 duration_secs,
685 },
686 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
687 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
696 EmbeddingErrorKind::OAuth => {
697 let backend = if msg.contains("codex") {
698 "codex"
699 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
700 "claude"
705 } else if msg.contains("opencode") {
706 "opencode"
707 } else {
708 "unknown"
709 };
710 FallbackReason::OAuthQuota { backend }
711 }
712 EmbeddingErrorKind::Quota => {
713 let backend = if msg.contains("codex") {
714 "codex"
715 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
716 "claude"
717 } else if msg.contains("opencode") {
718 "opencode"
719 } else {
720 "unknown"
721 };
722 FallbackReason::OAuthQuota { backend }
723 }
724 EmbeddingErrorKind::BackendMismatch => {
725 let (requested, resolved) =
730 if msg.contains("requested claude") && msg.contains("but codex") {
731 ("claude", "codex")
732 } else if msg.contains("requested codex") && msg.contains("but claude") {
733 ("codex", "claude")
734 } else if msg.contains("requested claude") {
735 ("claude", "unknown")
736 } else if msg.contains("requested codex") {
737 ("codex", "unknown")
738 } else {
739 ("unknown", "unknown")
740 };
741 FallbackReason::BackendMismatch {
742 requested,
743 resolved,
744 }
745 }
746 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
747 EmbeddingErrorKind::Unknown => {
748 if msg.contains("cancelled") {
749 FallbackReason::Cancelled
750 } else {
751 FallbackReason::EmbeddingFailed(msg)
752 }
753 }
754 },
755 e => FallbackReason::EmbeddingFailed(e.to_string()),
756 }
757}
758pub fn embed_with_fallback(
777 models_dir: &Path,
778 text: &str,
779 chain: &[LlmBackendKind],
780 skip_on_failure: bool,
781) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
782 use crate::llm::exit_code_hints::LlmBackendError;
783 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
784 vec![
785 LlmBackendKind::Codex,
786 LlmBackendKind::Claude,
787 LlmBackendKind::Opencode,
788 LlmBackendKind::None,
789 ]
790 } else {
791 chain.to_vec()
792 };
793
794 let mut last_err: Option<AppError> = None;
795 for backend in &effective {
796 match embed_via_backend_strict(
807 models_dir,
808 text,
809 backend,
810 last_err.as_ref(),
811 skip_on_failure,
812 ) {
813 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
814 Err(e) => {
815 tracing::warn!(
816 target: "embedding",
817 backend = ?backend,
818 error = %e,
819 "embed_with_fallback: backend failed, trying next"
820 );
821 last_err = Some(e);
822 }
823 }
824 }
825 if skip_on_failure {
826 return Ok((Vec::new(), LlmBackendKind::None));
831 }
832 Err(last_err
833 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
834}
835
/// Embedding backend that produced (or is asked to produce) a vector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// Codex backend.
    Codex,
    /// Claude backend.
    Claude,
    /// opencode backend.
    Opencode,
    /// No concrete backend; also used as the "no embedding" sentinel in chains.
    None,
}

impl LlmBackendKind {
    /// Lower-case backend name used in logs and error reasons.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::None => "none",
        }
    }
}
863
864pub fn embed_via_backend(
879 models_dir: &Path,
880 text: &str,
881 backend: &LlmBackendKind,
882) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
883 match backend {
884 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
885 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
886 LlmBackendKind::Claude => {
887 tracing::debug!(
891 target: "embedder",
892 backend = "claude",
893 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
894 );
895 embed_via_claude_local_resolved(models_dir, text, None, None)
896 }
897 LlmBackendKind::Opencode => {
898 tracing::debug!(
899 target: "embedder",
900 backend = "opencode",
901 "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
902 );
903 embed_via_opencode_local_resolved(models_dir, text, None, None)
904 }
905 }
906}
907
908pub fn embed_via_backend_strict(
921 models_dir: &Path,
922 text: &str,
923 backend: &LlmBackendKind,
924 last_err: Option<&AppError>,
925 skip_on_failure: bool,
926) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
927 use crate::llm::exit_code_hints::LlmBackendError;
928 match backend {
929 LlmBackendKind::None => {
930 if skip_on_failure && last_err.is_none() {
934 Ok((Vec::new(), LlmBackendKind::None))
935 } else if last_err.is_some() {
936 Err(match last_err {
940 Some(e) => AppError::Embedding(format!("{e}")),
941 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
942 })
943 } else {
944 Err(AppError::Embedding(
947 LlmBackendError::NoBackendsAvailable.to_string(),
948 ))
949 }
950 }
951 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
952 LlmBackendKind::Claude => {
953 tracing::debug!(
954 target: "embedder",
955 backend = "claude",
956 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
957 );
958 embed_via_claude_local_resolved(models_dir, text, None, None)
959 }
960 LlmBackendKind::Opencode => {
961 tracing::debug!(
962 target: "embedder",
963 backend = "opencode",
964 "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
965 );
966 embed_via_opencode_local_resolved(models_dir, text, None, None)
967 }
968 }
969}
970
971pub fn embed_via_backend_legacy(
976 models_dir: &Path,
977 text: &str,
978 backend: &LlmBackendKind,
979) -> Result<Vec<f32>, AppError> {
980 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
981}
982
983pub fn embed_passages_controlled_local(
984 models_dir: &Path,
985 texts: &[&str],
986 token_counts: &[usize],
987) -> Result<Vec<Vec<f32>>, AppError> {
988 let embedder = get_embedder(models_dir)?;
989 embed_passages_controlled(embedder, texts, token_counts)
990}
991
992pub fn embed_passages_parallel_local(
995 models_dir: &Path,
996 texts: &[String],
997 parallelism: usize,
998 batch_size: usize,
999) -> Result<Vec<Vec<f32>>, AppError> {
1000 let embedder = get_embedder(models_dir)?;
1001 embed_texts_parallel(embedder, texts, parallelism, batch_size)
1002}
1003
1004type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;
1016
1017static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1018
1019fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1020 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1021}
1022
1023fn entity_cache_key(model: &str, text: &str) -> u64 {
1024 let mut hasher = blake3::Hasher::new();
1025 hasher.update(model.as_bytes());
1026 hasher.update(b"\0");
1027 hasher.update(text.as_bytes());
1028 let h = hasher.finalize();
1029 let bytes = h.as_bytes();
1030 u64::from_le_bytes([
1031 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1032 ])
1033}
1034
1035pub fn embed_entity_texts_cached(
1045 models_dir: &Path,
1046 texts: &[String],
1047 parallelism: usize,
1048) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1049 if texts.is_empty() {
1050 return Ok((Vec::new(), EmbedCacheStats::default()));
1051 }
1052 let embedder = get_embedder(models_dir)?;
1053 let model = embedder.lock().model_label();
1054 let cache = entity_embed_cache();
1055 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1056 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1057 {
1058 let guard = cache.lock();
1059 for (i, text) in texts.iter().enumerate() {
1060 let key = entity_cache_key(&model, text);
1061 if let Some(v) = guard.get(&key) {
1062 hits[i] = Some(Arc::clone(v));
1063 } else {
1064 miss_indices.push(i);
1065 }
1066 }
1067 }
1068 let miss_count = miss_indices.len();
1069 if miss_count > 0 {
1070 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1071 let miss_vecs = embed_texts_parallel(
1072 embedder,
1073 &miss_texts,
1074 parallelism,
1075 entity_embed_batch_size(),
1076 )?;
1077 let mut guard = cache.lock();
1078 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1079 let vec = Arc::new(miss_vecs[slot].clone());
1080 let key = entity_cache_key(&model, &texts[orig_idx]);
1081 guard.insert(key, Arc::clone(&vec));
1082 hits[orig_idx] = Some(vec);
1083 }
1084 }
1085 let mut out = Vec::with_capacity(texts.len());
1086 for hit in hits.into_iter() {
1087 let v = hit.ok_or_else(|| {
1088 AppError::Embedding("entity embed cache produced null result".to_string())
1089 })?;
1090 out.push((*v).clone());
1091 }
1092 Ok((
1093 out,
1094 EmbedCacheStats {
1095 requested: texts.len(),
1096 hits: texts.len() - miss_count,
1097 misses: miss_count,
1098 },
1099 ))
1100}
1101
1102#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1104pub struct EmbedCacheStats {
1105 pub requested: usize,
1106 pub hits: usize,
1107 pub misses: usize,
1108}
1109
1110impl EmbedCacheStats {
1111 pub fn hit_rate(&self) -> f64 {
1113 if self.requested == 0 {
1114 0.0
1115 } else {
1116 self.hits as f64 / self.requested as f64
1117 }
1118 }
1119}
1120
1121pub fn embed_texts_parallel(
1134 embedder: &Mutex<LlmEmbedding>,
1135 texts: &[String],
1136 parallelism: usize,
1137 batch_size: usize,
1138) -> Result<Vec<Vec<f32>>, AppError> {
1139 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1140 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1141 slots[idx] = Some(v.to_vec());
1142 Ok(())
1143 })?;
1144 let mut out = Vec::with_capacity(slots.len());
1145 for (idx, slot) in slots.into_iter().enumerate() {
1146 out.push(slot.ok_or_else(|| {
1147 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1148 })?);
1149 }
1150 Ok(out)
1151}
1152
1153pub fn embed_texts_parallel_with(
1157 embedder: &Mutex<LlmEmbedding>,
1158 texts: &[String],
1159 parallelism: usize,
1160 batch_size: usize,
1161 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1162) -> Result<(), AppError> {
1163 if texts.is_empty() {
1164 return Ok(());
1165 }
1166 let dim = crate::constants::embedding_dim();
1167 if texts.len() == 1 {
1168 let v = embed_passage(embedder, &texts[0])?;
1169 return on_result(0, &v);
1170 }
1171
1172 let client = clone_client(embedder);
1173 let permits = effective_permits(parallelism);
1174 let batches = build_batches(texts, batch_size.max(1));
1175 let token = crate::cancel_token().clone();
1176
1177 let work = move |batch: Vec<(usize, String)>| {
1178 let client = client.clone();
1179 async move {
1180 client
1181 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1182 .await
1183 }
1184 };
1185
1186 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1187 match tokio::runtime::Handle::try_current() {
1188 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1189 Err(_) => shared_runtime()?.block_on(fan_out),
1190 }
1191}
1192
/// Split `texts` into consecutive batches of at most `batch_size` items.
///
/// Each item is paired with its index in `texts`, so results can be routed
/// back after out-of-order completion. Every text is cloned exactly once; the
/// previous version cloned each text twice. A `batch_size` of 0 is treated as
/// 1 instead of panicking in `slice::chunks`.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(chunk_no, chunk)| {
            let offset = chunk_no * size;
            chunk
                .iter()
                .enumerate()
                .map(|(i, text)| (offset + i, text.clone()))
                .collect()
        })
        .collect()
}
1204
1205pub fn effective_permits(requested: usize) -> usize {
1210 let cpus = std::thread::available_parallelism()
1211 .map(|n| n.get())
1212 .unwrap_or(4);
1213 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1214 / crate::constants::LLM_WORKER_RSS_MB)
1215 .max(1) as usize;
1216 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1217}
1218
/// Run `work` over every batch with at most `permits` batches in flight, and
/// hand each returned `(index, vector)` pair to `on_result`.
///
/// Every batch is spawned onto a `JoinSet` up front, and each task waits on a
/// shared `Semaphore` before calling `work`. Results come back over a bounded
/// `mpsc` channel (capacity `permits * 2`) and are consumed in this function's
/// receive loop. `on_result` is therefore only ever called from this task,
/// never concurrently with itself.
///
/// When `crate::should_obey_shutdown()` is true, each `work` call races
/// `token.cancelled()`. A cancellation is reported as an `AppError::Embedding`
/// whose message contains "cancelled".
///
/// # Errors
/// Returns the first error observed, in arrival order:
/// - an `Err` sent by a task (work failure, cancellation, or closed semaphore);
/// - a vector whose length differs from `dim` (never truncated or padded);
/// - an error returned by `on_result`;
/// - a task panic, surfaced after the channel has drained.
///
/// Once an error is recorded, the remaining tasks are aborted with
/// `JoinSet::shutdown` and no further items are delivered. Items from the same
/// batch that precede the failing item may already have been delivered.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    // Bounded: finished tasks block on `send` if this consumer falls behind.
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // The owned permit is held until this task ends (including on panic/abort).
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            let outcome = if crate::should_obey_shutdown() {
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // Ignored: the receiver may already be gone after an early error.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` yields `None` once every task
    // has finished or been aborted.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                completed += 1;
                // After the first error, later batches are counted but discarded.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Cancellation is recognised by message text, not by variant.
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // Reap tasks not already drained by `shutdown()`. A task that panics drops
    // its sender without sending, so panics are only visible here.
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                cancelled += 1;
            }
        }
    }

    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1378
/// Serialise a float vector as consecutive little-endian IEEE-754 `f32` words
/// (4 bytes per element).
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * std::mem::size_of::<f32>());
    v.iter()
        .for_each(|x| bytes.extend_from_slice(&x.to_le_bytes()));
    bytes
}
1386
/// Decode consecutive little-endian `f32` words.
///
/// Trailing bytes that do not form a complete 4-byte word are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(std::mem::size_of::<f32>())
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
1394
1395pub fn embedding_dim() -> usize {
1398 crate::constants::embedding_dim()
1399}
1400
1401fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1405 let dim = crate::constants::embedding_dim();
1406 if v.len() != dim {
1407 return Err(AppError::Embedding(format!(
1408 "embedding has {} dims, expected {dim}; \
1409 refusing to truncate or pad silently (G42/C5)",
1410 v.len()
1411 )));
1412 }
1413 Ok(v)
1414}
1415
1416#[cfg(test)]
1417mod tests {
1418 use super::*;
1419 use std::sync::atomic::{AtomicUsize, Ordering};
1420
1421 #[test]
1422 fn f32_to_bytes_roundtrip() {
1423 let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1424 let bytes = f32_to_bytes(&input);
1425 assert_eq!(bytes.len(), input.len() * 4);
1426 let out = bytes_to_f32(&bytes);
1427 assert_eq!(out, input);
1428 }
1429
1430 #[test]
1431 fn validate_dim_rejects_divergent_vectors() {
1432 let dim = crate::constants::embedding_dim();
1435 let long = vec![0.0; dim + 10];
1436 assert!(validate_dim(long).is_err(), "longer vector must error");
1437 let short = vec![0.0; dim.saturating_sub(1).max(1)];
1438 assert!(validate_dim(short).is_err(), "shorter vector must error");
1439 let exact = vec![0.0; dim];
1440 assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1441 }
1442
1443 #[test]
1444 fn embedding_dim_matches_constants_source() {
1445 assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1446 }
1447
1448 #[test]
1449 fn build_batches_preserves_global_indices() {
1450 let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1451 let batches = build_batches(&texts, 4);
1452 assert_eq!(batches.len(), 3);
1453 assert_eq!(batches[0].len(), 4);
1454 assert_eq!(batches[2].len(), 2);
1455 assert_eq!(batches[2][1].0, 9);
1456 assert_eq!(batches[2][1].1, "t9");
1457 }
1458
1459 #[test]
1460 fn effective_permits_clamps_to_bounds() {
1461 assert!(effective_permits(0) >= 1);
1462 assert!(effective_permits(1000) <= 32);
1463 }
1464
1465 fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1466 (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1467 }
1468
1469 fn dummy_vec(dim: usize) -> Vec<f32> {
1470 vec![0.0; dim]
1471 }
1472
1473 #[test]
1476 fn concurrency_peak_never_exceeds_permits() {
1477 let permits = 4usize;
1478 let batches = test_batches(permits * 10);
1479 let dim = crate::constants::embedding_dim();
1480 let current = Arc::new(AtomicUsize::new(0));
1481 let peak = Arc::new(AtomicUsize::new(0));
1482
1483 let current_c = Arc::clone(¤t);
1484 let peak_c = Arc::clone(&peak);
1485 let work = move |batch: Vec<(usize, String)>| {
1486 let current = Arc::clone(¤t_c);
1487 let peak = Arc::clone(&peak_c);
1488 async move {
1489 let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1490 peak.fetch_max(now, Ordering::SeqCst);
1491 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1492 current.fetch_sub(1, Ordering::SeqCst);
1493 Ok(batch
1494 .into_iter()
1495 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1496 .collect())
1497 }
1498 };
1499
1500 let mut delivered = 0usize;
1501 let rt = tokio::runtime::Builder::new_multi_thread()
1502 .worker_threads(4)
1503 .enable_all()
1504 .build()
1505 .expect("test runtime");
1506 rt.block_on(run_bounded(
1507 batches,
1508 permits,
1509 dim,
1510 CancellationToken::new(),
1511 work,
1512 &mut |_idx, _v| {
1513 delivered += 1;
1514 Ok(())
1515 },
1516 ))
1517 .expect("fan-out must succeed");
1518
1519 assert_eq!(delivered, permits * 10, "every item must be delivered");
1520 assert!(
1521 peak.load(Ordering::SeqCst) <= permits,
1522 "peak concurrency {} exceeded permits {permits}",
1523 peak.load(Ordering::SeqCst)
1524 );
1525 }
1526
1527 #[test]
1530 fn panicking_task_returns_permit_and_surfaces_error() {
1531 let permits = 2usize;
1532 let batches = test_batches(4);
1533 let dim = crate::constants::embedding_dim();
1534
1535 let work = move |batch: Vec<(usize, String)>| async move {
1536 if batch[0].0 == 1 {
1537 panic!("intentional test panic");
1538 }
1539 Ok(batch
1540 .into_iter()
1541 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1542 .collect())
1543 };
1544
1545 let rt = tokio::runtime::Builder::new_multi_thread()
1546 .worker_threads(2)
1547 .enable_all()
1548 .build()
1549 .expect("test runtime");
1550 let result = rt.block_on(run_bounded(
1551 batches,
1552 permits,
1553 dim,
1554 CancellationToken::new(),
1555 work,
1556 &mut |_idx, _v| Ok(()),
1557 ));
1558
1559 let err = result.expect_err("panic must surface as an error");
1560 assert!(
1561 err.to_string().contains("panicked"),
1562 "error must mention the panic: {err}"
1563 );
1564 }
1565
1566 #[test]
1569 fn cancellation_terminates_fan_out_quickly() {
1570 let permits = 2usize;
1571 let batches = test_batches(8);
1572 let dim = crate::constants::embedding_dim();
1573 let token = CancellationToken::new();
1574
1575 let work = move |batch: Vec<(usize, String)>| async move {
1576 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1578 Ok(batch
1579 .into_iter()
1580 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1581 .collect())
1582 };
1583
1584 let rt = tokio::runtime::Builder::new_multi_thread()
1585 .worker_threads(2)
1586 .enable_all()
1587 .build()
1588 .expect("test runtime");
1589 let cancel = token.clone();
1590 let start = std::time::Instant::now();
1591 let result = rt.block_on(async move {
1592 tokio::spawn(async move {
1593 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1594 cancel.cancel();
1595 });
1596 run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1597 });
1598
1599 assert!(result.is_err(), "cancelled fan-out must report an error");
1600 assert!(
1601 start.elapsed() < std::time::Duration::from_secs(10),
1602 "graceful shutdown must finish well under the work duration"
1603 );
1604 }
1605
1606 #[test]
1609 fn fan_out_rejects_divergent_dim() {
1610 let permits = 2usize;
1611 let batches = test_batches(2);
1612 let dim = crate::constants::embedding_dim();
1613
1614 let work = move |batch: Vec<(usize, String)>| async move {
1615 Ok(batch
1616 .into_iter()
1617 .map(|(i, _)| (i, vec![0.0f32; 3]))
1618 .collect::<Vec<(usize, Vec<f32>)>>())
1619 };
1620
1621 let rt = tokio::runtime::Builder::new_multi_thread()
1622 .worker_threads(2)
1623 .enable_all()
1624 .build()
1625 .expect("test runtime");
1626 let result = rt.block_on(run_bounded(
1627 batches,
1628 permits,
1629 dim,
1630 CancellationToken::new(),
1631 work,
1632 &mut |_idx, _v| Ok(()),
1633 ));
1634
1635 let err = result.expect_err("divergent dim must fail the fan-out");
1636 assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1637 }
1638
1639 #[test]
1641 fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1642 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1643 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1644 }
1645
1646 #[test]
1648 fn adaptive_batch_dim384_shrinks() {
1649 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1650 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1651 }
1652
1653 #[test]
1655 fn adaptive_batch_intermediate_dims() {
1656 assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1657 assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1658 }
1659
1660 #[test]
1662 fn adaptive_batch_small_dim_clamps_to_base() {
1663 assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1664 }
1665
1666 #[test]
1668 fn adaptive_batch_total_function() {
1669 assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1670 assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1671 assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1672 }
1673
1674 #[test]
1676 #[serial_test::serial(env)]
1677 fn adaptive_wrappers_follow_env_dim() {
1678 std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1679 let chunk = chunk_embed_batch_size();
1680 let entity = entity_embed_batch_size();
1681 std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1682 crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1683 assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1684 assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1685 }
1686
1687 #[test]
1695 fn embedding_error_kind_classify_oauth_message() {
1696 assert_eq!(
1697 EmbeddingErrorKind::classify("OAuth token expired for claude"),
1698 EmbeddingErrorKind::OAuth,
1699 );
1700 assert_eq!(
1701 EmbeddingErrorKind::classify("oauth authentication failed"),
1702 EmbeddingErrorKind::OAuth,
1703 );
1704 }
1705
1706 #[test]
1709 fn embedding_error_kind_classify_quota_message() {
1710 assert_eq!(
1711 EmbeddingErrorKind::classify("quota exhausted on backend"),
1712 EmbeddingErrorKind::Quota,
1713 );
1714 assert_eq!(
1715 EmbeddingErrorKind::classify("Usage quota limit reached"),
1716 EmbeddingErrorKind::Quota,
1717 );
1718 }
1719
1720 #[test]
1724 fn embedding_error_kind_classify_slot_exhausted_message() {
1725 assert_eq!(
1726 EmbeddingErrorKind::classify(
1727 "slot exhausted: failed to acquire LLM slot after backoff"
1728 ),
1729 EmbeddingErrorKind::SlotExhausted,
1730 );
1731 }
1732
1733 #[test]
1736 fn embedding_error_kind_classify_zero_dimension_message() {
1737 assert_eq!(
1738 EmbeddingErrorKind::classify("embedding returned dim=zero"),
1739 EmbeddingErrorKind::ZeroDimension,
1740 );
1741 assert_eq!(
1742 EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1743 EmbeddingErrorKind::ZeroDimension,
1744 );
1745 }
1746
1747 #[test]
1751 fn embedding_error_kind_classify_unknown_fallback() {
1752 assert_eq!(
1753 EmbeddingErrorKind::classify("unrelated subprocess error"),
1754 EmbeddingErrorKind::Unknown,
1755 );
1756 assert_eq!(
1757 EmbeddingErrorKind::classify("rate limit hit"),
1758 EmbeddingErrorKind::Unknown,
1759 );
1760 assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1762 assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1763 assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1764 assert_eq!(
1765 EmbeddingErrorKind::BackendMismatch.code(),
1766 "backend-mismatch"
1767 );
1768 assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1769 assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1770 }
1771
1772 #[test]
1774 fn fallback_reason_display_does_not_panic() {
1775 let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1776 let _ = FallbackReason::Cancelled.to_string();
1777 let _ = FallbackReason::Timeout {
1778 operation: "embed_query".into(),
1779 duration_secs: 30,
1780 }
1781 .to_string();
1782 }
1783
1784 #[test]
1787 fn fallback_reason_is_partial_eq() {
1788 assert_eq!(
1789 FallbackReason::EmbeddingFailed("a".into()),
1790 FallbackReason::EmbeddingFailed("a".into())
1791 );
1792 assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1793 assert_ne!(
1794 FallbackReason::EmbeddingFailed("a".into()),
1795 FallbackReason::EmbeddingFailed("b".into())
1796 );
1797 assert_ne!(
1798 FallbackReason::Cancelled,
1799 FallbackReason::Timeout {
1800 operation: "x".into(),
1801 duration_secs: 1
1802 }
1803 );
1804 }
1805
1806 #[test]
1809 fn fallback_reason_timeout_preserves_fields() {
1810 let r = FallbackReason::Timeout {
1811 operation: "embed_query_local".into(),
1812 duration_secs: 300,
1813 };
1814 match r {
1815 FallbackReason::Timeout {
1816 operation,
1817 duration_secs,
1818 } => {
1819 assert_eq!(operation, "embed_query_local");
1820 assert_eq!(duration_secs, 300);
1821 }
1822 other => panic!("expected Timeout, got {other:?}"),
1823 }
1824 }
1825
1826 #[test]
1832 #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1833 fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1834 let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1837 let result = try_embed_query_with_fallback(bogus, "hello world");
1838 match result {
1839 Err(FallbackReason::EmbeddingFailed(msg)) => {
1840 assert!(!msg.is_empty(), "fallback message must not be empty");
1842 }
1843 Err(FallbackReason::Cancelled) => {
1844 panic!("expected EmbeddingFailed, got Cancelled");
1845 }
1846 Err(FallbackReason::Timeout { .. }) => {
1847 panic!("expected EmbeddingFailed, got Timeout");
1848 }
1849 Err(FallbackReason::SlotExhausted) => {
1850 panic!("expected EmbeddingFailed, got SlotExhausted");
1851 }
1852 Err(FallbackReason::OAuthQuota { .. }) => {
1853 panic!("expected EmbeddingFailed, got OAuthQuota");
1854 }
1855 Err(FallbackReason::BackendMismatch { .. }) => {
1856 panic!("expected EmbeddingFailed, got BackendMismatch");
1857 }
1858 Err(FallbackReason::DimZero) => {
1859 panic!("expected EmbeddingFailed, got DimZero");
1860 }
1861 Ok(_) => {
1862 panic!("expected an error, got Ok — embedder must fail for bogus path");
1863 }
1864 }
1865 }
1866
1867 #[test]
1869 fn g56_entity_cache_key_is_stable_and_distinct() {
1870 let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1871 let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1872 let k3 = entity_cache_key("codex:default", "claude-code");
1873 let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1874 assert_eq!(k1, k2, "same model+text must hash identically");
1875 assert_ne!(k1, k3, "different text must hash differently");
1876 assert_ne!(k1, k4, "different model must hash differently");
1877 }
1878
1879 #[test]
1880 fn g56_entity_embed_cache_stats_hit_rate() {
1881 let zero = EmbedCacheStats::default();
1882 assert_eq!(zero.hit_rate(), 0.0);
1883 let half = EmbedCacheStats {
1884 requested: 4,
1885 hits: 2,
1886 misses: 2,
1887 };
1888 assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1889 let all = EmbedCacheStats {
1890 requested: 7,
1891 hits: 7,
1892 misses: 0,
1893 };
1894 assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1895 }
1896
1897 #[test]
1898 fn g56_entity_embed_cache_populates_and_hits() {
1899 let cache = entity_embed_cache();
1903 let model = "test-model";
1904 let text = "sqlite-graphrag";
1905 let key = entity_cache_key(model, text);
1906 let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1907 cache.lock().insert(key, Arc::clone(&stored));
1908 let guard = cache.lock();
1909 let hit = guard.get(&key).expect("cache must return stored value");
1910 assert_eq!(hit.len(), crate::constants::embedding_dim());
1911 assert!((hit[0] - 0.42).abs() < 1e-6);
1912 }
1913
1914 #[test]
1915 fn g56_empty_texts_short_circuits_with_zero_stats() {
1916 let stats = EmbedCacheStats::default();
1919 assert_eq!(stats.requested, 0);
1920 assert_eq!(stats.hits, 0);
1921 assert_eq!(stats.misses, 0);
1922 assert_eq!(stats.hit_rate(), 0.0);
1923 }
1924}
1925
/// Tests for the LLM backend fallback chain (`embed_via_backend`,
/// `embed_with_fallback`), the string codes exposed by `LlmBackendKind` and
/// `FallbackReason`, and `LlmBackendError` formatting/hints.
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    /// The `None` backend must succeed with an empty vector, even for a
    /// nonexistent models dir, and report `None` as the backend used.
    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    /// Documents the expected default order: Codex, then Claude, then None.
    /// NOTE(review): this only checks the length of a locally built array and
    /// does not read the production default chain. TODO: assert against the
    /// real default-chain source once it is reachable from tests.
    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];
        assert_eq!(defaults.len(), 3);
    }

    /// Pins the string codes returned by `LlmBackendKind::as_str`.
    #[test]
    fn llm_backend_kind_as_str_is_stable() {
        assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
        assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
        assert_eq!(LlmBackendKind::None.as_str(), "none");
    }

    /// Pins the string codes returned by `FallbackReason::reason_code`.
    #[test]
    fn fallback_reason_reason_code_is_stable() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("any".into()).reason_code(),
            "embedding_failed"
        );
        assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
        assert_eq!(
            FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30
            }
            .reason_code(),
            "timeout"
        );
    }

    /// A chain containing only `None`, without `skip_on_failure`, must abort
    /// with an error reporting that no LLM backends are available.
    #[test]
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        let chain = vec![LlmBackendKind::None];
        let err = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = format!("{err}");
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }

    /// With `skip_on_failure`, a `None`-only chain succeeds with an empty
    /// vector attributed to the `None` backend.
    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    /// The hint for `NoBackendsAvailable` must point the user at `--llm-fallback`.
    #[test]
    fn llm_backend_error_no_backends_default_message() {
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    /// A `NonZeroExit` error's `Display` must name the binary, include the
    /// stderr tail, and mention either the signal or the exit code.
    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}