1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide embedder pinned to the Claude backend. It is filled on first use by
/// `get_claude_embedder` and is never replaced afterwards.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide embedder built by `LlmEmbedding::detect_available`. It is filled on
/// first use by `get_embedder`.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Fallback tokio runtime handed out by `shared_runtime` when the caller is not
/// already running inside a tokio runtime.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Base number of chunk texts per embedding batch, calibrated for
/// `EMBED_BATCH_CALIBRATION_DIM`-dimensional vectors (scaled by `chunk_embed_batch_size`).
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base number of entity texts per embedding batch, calibrated for
/// `EMBED_BATCH_CALIBRATION_DIM`-dimensional vectors (scaled by `entity_embed_batch_size`).
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Embedding dimension at which the base batch sizes above are used unchanged.
/// Larger dimensions shrink the batch proportionally (see `adaptive_batch_for_dim`).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
80
81fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
89 let base = base.max(1);
90 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
91}
92
93pub fn chunk_embed_batch_size() -> usize {
95 let dim = crate::constants::embedding_dim();
96 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
97 tracing::debug!(
98 dim,
99 base = CHUNK_EMBED_BATCH_SIZE,
100 batch,
101 "adaptive chunk batch size (G44)"
102 );
103 batch
104}
105
106pub fn entity_embed_batch_size() -> usize {
108 let dim = crate::constants::embedding_dim();
109 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
110 tracing::debug!(
111 dim,
112 base = ENTITY_EMBED_BATCH_SIZE,
113 batch,
114 "adaptive entity batch size (G44)"
115 );
116 batch
117}
118
119pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
121 if let Some(rt) = RUNTIME.get() {
122 return Ok(rt);
123 }
124 let rt = tokio::runtime::Builder::new_multi_thread()
125 .worker_threads(2)
126 .enable_all()
127 .build()
128 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
129 let _ = RUNTIME.set(rt);
130 Ok(RUNTIME.get().expect("RUNTIME initialised above"))
131}
132
133pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
135 if let Some(e) = EMBEDDER.get() {
136 return Ok(e);
137 }
138 let backend = LlmEmbedding::detect_available()?;
139 let _ = EMBEDDER.set(Mutex::new(backend));
140 Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
141}
142
143pub fn get_claude_embedder(
148 claude_binary: Option<&Path>,
149 claude_model: Option<&str>,
150) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
151 if let Some(e) = CLAUDE_EMBEDDER.get() {
152 return Ok(e);
153 }
154 let mut builder = LlmEmbedding::with_claude_builder();
155 if let Some(b) = claude_binary {
156 builder = builder.override_binary(b.to_path_buf());
157 }
158 if let Some(m) = claude_model {
159 builder = builder.override_model(m.to_string());
160 }
161 let backend = builder.build()?;
162 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
163 Ok(CLAUDE_EMBEDDER
164 .get()
165 .expect("CLAUDE_EMBEDDER initialised above"))
166}
167
168pub fn embed_via_claude_local(
172 _models_dir: &Path,
173 text: &str,
174 claude_binary: Option<&Path>,
175 claude_model: Option<&str>,
176) -> Result<Vec<f32>, AppError> {
177 let _slot_guard = acquire_llm_slot_for_embedding()?;
178 let embedder = get_claude_embedder(claude_binary, claude_model)?;
179 embed_passage(embedder, text)
180}
181
182pub fn embed_via_claude_local_resolved(
187 _models_dir: &Path,
188 text: &str,
189 claude_binary: Option<&Path>,
190 claude_model: Option<&str>,
191) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
192 let _slot_guard = acquire_llm_slot_for_embedding()?;
193 let embedder = get_claude_embedder(claude_binary, claude_model)?;
194 let v = embed_passage(embedder, text)?;
195 Ok((v, LlmBackendKind::Claude))
196}
197fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
200 embedder.lock().clone()
201}
202
203pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
207 let client = clone_client(embedder);
208 let result = client.embed_passage(text)?;
209 validate_dim(result)
210}
211
212pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
216 let client = clone_client(embedder);
217 let result = client.embed_query(text)?;
218 validate_dim(result)
219}
220
221pub fn embed_passages_controlled(
226 embedder: &Mutex<LlmEmbedding>,
227 texts: &[&str],
228 _token_counts: &[usize],
229) -> Result<Vec<Vec<f32>>, AppError> {
230 if texts.is_empty() {
231 return Ok(Vec::new());
232 }
233 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
234 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
235}
236
237pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
238 let _slot_guard = acquire_llm_slot_for_embedding()?;
239 let embedder = get_embedder(models_dir)?;
240 embed_passage(embedder, text)
241}
242
243pub fn embed_passage_local_resolved(
249 models_dir: &Path,
250 text: &str,
251) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
252 let _slot_guard = acquire_llm_slot_for_embedding()?;
253 let embedder = get_embedder(models_dir)?;
254 let v = embed_passage(embedder, text)?;
255 let kind = match embedder.lock().flavour() {
256 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
257 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
258 };
259 Ok((v, kind))
260}
261
262pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
263 let _slot_guard = acquire_llm_slot_for_embedding()?;
264 let embedder = get_embedder(models_dir)?;
265 embed_query(embedder, text)
266}
267
268pub fn embed_passage_with_choice(
285 models_dir: &Path,
286 text: &str,
287 choice: Option<crate::cli::LlmBackendChoice>,
288) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
289 let _slot_guard = acquire_llm_slot_for_embedding()?;
290 match choice {
291 None => {
292 let embedder = get_embedder(models_dir)?;
293 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
294 }
295 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
296 }
297}
298pub fn try_embed_query_with_choice(
304 models_dir: &Path,
305 text: &str,
306 choice: Option<crate::cli::LlmBackendChoice>,
307) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
308 match embed_passage_with_choice(models_dir, text, choice) {
309 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
322 Ok((v, backend)) => Ok((v, backend)),
323 Err(e) => Err(classify_embedding_error(e)),
324 }
325}
326fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
338 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
339 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
340 .ok()
341 .and_then(|s| s.parse::<u32>().ok())
342 .filter(|n| *n >= 1)
343 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
344 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
345 0
346 } else {
347 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
348 .ok()
349 .and_then(|s| s.parse::<u64>().ok())
350 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
351 };
352 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
360 Ok(guard) => Ok(guard),
361 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
362 "slot exhausted: {e} (fall back to FTS5)"
363 ))),
364 Err(e) => Err(e),
365 }
366}
/// Coarse category of an embedding failure, derived from its error message text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// The message mentions "oauth".
    OAuth,
    /// The message mentions "quota".
    Quota,
    /// The message mentions "slot exhausted".
    SlotExhausted,
    /// The message mentions "backend mismatch".
    BackendMismatch,
    /// The message mentions both "dim" and "zero".
    ZeroDimension,
    /// None of the rules above matched.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies `msg` case-insensitively.
    ///
    /// Rules are tried in declaration order and the first rule whose needles are all
    /// present wins. For example, a message mentioning both OAuth and quota is `OAuth`.
    pub fn classify(msg: &str) -> Self {
        let lowered = msg.to_lowercase();
        let rules: [(&[&str], Self); 5] = [
            (&["oauth"], Self::OAuth),
            (&["quota"], Self::Quota),
            (&["slot exhausted"], Self::SlotExhausted),
            (&["backend mismatch"], Self::BackendMismatch),
            (&["dim", "zero"], Self::ZeroDimension),
        ];
        rules
            .iter()
            .find(|(needles, _)| needles.iter().all(|needle| lowered.contains(*needle)))
            .map_or(Self::Unknown, |&(_, kind)| kind)
    }

    /// Stable kebab-case identifier for this kind, suitable for logs and JSON output.
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    /// Writes the same text as [`EmbeddingErrorKind::code`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = self.code();
        f.write_str(code)
    }
}
438
/// Why an embedding attempt failed, in a form that callers can branch on when
/// deciding how to fall back (for example to FTS5).
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Any failure not covered by a more specific variant; carries the message.
    EmbeddingFailed(String),
    /// No LLM slot could be acquired.
    SlotExhausted,
    /// OAuth or usage-quota failure attributed to `backend`.
    OAuthQuota { backend: &'static str },
    /// The backend that ran differs from the one requested.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The embedding came back empty / zero-dimensional.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// The embedding exceeded its time budget.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case identifier for this reason, suitable for logs and JSON output.
    pub fn reason_code(&self) -> &'static str {
        use FallbackReason as R;
        match self {
            R::EmbeddingFailed(_) => "embedding_failed",
            R::SlotExhausted => "slot_exhausted",
            R::OAuthQuota { .. } => "oauth_quota",
            R::BackendMismatch { .. } => "backend_mismatch",
            R::DimZero => "dim_zero",
            R::Cancelled => "cancelled",
            R::Timeout { .. } => "timeout",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    /// Human-readable description of the failure.
    ///
    /// NOTE(review): the `SlotExhausted` text is a fixed literal. It does not reflect
    /// the configured slot limit or wait budget.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use FallbackReason as R;
        match self {
            R::EmbeddingFailed(detail) => write!(f, "embedding failed: {detail}"),
            R::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            R::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            R::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            R::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            R::Cancelled => f.write_str("embedding cancelled by external signal"),
            R::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
535
536pub fn try_embed_query_with_fallback(
544 models_dir: &Path,
545 query: &str,
546) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
547 match embed_query_local(models_dir, query) {
548 Ok(v) => Ok((v, LlmBackendKind::None)),
549 Err(e) => Err(classify_embedding_error(e)),
550 }
551}
552
553pub fn try_embed_query_with_deterministic_fallback(
562 models_dir: &Path,
563 query: &str,
564 choice: Option<crate::cli::LlmBackendChoice>,
565) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
566 match try_embed_query_with_choice(models_dir, query, choice) {
567 Ok(t) => Ok(t),
568 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
569 let alt = match backend {
570 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
571 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
572 _ => None,
573 };
574 if let Some(alt_choice) = alt {
575 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
576 } else {
577 Err(reason)
578 }
579 }
580 Err(reason @ FallbackReason::SlotExhausted) => {
581 std::thread::sleep(std::time::Duration::from_millis(750));
582 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
583 }
584 Err(other) => Err(other),
585 }
586}
587
588pub fn classify_embedding_error(err: AppError) -> FallbackReason {
596 match err {
597 AppError::Timeout {
598 operation,
599 duration_secs,
600 } => FallbackReason::Timeout {
601 operation,
602 duration_secs,
603 },
604 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
605 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
614 EmbeddingErrorKind::OAuth => {
615 let backend = if msg.contains("codex") {
616 "codex"
617 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
618 "claude"
623 } else {
624 "unknown"
625 };
626 FallbackReason::OAuthQuota { backend }
627 }
628 EmbeddingErrorKind::Quota => {
629 let backend = if msg.contains("codex") {
630 "codex"
631 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
632 "claude"
633 } else {
634 "unknown"
635 };
636 FallbackReason::OAuthQuota { backend }
637 }
638 EmbeddingErrorKind::BackendMismatch => {
639 let (requested, resolved) =
644 if msg.contains("requested claude") && msg.contains("but codex") {
645 ("claude", "codex")
646 } else if msg.contains("requested codex") && msg.contains("but claude") {
647 ("codex", "claude")
648 } else if msg.contains("requested claude") {
649 ("claude", "unknown")
650 } else if msg.contains("requested codex") {
651 ("codex", "unknown")
652 } else {
653 ("unknown", "unknown")
654 };
655 FallbackReason::BackendMismatch {
656 requested,
657 resolved,
658 }
659 }
660 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
661 EmbeddingErrorKind::Unknown => {
662 if msg.contains("cancelled") {
663 FallbackReason::Cancelled
664 } else {
665 FallbackReason::EmbeddingFailed(msg)
666 }
667 }
668 },
669 e => FallbackReason::EmbeddingFailed(e.to_string()),
670 }
671}
672pub fn embed_with_fallback(
691 models_dir: &Path,
692 text: &str,
693 chain: &[LlmBackendKind],
694 skip_on_failure: bool,
695) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
696 use crate::llm::exit_code_hints::LlmBackendError;
697 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
698 vec![
699 LlmBackendKind::Codex,
700 LlmBackendKind::Claude,
701 LlmBackendKind::None,
702 ]
703 } else {
704 chain.to_vec()
705 };
706
707 let mut last_err: Option<AppError> = None;
708 for backend in &effective {
709 match embed_via_backend_strict(
720 models_dir,
721 text,
722 backend,
723 last_err.as_ref(),
724 skip_on_failure,
725 ) {
726 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
727 Err(e) => {
728 tracing::warn!(
729 target: "embedding",
730 backend = ?backend,
731 error = %e,
732 "embed_with_fallback: backend failed, trying next"
733 );
734 last_err = Some(e);
735 }
736 }
737 }
738 if skip_on_failure {
739 return Ok((Vec::new(), LlmBackendKind::None));
744 }
745 Err(last_err
746 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
747}
748
/// Embedding backend requested by, or reported back to, the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// Codex-flavoured backend.
    Codex,
    /// Claude-flavoured backend.
    Claude,
    /// No specific LLM backend (the sentinel / auto-detected path).
    None,
}

impl LlmBackendKind {
    /// Stable lowercase name: `"codex"`, `"claude"` or `"none"`.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::None => "none",
        }
    }
}
773
774pub fn embed_via_backend(
789 models_dir: &Path,
790 text: &str,
791 backend: &LlmBackendKind,
792) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
793 match backend {
794 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
795 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
796 LlmBackendKind::Claude => {
797 tracing::debug!(
801 target: "embedder",
802 backend = "claude",
803 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
804 );
805 embed_via_claude_local_resolved(models_dir, text, None, None)
806 }
807 }
808}
809
810pub fn embed_via_backend_strict(
823 models_dir: &Path,
824 text: &str,
825 backend: &LlmBackendKind,
826 last_err: Option<&AppError>,
827 skip_on_failure: bool,
828) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
829 use crate::llm::exit_code_hints::LlmBackendError;
830 match backend {
831 LlmBackendKind::None => {
832 if skip_on_failure && last_err.is_none() {
836 Ok((Vec::new(), LlmBackendKind::None))
837 } else if last_err.is_some() {
838 Err(match last_err {
842 Some(e) => AppError::Embedding(format!("{e}")),
843 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
844 })
845 } else {
846 Err(AppError::Embedding(
849 LlmBackendError::NoBackendsAvailable.to_string(),
850 ))
851 }
852 }
853 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
854 LlmBackendKind::Claude => {
855 tracing::debug!(
856 target: "embedder",
857 backend = "claude",
858 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
859 );
860 embed_via_claude_local_resolved(models_dir, text, None, None)
861 }
862 }
863}
864
865pub fn embed_via_backend_legacy(
870 models_dir: &Path,
871 text: &str,
872 backend: &LlmBackendKind,
873) -> Result<Vec<f32>, AppError> {
874 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
875}
876
877pub fn embed_passages_controlled_local(
878 models_dir: &Path,
879 texts: &[&str],
880 token_counts: &[usize],
881) -> Result<Vec<Vec<f32>>, AppError> {
882 let embedder = get_embedder(models_dir)?;
883 embed_passages_controlled(embedder, texts, token_counts)
884}
885
886pub fn embed_passages_parallel_local(
889 models_dir: &Path,
890 texts: &[String],
891 parallelism: usize,
892 batch_size: usize,
893) -> Result<Vec<Vec<f32>>, AppError> {
894 let embedder = get_embedder(models_dir)?;
895 embed_texts_parallel(embedder, texts, parallelism, batch_size)
896}
897
/// Entity-embedding cache map: `entity_cache_key(model, text)` maps to a shared vector.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Process-wide entity-embedding cache, filled by `embed_entity_texts_cached`.
// NOTE(review): no eviction or size bound is visible in this module; confirm memory
// growth is acceptable for long-running processes.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
912
913fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
914 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
915}
916
917fn entity_cache_key(model: &str, text: &str) -> u64 {
918 let mut hasher = blake3::Hasher::new();
919 hasher.update(model.as_bytes());
920 hasher.update(b"\0");
921 hasher.update(text.as_bytes());
922 let h = hasher.finalize();
923 let bytes = h.as_bytes();
924 u64::from_le_bytes([
925 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
926 ])
927}
928
929pub fn embed_entity_texts_cached(
939 models_dir: &Path,
940 texts: &[String],
941 parallelism: usize,
942) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
943 if texts.is_empty() {
944 return Ok((Vec::new(), EmbedCacheStats::default()));
945 }
946 let embedder = get_embedder(models_dir)?;
947 let model = embedder.lock().model_label();
948 let cache = entity_embed_cache();
949 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
950 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
951 {
952 let guard = cache.lock();
953 for (i, text) in texts.iter().enumerate() {
954 let key = entity_cache_key(&model, text);
955 if let Some(v) = guard.get(&key) {
956 hits[i] = Some(Arc::clone(v));
957 } else {
958 miss_indices.push(i);
959 }
960 }
961 }
962 let miss_count = miss_indices.len();
963 if miss_count > 0 {
964 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
965 let miss_vecs = embed_texts_parallel(
966 embedder,
967 &miss_texts,
968 parallelism,
969 entity_embed_batch_size(),
970 )?;
971 let mut guard = cache.lock();
972 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
973 let vec = Arc::new(miss_vecs[slot].clone());
974 let key = entity_cache_key(&model, &texts[orig_idx]);
975 guard.insert(key, Arc::clone(&vec));
976 hits[orig_idx] = Some(vec);
977 }
978 }
979 let mut out = Vec::with_capacity(texts.len());
980 for hit in hits.into_iter() {
981 let v = hit.ok_or_else(|| {
982 AppError::Embedding("entity embed cache produced null result".to_string())
983 })?;
984 out.push((*v).clone());
985 }
986 Ok((
987 out,
988 EmbedCacheStats {
989 requested: texts.len(),
990 hits: texts.len() - miss_count,
991 misses: miss_count,
992 },
993 ))
994}
995
996#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
998pub struct EmbedCacheStats {
999 pub requested: usize,
1000 pub hits: usize,
1001 pub misses: usize,
1002}
1003
1004impl EmbedCacheStats {
1005 pub fn hit_rate(&self) -> f64 {
1007 if self.requested == 0 {
1008 0.0
1009 } else {
1010 self.hits as f64 / self.requested as f64
1011 }
1012 }
1013}
1014
1015pub fn embed_texts_parallel(
1028 embedder: &Mutex<LlmEmbedding>,
1029 texts: &[String],
1030 parallelism: usize,
1031 batch_size: usize,
1032) -> Result<Vec<Vec<f32>>, AppError> {
1033 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1034 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1035 slots[idx] = Some(v.to_vec());
1036 Ok(())
1037 })?;
1038 let mut out = Vec::with_capacity(slots.len());
1039 for (idx, slot) in slots.into_iter().enumerate() {
1040 out.push(slot.ok_or_else(|| {
1041 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1042 })?);
1043 }
1044 Ok(out)
1045}
1046
1047pub fn embed_texts_parallel_with(
1051 embedder: &Mutex<LlmEmbedding>,
1052 texts: &[String],
1053 parallelism: usize,
1054 batch_size: usize,
1055 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1056) -> Result<(), AppError> {
1057 if texts.is_empty() {
1058 return Ok(());
1059 }
1060 let dim = crate::constants::embedding_dim();
1061 if texts.len() == 1 {
1062 let v = embed_passage(embedder, &texts[0])?;
1063 return on_result(0, &v);
1064 }
1065
1066 let client = clone_client(embedder);
1067 let permits = effective_permits(parallelism);
1068 let batches = build_batches(texts, batch_size.max(1));
1069 let token = crate::cancel_token().clone();
1070
1071 let work = move |batch: Vec<(usize, String)>| {
1072 let client = client.clone();
1073 async move {
1074 client
1075 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1076 .await
1077 }
1078 };
1079
1080 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1081 match tokio::runtime::Handle::try_current() {
1082 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1083 Err(_) => shared_runtime()?.block_on(fan_out),
1084 }
1085}
1086
/// Splits `texts` into consecutive batches of at most `batch_size` items.
///
/// Each item is paired with its index in `texts`, so results can be re-ordered after
/// a parallel fan-out. A `batch_size` of 0 is treated as 1, because `slice::chunks`
/// panics on 0. Each text is cloned exactly once.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let batch_size = batch_size.max(1);
    texts
        .chunks(batch_size)
        .enumerate()
        .map(|(batch_idx, chunk)| {
            let first = batch_idx * batch_size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (first + offset, text.clone()))
                .collect()
        })
        .collect()
}
1098
1099pub fn effective_permits(requested: usize) -> usize {
1104 let cpus = std::thread::available_parallelism()
1105 .map(|n| n.get())
1106 .unwrap_or(4);
1107 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1108 / crate::constants::LLM_WORKER_RSS_MB)
1109 .max(1) as usize;
1110 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1111}
1112
/// Runs `work` over `batches` with at most `permits` batches executing at once, and
/// delivers every embedded item to `on_result` on the calling task.
///
/// Every batch is spawned up front onto a `JoinSet`. Each task:
/// 1. waits for a semaphore permit;
/// 2. runs `work(batch)`, raced against `token` when `crate::should_obey_shutdown()`
///    is true;
/// 3. sends the outcome over a bounded channel.
///
/// The receiving loop:
/// * checks that every returned vector has exactly `dim` floats before calling
///   `on_result(idx, &vector)`. Items earlier in the same batch may already have
///   been delivered when a later one fails;
/// * records the first error (dimension mismatch, `on_result` error, or a task's
///   `Err`) and calls `set.shutdown()` to abort the remaining tasks;
/// * once the channel closes, joins what is left of the `JoinSet` so that task
///   panics are reported.
///
/// # Errors
/// Returns the first error observed. Later failures are only counted in the final
/// `debug!` summary.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    // Bounded channel: finished batches wait for the consumer instead of piling up.
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // The permit is bound to `_permit`, so it is held until this task ends
            // (including the final send). The else branch handles acquisition failure.
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            let outcome = if crate::should_obey_shutdown() {
                // Abandon the in-flight batch as soon as the shutdown token fires.
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // A failed send is ignored; the outcome is simply dropped.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` ends once every task's clone is gone.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                completed += 1;
                // After the first error, successful batches are counted but their items discarded.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Errors whose message contains "cancelled" are tallied apart from real failures.
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // Join whatever tasks remain. A panicking task never sends on `tx`, so its panic
    // only becomes visible here; non-panic join errors are counted as cancellations.
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                cancelled += 1;
            }
        }
    }

    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1272
/// Serialises `v` as consecutive little-endian `f32` values, 4 bytes per element.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(v.len() * 4);
    bytes.extend(v.iter().flat_map(|x| x.to_le_bytes()));
    bytes
}
1279}
1280
1281pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
1282 let mut out = Vec::with_capacity(bytes.len() / 4);
1283 for chunk in bytes.chunks_exact(4) {
1284 out.push(f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]));
1285 }
1286 out
1287}
1288
1289pub fn embedding_dim() -> usize {
1292 crate::constants::embedding_dim()
1293}
1294
1295fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1299 let dim = crate::constants::embedding_dim();
1300 if v.len() != dim {
1301 return Err(AppError::Embedding(format!(
1302 "embedding has {} dims, expected {dim}; \
1303 refusing to truncate or pad silently (G42/C5)",
1304 v.len()
1305 )));
1306 }
1307 Ok(v)
1308}
1309
1310#[cfg(test)]
1311mod tests {
1312 use super::*;
1313 use std::sync::atomic::{AtomicUsize, Ordering};
1314
1315 #[test]
1316 fn f32_to_bytes_roundtrip() {
1317 let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1318 let bytes = f32_to_bytes(&input);
1319 assert_eq!(bytes.len(), input.len() * 4);
1320 let out = bytes_to_f32(&bytes);
1321 assert_eq!(out, input);
1322 }
1323
1324 #[test]
1325 fn validate_dim_rejects_divergent_vectors() {
1326 let dim = crate::constants::embedding_dim();
1329 let long = vec![0.0; dim + 10];
1330 assert!(validate_dim(long).is_err(), "longer vector must error");
1331 let short = vec![0.0; dim.saturating_sub(1).max(1)];
1332 assert!(validate_dim(short).is_err(), "shorter vector must error");
1333 let exact = vec![0.0; dim];
1334 assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1335 }
1336
1337 #[test]
1338 fn embedding_dim_matches_constants_source() {
1339 assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1340 }
1341
1342 #[test]
1343 fn build_batches_preserves_global_indices() {
1344 let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1345 let batches = build_batches(&texts, 4);
1346 assert_eq!(batches.len(), 3);
1347 assert_eq!(batches[0].len(), 4);
1348 assert_eq!(batches[2].len(), 2);
1349 assert_eq!(batches[2][1].0, 9);
1350 assert_eq!(batches[2][1].1, "t9");
1351 }
1352
1353 #[test]
1354 fn effective_permits_clamps_to_bounds() {
1355 assert!(effective_permits(0) >= 1);
1356 assert!(effective_permits(1000) <= 32);
1357 }
1358
1359 fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1360 (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1361 }
1362
1363 fn dummy_vec(dim: usize) -> Vec<f32> {
1364 vec![0.0; dim]
1365 }
1366
1367 #[test]
1370 fn concurrency_peak_never_exceeds_permits() {
1371 let permits = 4usize;
1372 let batches = test_batches(permits * 10);
1373 let dim = crate::constants::embedding_dim();
1374 let current = Arc::new(AtomicUsize::new(0));
1375 let peak = Arc::new(AtomicUsize::new(0));
1376
1377 let current_c = Arc::clone(¤t);
1378 let peak_c = Arc::clone(&peak);
1379 let work = move |batch: Vec<(usize, String)>| {
1380 let current = Arc::clone(¤t_c);
1381 let peak = Arc::clone(&peak_c);
1382 async move {
1383 let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1384 peak.fetch_max(now, Ordering::SeqCst);
1385 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1386 current.fetch_sub(1, Ordering::SeqCst);
1387 Ok(batch
1388 .into_iter()
1389 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1390 .collect())
1391 }
1392 };
1393
1394 let mut delivered = 0usize;
1395 let rt = tokio::runtime::Builder::new_multi_thread()
1396 .worker_threads(4)
1397 .enable_all()
1398 .build()
1399 .expect("test runtime");
1400 rt.block_on(run_bounded(
1401 batches,
1402 permits,
1403 dim,
1404 CancellationToken::new(),
1405 work,
1406 &mut |_idx, _v| {
1407 delivered += 1;
1408 Ok(())
1409 },
1410 ))
1411 .expect("fan-out must succeed");
1412
1413 assert_eq!(delivered, permits * 10, "every item must be delivered");
1414 assert!(
1415 peak.load(Ordering::SeqCst) <= permits,
1416 "peak concurrency {} exceeded permits {permits}",
1417 peak.load(Ordering::SeqCst)
1418 );
1419 }
1420
1421 #[test]
1424 fn panicking_task_returns_permit_and_surfaces_error() {
1425 let permits = 2usize;
1426 let batches = test_batches(4);
1427 let dim = crate::constants::embedding_dim();
1428
1429 let work = move |batch: Vec<(usize, String)>| async move {
1430 if batch[0].0 == 1 {
1431 panic!("intentional test panic");
1432 }
1433 Ok(batch
1434 .into_iter()
1435 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1436 .collect())
1437 };
1438
1439 let rt = tokio::runtime::Builder::new_multi_thread()
1440 .worker_threads(2)
1441 .enable_all()
1442 .build()
1443 .expect("test runtime");
1444 let result = rt.block_on(run_bounded(
1445 batches,
1446 permits,
1447 dim,
1448 CancellationToken::new(),
1449 work,
1450 &mut |_idx, _v| Ok(()),
1451 ));
1452
1453 let err = result.expect_err("panic must surface as an error");
1454 assert!(
1455 err.to_string().contains("panicked"),
1456 "error must mention the panic: {err}"
1457 );
1458 }
1459
1460 #[test]
1463 fn cancellation_terminates_fan_out_quickly() {
1464 let permits = 2usize;
1465 let batches = test_batches(8);
1466 let dim = crate::constants::embedding_dim();
1467 let token = CancellationToken::new();
1468
1469 let work = move |batch: Vec<(usize, String)>| async move {
1470 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1472 Ok(batch
1473 .into_iter()
1474 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1475 .collect())
1476 };
1477
1478 let rt = tokio::runtime::Builder::new_multi_thread()
1479 .worker_threads(2)
1480 .enable_all()
1481 .build()
1482 .expect("test runtime");
1483 let cancel = token.clone();
1484 let start = std::time::Instant::now();
1485 let result = rt.block_on(async move {
1486 tokio::spawn(async move {
1487 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1488 cancel.cancel();
1489 });
1490 run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1491 });
1492
1493 assert!(result.is_err(), "cancelled fan-out must report an error");
1494 assert!(
1495 start.elapsed() < std::time::Duration::from_secs(10),
1496 "graceful shutdown must finish well under the work duration"
1497 );
1498 }
1499
1500 #[test]
1503 fn fan_out_rejects_divergent_dim() {
1504 let permits = 2usize;
1505 let batches = test_batches(2);
1506 let dim = crate::constants::embedding_dim();
1507
1508 let work = move |batch: Vec<(usize, String)>| async move {
1509 Ok(batch
1510 .into_iter()
1511 .map(|(i, _)| (i, vec![0.0f32; 3]))
1512 .collect::<Vec<(usize, Vec<f32>)>>())
1513 };
1514
1515 let rt = tokio::runtime::Builder::new_multi_thread()
1516 .worker_threads(2)
1517 .enable_all()
1518 .build()
1519 .expect("test runtime");
1520 let result = rt.block_on(run_bounded(
1521 batches,
1522 permits,
1523 dim,
1524 CancellationToken::new(),
1525 work,
1526 &mut |_idx, _v| Ok(()),
1527 ));
1528
1529 let err = result.expect_err("divergent dim must fail the fan-out");
1530 assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1531 }
1532
1533 #[test]
1535 fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1536 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1537 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1538 }
1539
1540 #[test]
1542 fn adaptive_batch_dim384_shrinks() {
1543 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1544 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1545 }
1546
1547 #[test]
1549 fn adaptive_batch_intermediate_dims() {
1550 assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1551 assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1552 }
1553
1554 #[test]
1556 fn adaptive_batch_small_dim_clamps_to_base() {
1557 assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1558 }
1559
1560 #[test]
1562 fn adaptive_batch_total_function() {
1563 assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1564 assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1565 assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1566 }
1567
1568 #[test]
1570 #[serial_test::serial(env)]
1571 fn adaptive_wrappers_follow_env_dim() {
1572 std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1573 let chunk = chunk_embed_batch_size();
1574 let entity = entity_embed_batch_size();
1575 std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1576 crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1577 assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1578 assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1579 }
1580
1581 #[test]
1589 fn embedding_error_kind_classify_oauth_message() {
1590 assert_eq!(
1591 EmbeddingErrorKind::classify("OAuth token expired for claude"),
1592 EmbeddingErrorKind::OAuth,
1593 );
1594 assert_eq!(
1595 EmbeddingErrorKind::classify("oauth authentication failed"),
1596 EmbeddingErrorKind::OAuth,
1597 );
1598 }
1599
1600 #[test]
1603 fn embedding_error_kind_classify_quota_message() {
1604 assert_eq!(
1605 EmbeddingErrorKind::classify("quota exhausted on backend"),
1606 EmbeddingErrorKind::Quota,
1607 );
1608 assert_eq!(
1609 EmbeddingErrorKind::classify("Usage quota limit reached"),
1610 EmbeddingErrorKind::Quota,
1611 );
1612 }
1613
1614 #[test]
1618 fn embedding_error_kind_classify_slot_exhausted_message() {
1619 assert_eq!(
1620 EmbeddingErrorKind::classify(
1621 "slot exhausted: failed to acquire LLM slot after backoff"
1622 ),
1623 EmbeddingErrorKind::SlotExhausted,
1624 );
1625 }
1626
1627 #[test]
1630 fn embedding_error_kind_classify_zero_dimension_message() {
1631 assert_eq!(
1632 EmbeddingErrorKind::classify("embedding returned dim=zero"),
1633 EmbeddingErrorKind::ZeroDimension,
1634 );
1635 assert_eq!(
1636 EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1637 EmbeddingErrorKind::ZeroDimension,
1638 );
1639 }
1640
1641 #[test]
1645 fn embedding_error_kind_classify_unknown_fallback() {
1646 assert_eq!(
1647 EmbeddingErrorKind::classify("unrelated subprocess error"),
1648 EmbeddingErrorKind::Unknown,
1649 );
1650 assert_eq!(
1651 EmbeddingErrorKind::classify("rate limit hit"),
1652 EmbeddingErrorKind::Unknown,
1653 );
1654 assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1656 assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1657 assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1658 assert_eq!(
1659 EmbeddingErrorKind::BackendMismatch.code(),
1660 "backend-mismatch"
1661 );
1662 assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1663 assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1664 }
1665
1666 #[test]
1668 fn fallback_reason_display_does_not_panic() {
1669 let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1670 let _ = FallbackReason::Cancelled.to_string();
1671 let _ = FallbackReason::Timeout {
1672 operation: "embed_query".into(),
1673 duration_secs: 30,
1674 }
1675 .to_string();
1676 }
1677
1678 #[test]
1681 fn fallback_reason_is_partial_eq() {
1682 assert_eq!(
1683 FallbackReason::EmbeddingFailed("a".into()),
1684 FallbackReason::EmbeddingFailed("a".into())
1685 );
1686 assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1687 assert_ne!(
1688 FallbackReason::EmbeddingFailed("a".into()),
1689 FallbackReason::EmbeddingFailed("b".into())
1690 );
1691 assert_ne!(
1692 FallbackReason::Cancelled,
1693 FallbackReason::Timeout {
1694 operation: "x".into(),
1695 duration_secs: 1
1696 }
1697 );
1698 }
1699
1700 #[test]
1703 fn fallback_reason_timeout_preserves_fields() {
1704 let r = FallbackReason::Timeout {
1705 operation: "embed_query_local".into(),
1706 duration_secs: 300,
1707 };
1708 match r {
1709 FallbackReason::Timeout {
1710 operation,
1711 duration_secs,
1712 } => {
1713 assert_eq!(operation, "embed_query_local");
1714 assert_eq!(duration_secs, 300);
1715 }
1716 other => panic!("expected Timeout, got {other:?}"),
1717 }
1718 }
1719
1720 #[test]
1726 #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1727 fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1728 let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1731 let result = try_embed_query_with_fallback(bogus, "hello world");
1732 match result {
1733 Err(FallbackReason::EmbeddingFailed(msg)) => {
1734 assert!(!msg.is_empty(), "fallback message must not be empty");
1736 }
1737 Err(FallbackReason::Cancelled) => {
1738 panic!("expected EmbeddingFailed, got Cancelled");
1739 }
1740 Err(FallbackReason::Timeout { .. }) => {
1741 panic!("expected EmbeddingFailed, got Timeout");
1742 }
1743 Err(FallbackReason::SlotExhausted) => {
1744 panic!("expected EmbeddingFailed, got SlotExhausted");
1745 }
1746 Err(FallbackReason::OAuthQuota { .. }) => {
1747 panic!("expected EmbeddingFailed, got OAuthQuota");
1748 }
1749 Err(FallbackReason::BackendMismatch { .. }) => {
1750 panic!("expected EmbeddingFailed, got BackendMismatch");
1751 }
1752 Err(FallbackReason::DimZero) => {
1753 panic!("expected EmbeddingFailed, got DimZero");
1754 }
1755 Ok(_) => {
1756 panic!("expected an error, got Ok — embedder must fail for bogus path");
1757 }
1758 }
1759 }
1760
1761 #[test]
1763 fn g56_entity_cache_key_is_stable_and_distinct() {
1764 let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1765 let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1766 let k3 = entity_cache_key("codex:default", "claude-code");
1767 let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1768 assert_eq!(k1, k2, "same model+text must hash identically");
1769 assert_ne!(k1, k3, "different text must hash differently");
1770 assert_ne!(k1, k4, "different model must hash differently");
1771 }
1772
1773 #[test]
1774 fn g56_entity_embed_cache_stats_hit_rate() {
1775 let zero = EmbedCacheStats::default();
1776 assert_eq!(zero.hit_rate(), 0.0);
1777 let half = EmbedCacheStats {
1778 requested: 4,
1779 hits: 2,
1780 misses: 2,
1781 };
1782 assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1783 let all = EmbedCacheStats {
1784 requested: 7,
1785 hits: 7,
1786 misses: 0,
1787 };
1788 assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1789 }
1790
1791 #[test]
1792 fn g56_entity_embed_cache_populates_and_hits() {
1793 let cache = entity_embed_cache();
1797 let model = "test-model";
1798 let text = "sqlite-graphrag";
1799 let key = entity_cache_key(model, text);
1800 let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1801 cache.lock().insert(key, Arc::clone(&stored));
1802 let guard = cache.lock();
1803 let hit = guard.get(&key).expect("cache must return stored value");
1804 assert_eq!(hit.len(), crate::constants::embedding_dim());
1805 assert!((hit[0] - 0.42).abs() < 1e-6);
1806 }
1807
1808 #[test]
1809 fn g56_empty_texts_short_circuits_with_zero_stats() {
1810 let stats = EmbedCacheStats::default();
1813 assert_eq!(stats.requested, 0);
1814 assert_eq!(stats.hits, 0);
1815 assert_eq!(stats.misses, 0);
1816 assert_eq!(stats.hit_rate(), 0.0);
1817 }
1818}
1819
#[cfg(test)]
/// Tests for per-backend embedding (`embed_via_backend`), the LLM fallback
/// chain (`embed_with_fallback`), and the stable string identifiers and error
/// messages of the backend types.
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    #[test]
    /// The `None` backend yields an empty vector and reports itself as the
    /// backend used, even when given a models dir that does not exist.
    fn none_backend_returns_empty_vector_without_calling_llm() {
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    #[test]
    /// Pins the expected default chain order: Codex, then Claude, then None.
    ///
    /// NOTE(review): this only inspects a local literal and never reads the
    /// production default chain. TODO point it at the real default source.
    fn empty_chain_defaults_to_codex_claude_none() {
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];
        assert_eq!(defaults.len(), 3);
    }

    // The two tests below were previously nested inside the test above as
    // `#[allow(dead_code)]` fns, so their assertions never ran.
    #[test]
    /// `LlmBackendKind::as_str` returns stable lowercase identifiers.
    fn llm_backend_kind_as_str_is_stable() {
        assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
        assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
        assert_eq!(LlmBackendKind::None.as_str(), "none");
    }

    #[test]
    /// `FallbackReason::reason_code` returns stable snake_case identifiers.
    fn fallback_reason_reason_code_is_stable() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("any".into()).reason_code(),
            "embedding_failed"
        );
        assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
        assert_eq!(
            FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30
            }
            .reason_code(),
            "timeout"
        );
    }

    #[test]
    /// A chain containing only `None` must abort with a "no LLM backends
    /// available" error when `skip_on_failure` is false.
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        let chain = vec![LlmBackendKind::None];
        let err = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = format!("{err}");
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }

    #[test]
    /// With `skip_on_failure` set, the same `[None]` chain returns `Ok` with an
    /// empty vector, attributed to the `None` backend.
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    #[test]
    /// The hint for `NoBackendsAvailable` must point the user at
    /// `--llm-fallback`. This was previously `#[allow(dead_code)]` and never
    /// ran.
    fn llm_backend_error_no_backends_default_message() {
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    #[test]
    /// The `Display` output of `NonZeroExit` includes the binary name, the
    /// stderr tail, and either the signal or the exit code.
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}