1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide Claude-backed embedder, built on first use by [`get_claude_embedder`].
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide embedder chosen by `LlmEmbedding::detect_available`, built on
/// first use by [`get_embedder`].
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Lazily built Tokio runtime handed out by [`shared_runtime`].
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();

/// Base number of chunk texts per embedding batch, calibrated for
/// [`EMBED_BATCH_CALIBRATION_DIM`]; scaled at runtime by [`chunk_embed_batch_size`].
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base number of entity texts per embedding batch, calibrated for
/// [`EMBED_BATCH_CALIBRATION_DIM`]; scaled at runtime by [`entity_embed_batch_size`].
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Embedding dimension at which the base batch sizes apply unchanged; larger
/// dimensions shrink the batch proportionally (see `adaptive_batch_for_dim`).
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
80
81fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
89 let base = base.max(1);
90 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
91}
92
93pub fn chunk_embed_batch_size() -> usize {
95 let dim = crate::constants::embedding_dim();
96 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
97 tracing::debug!(
98 dim,
99 base = CHUNK_EMBED_BATCH_SIZE,
100 batch,
101 "adaptive chunk batch size (G44)"
102 );
103 batch
104}
105
106pub fn entity_embed_batch_size() -> usize {
108 let dim = crate::constants::embedding_dim();
109 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
110 tracing::debug!(
111 dim,
112 base = ENTITY_EMBED_BATCH_SIZE,
113 batch,
114 "adaptive entity batch size (G44)"
115 );
116 batch
117}
118
119pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
121 if let Some(rt) = RUNTIME.get() {
122 return Ok(rt);
123 }
124 let rt = tokio::runtime::Builder::new_multi_thread()
125 .worker_threads(2)
126 .enable_all()
127 .build()
128 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
129 let _ = RUNTIME.set(rt);
130 Ok(RUNTIME.get().expect("RUNTIME initialised above"))
131}
132
133pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
135 if let Some(e) = EMBEDDER.get() {
136 return Ok(e);
137 }
138 let backend = LlmEmbedding::detect_available()?;
139 let _ = EMBEDDER.set(Mutex::new(backend));
140 Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
141}
142
143pub fn get_claude_embedder(
148 claude_binary: Option<&Path>,
149 claude_model: Option<&str>,
150) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
151 if let Some(e) = CLAUDE_EMBEDDER.get() {
152 return Ok(e);
153 }
154 let mut builder = LlmEmbedding::with_claude_builder();
155 if let Some(b) = claude_binary {
156 builder = builder.override_binary(b.to_path_buf());
157 }
158 if let Some(m) = claude_model {
159 builder = builder.override_model(m.to_string());
160 }
161 let backend = builder.build()?;
162 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
163 Ok(CLAUDE_EMBEDDER
164 .get()
165 .expect("CLAUDE_EMBEDDER initialised above"))
166}
167
168pub fn embed_via_claude_local(
172 _models_dir: &Path,
173 text: &str,
174 claude_binary: Option<&Path>,
175 claude_model: Option<&str>,
176) -> Result<Vec<f32>, AppError> {
177 let _slot_guard = acquire_llm_slot_for_embedding()?;
178 let embedder = get_claude_embedder(claude_binary, claude_model)?;
179 embed_passage(embedder, text)
180}
181
182pub fn embed_via_claude_local_resolved(
187 _models_dir: &Path,
188 text: &str,
189 claude_binary: Option<&Path>,
190 claude_model: Option<&str>,
191) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
192 let _slot_guard = acquire_llm_slot_for_embedding()?;
193 let embedder = get_claude_embedder(claude_binary, claude_model)?;
194 let v = embed_passage(embedder, text)?;
195 Ok((v, LlmBackendKind::Claude))
196}
197fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
200 embedder.lock().clone()
201}
202
203pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
207 let client = clone_client(embedder);
208 let result = client.embed_passage(text)?;
209 validate_dim(result)
210}
211
212pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
216 let client = clone_client(embedder);
217 let result = client.embed_query(text)?;
218 validate_dim(result)
219}
220
221pub fn embed_passages_controlled(
226 embedder: &Mutex<LlmEmbedding>,
227 texts: &[&str],
228 _token_counts: &[usize],
229) -> Result<Vec<Vec<f32>>, AppError> {
230 if texts.is_empty() {
231 return Ok(Vec::new());
232 }
233 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
234 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
235}
236
237pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
238 let _slot_guard = acquire_llm_slot_for_embedding()?;
239 let embedder = get_embedder(models_dir)?;
240 embed_passage(embedder, text)
241}
242
/// Whether embedding failures should be tolerated by persisting a NULL
/// embedding.
///
/// True only when `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` is exactly `"1"`
/// or `"true"` (case-sensitive). An unset or non-Unicode variable means
/// false.
pub fn should_skip_embedding_on_failure() -> bool {
    std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE")
        .map(|value| value == "1" || value == "true")
        .unwrap_or(false)
}
252
253pub fn embed_passage_or_skip(
260 models_dir: &Path,
261 text: &str,
262 choice: Option<crate::cli::LlmBackendChoice>,
263) -> Result<Option<Vec<f32>>, AppError> {
264 match embed_passage_with_choice(models_dir, text, choice) {
265 Ok((v, _backend)) => Ok(Some(v)),
266 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
267 Err(e) => {
268 if should_skip_embedding_on_failure() {
269 tracing::warn!(
270 error = %e,
271 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
272 );
273 Ok(None)
274 } else {
275 Err(e)
276 }
277 }
278 }
279}
280
281pub fn embed_passage_local_resolved(
287 models_dir: &Path,
288 text: &str,
289) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
290 let _slot_guard = acquire_llm_slot_for_embedding()?;
291 let embedder = get_embedder(models_dir)?;
292 let v = embed_passage(embedder, text)?;
293 let kind = match embedder.lock().flavour() {
294 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
295 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
296 };
297 Ok((v, kind))
298}
299
300pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
301 let _slot_guard = acquire_llm_slot_for_embedding()?;
302 let embedder = get_embedder(models_dir)?;
303 embed_query(embedder, text)
304}
305
306pub fn embed_passage_with_choice(
323 models_dir: &Path,
324 text: &str,
325 choice: Option<crate::cli::LlmBackendChoice>,
326) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
327 let _slot_guard = acquire_llm_slot_for_embedding()?;
328 match choice {
329 None => {
330 let embedder = get_embedder(models_dir)?;
331 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
332 }
333 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
334 }
335}
336pub fn try_embed_query_with_choice(
342 models_dir: &Path,
343 text: &str,
344 choice: Option<crate::cli::LlmBackendChoice>,
345) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
346 match embed_passage_with_choice(models_dir, text, choice) {
347 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
360 Ok((v, backend)) => Ok((v, backend)),
361 Err(e) => Err(classify_embedding_error(e)),
362 }
363}
364fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
376 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
377 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
378 .ok()
379 .and_then(|s| s.parse::<u32>().ok())
380 .filter(|n| *n >= 1)
381 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
382 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
383 0
384 } else {
385 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
386 .ok()
387 .and_then(|s| s.parse::<u64>().ok())
388 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
389 };
390 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
398 Ok(guard) => Ok(guard),
399 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
400 "slot exhausted: {e} (fall back to FTS5)"
401 ))),
402 Err(e) => Err(e),
403 }
404}
/// Coarse category of an embedding failure, derived from its error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// The message mentions `oauth`.
    OAuth,
    /// The message mentions `quota`.
    Quota,
    /// The message mentions `slot exhausted`.
    SlotExhausted,
    /// The message mentions `backend mismatch`.
    BackendMismatch,
    /// The message mentions both `dim` and `zero`.
    ZeroDimension,
    /// No known marker matched.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies `msg` by case-insensitive substring matching.
    ///
    /// Markers are checked in priority order (`oauth`, `quota`,
    /// `slot exhausted`, `backend mismatch`, then `dim` together with `zero`).
    /// The first match wins; anything else is [`Self::Unknown`].
    pub fn classify(msg: &str) -> Self {
        const SINGLE_MARKERS: [(&str, EmbeddingErrorKind); 4] = [
            ("oauth", EmbeddingErrorKind::OAuth),
            ("quota", EmbeddingErrorKind::Quota),
            ("slot exhausted", EmbeddingErrorKind::SlotExhausted),
            ("backend mismatch", EmbeddingErrorKind::BackendMismatch),
        ];
        let lowered = msg.to_lowercase();
        SINGLE_MARKERS
            .iter()
            .find(|(needle, _)| lowered.contains(*needle))
            .map(|&(_, kind)| kind)
            .unwrap_or_else(|| {
                if lowered.contains("dim") && lowered.contains("zero") {
                    Self::ZeroDimension
                } else {
                    Self::Unknown
                }
            })
    }

    /// Stable kebab-case identifier for this kind (also its `Display` form).
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.code())
    }
}
476
/// Structured reason an embedding attempt failed, as produced by
/// [`classify_embedding_error`].
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Any failure not covered by a more specific variant; carries the message.
    EmbeddingFailed(String),
    /// No host LLM slot could be acquired.
    SlotExhausted,
    /// OAuth or usage-quota failure on the named backend (or `"unknown"`).
    OAuthQuota { backend: &'static str },
    /// A different backend was invoked than the one requested.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The backend produced an empty vector.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// The embedding exceeded its time budget.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case identifier for this reason.
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmbeddingFailed(detail) => write!(f, "embedding failed: {detail}"),
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            Self::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            Self::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
573
574pub fn try_embed_query_with_fallback(
582 models_dir: &Path,
583 query: &str,
584) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
585 match embed_query_local(models_dir, query) {
586 Ok(v) => Ok((v, LlmBackendKind::None)),
587 Err(e) => Err(classify_embedding_error(e)),
588 }
589}
590
591pub fn try_embed_query_with_deterministic_fallback(
600 models_dir: &Path,
601 query: &str,
602 choice: Option<crate::cli::LlmBackendChoice>,
603) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
604 match try_embed_query_with_choice(models_dir, query, choice) {
605 Ok(t) => Ok(t),
606 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
607 let alt = match backend {
608 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
609 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
610 _ => None,
611 };
612 if let Some(alt_choice) = alt {
613 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
614 } else {
615 Err(reason)
616 }
617 }
618 Err(reason @ FallbackReason::SlotExhausted) => {
619 std::thread::sleep(std::time::Duration::from_millis(750));
620 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
621 }
622 Err(other) => Err(other),
623 }
624}
625
626pub fn classify_embedding_error(err: AppError) -> FallbackReason {
634 match err {
635 AppError::Timeout {
636 operation,
637 duration_secs,
638 } => FallbackReason::Timeout {
639 operation,
640 duration_secs,
641 },
642 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
643 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
652 EmbeddingErrorKind::OAuth => {
653 let backend = if msg.contains("codex") {
654 "codex"
655 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
656 "claude"
661 } else {
662 "unknown"
663 };
664 FallbackReason::OAuthQuota { backend }
665 }
666 EmbeddingErrorKind::Quota => {
667 let backend = if msg.contains("codex") {
668 "codex"
669 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
670 "claude"
671 } else {
672 "unknown"
673 };
674 FallbackReason::OAuthQuota { backend }
675 }
676 EmbeddingErrorKind::BackendMismatch => {
677 let (requested, resolved) =
682 if msg.contains("requested claude") && msg.contains("but codex") {
683 ("claude", "codex")
684 } else if msg.contains("requested codex") && msg.contains("but claude") {
685 ("codex", "claude")
686 } else if msg.contains("requested claude") {
687 ("claude", "unknown")
688 } else if msg.contains("requested codex") {
689 ("codex", "unknown")
690 } else {
691 ("unknown", "unknown")
692 };
693 FallbackReason::BackendMismatch {
694 requested,
695 resolved,
696 }
697 }
698 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
699 EmbeddingErrorKind::Unknown => {
700 if msg.contains("cancelled") {
701 FallbackReason::Cancelled
702 } else {
703 FallbackReason::EmbeddingFailed(msg)
704 }
705 }
706 },
707 e => FallbackReason::EmbeddingFailed(e.to_string()),
708 }
709}
710pub fn embed_with_fallback(
729 models_dir: &Path,
730 text: &str,
731 chain: &[LlmBackendKind],
732 skip_on_failure: bool,
733) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
734 use crate::llm::exit_code_hints::LlmBackendError;
735 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
736 vec![
737 LlmBackendKind::Codex,
738 LlmBackendKind::Claude,
739 LlmBackendKind::None,
740 ]
741 } else {
742 chain.to_vec()
743 };
744
745 let mut last_err: Option<AppError> = None;
746 for backend in &effective {
747 match embed_via_backend_strict(
758 models_dir,
759 text,
760 backend,
761 last_err.as_ref(),
762 skip_on_failure,
763 ) {
764 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
765 Err(e) => {
766 tracing::warn!(
767 target: "embedding",
768 backend = ?backend,
769 error = %e,
770 "embed_with_fallback: backend failed, trying next"
771 );
772 last_err = Some(e);
773 }
774 }
775 }
776 if skip_on_failure {
777 return Ok((Vec::new(), LlmBackendKind::None));
782 }
783 Err(last_err
784 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
785}
786
/// Embedding backend requested for, or resolved by, an embedding call.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// The Codex-flavoured embedder.
    Codex,
    /// The Claude-flavoured embedder.
    Claude,
    /// No specific LLM backend (e.g. the auto-detected path or an empty result).
    None,
}

impl LlmBackendKind {
    /// Lower-case identifier: `"codex"`, `"claude"` or `"none"`.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::None => "none",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Codex => "codex",
        }
    }
}
811
812pub fn embed_via_backend(
827 models_dir: &Path,
828 text: &str,
829 backend: &LlmBackendKind,
830) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
831 match backend {
832 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
833 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
834 LlmBackendKind::Claude => {
835 tracing::debug!(
839 target: "embedder",
840 backend = "claude",
841 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
842 );
843 embed_via_claude_local_resolved(models_dir, text, None, None)
844 }
845 }
846}
847
848pub fn embed_via_backend_strict(
861 models_dir: &Path,
862 text: &str,
863 backend: &LlmBackendKind,
864 last_err: Option<&AppError>,
865 skip_on_failure: bool,
866) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
867 use crate::llm::exit_code_hints::LlmBackendError;
868 match backend {
869 LlmBackendKind::None => {
870 if skip_on_failure && last_err.is_none() {
874 Ok((Vec::new(), LlmBackendKind::None))
875 } else if last_err.is_some() {
876 Err(match last_err {
880 Some(e) => AppError::Embedding(format!("{e}")),
881 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
882 })
883 } else {
884 Err(AppError::Embedding(
887 LlmBackendError::NoBackendsAvailable.to_string(),
888 ))
889 }
890 }
891 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
892 LlmBackendKind::Claude => {
893 tracing::debug!(
894 target: "embedder",
895 backend = "claude",
896 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
897 );
898 embed_via_claude_local_resolved(models_dir, text, None, None)
899 }
900 }
901}
902
903pub fn embed_via_backend_legacy(
908 models_dir: &Path,
909 text: &str,
910 backend: &LlmBackendKind,
911) -> Result<Vec<f32>, AppError> {
912 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
913}
914
915pub fn embed_passages_controlled_local(
916 models_dir: &Path,
917 texts: &[&str],
918 token_counts: &[usize],
919) -> Result<Vec<Vec<f32>>, AppError> {
920 let embedder = get_embedder(models_dir)?;
921 embed_passages_controlled(embedder, texts, token_counts)
922}
923
924pub fn embed_passages_parallel_local(
927 models_dir: &Path,
928 texts: &[String],
929 parallelism: usize,
930 batch_size: usize,
931) -> Result<Vec<Vec<f32>>, AppError> {
932 let embedder = get_embedder(models_dir)?;
933 embed_texts_parallel(embedder, texts, parallelism, batch_size)
934}
935
936type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;
948
949static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
950
951fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
952 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
953}
954
955fn entity_cache_key(model: &str, text: &str) -> u64 {
956 let mut hasher = blake3::Hasher::new();
957 hasher.update(model.as_bytes());
958 hasher.update(b"\0");
959 hasher.update(text.as_bytes());
960 let h = hasher.finalize();
961 let bytes = h.as_bytes();
962 u64::from_le_bytes([
963 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
964 ])
965}
966
967pub fn embed_entity_texts_cached(
977 models_dir: &Path,
978 texts: &[String],
979 parallelism: usize,
980) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
981 if texts.is_empty() {
982 return Ok((Vec::new(), EmbedCacheStats::default()));
983 }
984 let embedder = get_embedder(models_dir)?;
985 let model = embedder.lock().model_label();
986 let cache = entity_embed_cache();
987 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
988 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
989 {
990 let guard = cache.lock();
991 for (i, text) in texts.iter().enumerate() {
992 let key = entity_cache_key(&model, text);
993 if let Some(v) = guard.get(&key) {
994 hits[i] = Some(Arc::clone(v));
995 } else {
996 miss_indices.push(i);
997 }
998 }
999 }
1000 let miss_count = miss_indices.len();
1001 if miss_count > 0 {
1002 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1003 let miss_vecs = embed_texts_parallel(
1004 embedder,
1005 &miss_texts,
1006 parallelism,
1007 entity_embed_batch_size(),
1008 )?;
1009 let mut guard = cache.lock();
1010 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1011 let vec = Arc::new(miss_vecs[slot].clone());
1012 let key = entity_cache_key(&model, &texts[orig_idx]);
1013 guard.insert(key, Arc::clone(&vec));
1014 hits[orig_idx] = Some(vec);
1015 }
1016 }
1017 let mut out = Vec::with_capacity(texts.len());
1018 for hit in hits.into_iter() {
1019 let v = hit.ok_or_else(|| {
1020 AppError::Embedding("entity embed cache produced null result".to_string())
1021 })?;
1022 out.push((*v).clone());
1023 }
1024 Ok((
1025 out,
1026 EmbedCacheStats {
1027 requested: texts.len(),
1028 hits: texts.len() - miss_count,
1029 misses: miss_count,
1030 },
1031 ))
1032}
1033
1034#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1036pub struct EmbedCacheStats {
1037 pub requested: usize,
1038 pub hits: usize,
1039 pub misses: usize,
1040}
1041
1042impl EmbedCacheStats {
1043 pub fn hit_rate(&self) -> f64 {
1045 if self.requested == 0 {
1046 0.0
1047 } else {
1048 self.hits as f64 / self.requested as f64
1049 }
1050 }
1051}
1052
1053pub fn embed_texts_parallel(
1066 embedder: &Mutex<LlmEmbedding>,
1067 texts: &[String],
1068 parallelism: usize,
1069 batch_size: usize,
1070) -> Result<Vec<Vec<f32>>, AppError> {
1071 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1072 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1073 slots[idx] = Some(v.to_vec());
1074 Ok(())
1075 })?;
1076 let mut out = Vec::with_capacity(slots.len());
1077 for (idx, slot) in slots.into_iter().enumerate() {
1078 out.push(slot.ok_or_else(|| {
1079 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1080 })?);
1081 }
1082 Ok(out)
1083}
1084
1085pub fn embed_texts_parallel_with(
1089 embedder: &Mutex<LlmEmbedding>,
1090 texts: &[String],
1091 parallelism: usize,
1092 batch_size: usize,
1093 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1094) -> Result<(), AppError> {
1095 if texts.is_empty() {
1096 return Ok(());
1097 }
1098 let dim = crate::constants::embedding_dim();
1099 if texts.len() == 1 {
1100 let v = embed_passage(embedder, &texts[0])?;
1101 return on_result(0, &v);
1102 }
1103
1104 let client = clone_client(embedder);
1105 let permits = effective_permits(parallelism);
1106 let batches = build_batches(texts, batch_size.max(1));
1107 let token = crate::cancel_token().clone();
1108
1109 let work = move |batch: Vec<(usize, String)>| {
1110 let client = client.clone();
1111 async move {
1112 client
1113 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1114 .await
1115 }
1116 };
1117
1118 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1119 match tokio::runtime::Handle::try_current() {
1120 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1121 Err(_) => shared_runtime()?.block_on(fan_out),
1122 }
1123}
1124
/// Splits `texts` into consecutive batches of at most `batch_size` items.
///
/// Each text is paired with its index in `texts` so results can be
/// reassembled in order. Each text is cloned exactly once. A `batch_size` of
/// 0 is treated as 1 instead of panicking.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(chunk_no, chunk)| {
            let start = chunk_no * size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (start + offset, text.clone()))
                .collect()
        })
        .collect()
}
1136
1137pub fn effective_permits(requested: usize) -> usize {
1142 let cpus = std::thread::available_parallelism()
1143 .map(|n| n.get())
1144 .unwrap_or(4);
1145 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1146 / crate::constants::LLM_WORKER_RSS_MB)
1147 .max(1) as usize;
1148 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1149}
1150
/// Runs `work` over every batch with at most `permits` batches in flight, and
/// delivers results to `on_result`.
///
/// Each batch is a spawned task that first takes a semaphore permit, then runs
/// `work`. When `crate::should_obey_shutdown()` is true, the task races
/// `work` against `token.cancelled()`. Each task sends its outcome over a
/// bounded channel (capacity `permits * 2`). This task receives the outcomes
/// in completion order, not batch order. It checks every vector's length
/// against `dim`, then calls `on_result(index, &vector)`.
///
/// The first error stops delivery: it may come from a failed batch, a
/// dimension mismatch, an `on_result` error or a task panic. Remaining tasks
/// are aborted via `JoinSet::shutdown`, and results still buffered afterwards
/// are not delivered.
///
/// # Returns
/// `Ok(())` if every received batch was delivered without error, otherwise
/// the first error.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    // Bounded channel: workers finishing faster than results are consumed wait on `send`.
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // The permit is owned by this task and released when the task ends
            // (including on panic or abort), which bounds concurrency to `permits`.
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            let outcome = if crate::should_obey_shutdown() {
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // A send error only means the receiver is gone; nothing left to report to.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` returns `None` once every task is done.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                // Counted as completed even if a dimension check below then fails.
                completed += 1;
                // After the first error, later successful batches are drained but not delivered.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        // Abort outstanding batches; their dropped senders let the loop finish.
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Cancellation is recognised by the message text produced above.
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // Panicked tasks never send a message, so panics only surface here.
    // NOTE(review): per tokio docs `shutdown` already joins every task, so after
    // a shutdown above this loop sees nothing and aborted tasks are not counted.
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                cancelled += 1;
            }
        }
    }

    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1310
/// Serialises `v` as consecutive little-endian IEEE-754 `f32` values (4 bytes each).
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|x| x.to_le_bytes()));
    bytes
}
1318
/// Decodes little-endian IEEE-754 `f32` values from `bytes`.
///
/// Input is read in 4-byte groups. A trailing remainder of 1 to 3 bytes is
/// ignored silently.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1326
/// Returns the configured embedding dimension.
///
/// This is a thin public forwarder to `crate::constants::embedding_dim`. The
/// dimension checks in this module (`validate_dim` and the fan-out) read that
/// same function.
pub fn embedding_dim() -> usize {
    crate::constants::embedding_dim()
}
1332
1333fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1337 let dim = crate::constants::embedding_dim();
1338 if v.len() != dim {
1339 return Err(AppError::Embedding(format!(
1340 "embedding has {} dims, expected {dim}; \
1341 refusing to truncate or pad silently (G42/C5)",
1342 v.len()
1343 )));
1344 }
1345 Ok(v)
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350 use super::*;
1351 use std::sync::atomic::{AtomicUsize, Ordering};
1352
1353 #[test]
1354 fn f32_to_bytes_roundtrip() {
1355 let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1356 let bytes = f32_to_bytes(&input);
1357 assert_eq!(bytes.len(), input.len() * 4);
1358 let out = bytes_to_f32(&bytes);
1359 assert_eq!(out, input);
1360 }
1361
1362 #[test]
1363 fn validate_dim_rejects_divergent_vectors() {
1364 let dim = crate::constants::embedding_dim();
1367 let long = vec![0.0; dim + 10];
1368 assert!(validate_dim(long).is_err(), "longer vector must error");
1369 let short = vec![0.0; dim.saturating_sub(1).max(1)];
1370 assert!(validate_dim(short).is_err(), "shorter vector must error");
1371 let exact = vec![0.0; dim];
1372 assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1373 }
1374
1375 #[test]
1376 fn embedding_dim_matches_constants_source() {
1377 assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1378 }
1379
1380 #[test]
1381 fn build_batches_preserves_global_indices() {
1382 let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1383 let batches = build_batches(&texts, 4);
1384 assert_eq!(batches.len(), 3);
1385 assert_eq!(batches[0].len(), 4);
1386 assert_eq!(batches[2].len(), 2);
1387 assert_eq!(batches[2][1].0, 9);
1388 assert_eq!(batches[2][1].1, "t9");
1389 }
1390
1391 #[test]
1392 fn effective_permits_clamps_to_bounds() {
1393 assert!(effective_permits(0) >= 1);
1394 assert!(effective_permits(1000) <= 32);
1395 }
1396
1397 fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1398 (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1399 }
1400
1401 fn dummy_vec(dim: usize) -> Vec<f32> {
1402 vec![0.0; dim]
1403 }
1404
1405 #[test]
1408 fn concurrency_peak_never_exceeds_permits() {
1409 let permits = 4usize;
1410 let batches = test_batches(permits * 10);
1411 let dim = crate::constants::embedding_dim();
1412 let current = Arc::new(AtomicUsize::new(0));
1413 let peak = Arc::new(AtomicUsize::new(0));
1414
1415 let current_c = Arc::clone(¤t);
1416 let peak_c = Arc::clone(&peak);
1417 let work = move |batch: Vec<(usize, String)>| {
1418 let current = Arc::clone(¤t_c);
1419 let peak = Arc::clone(&peak_c);
1420 async move {
1421 let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1422 peak.fetch_max(now, Ordering::SeqCst);
1423 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1424 current.fetch_sub(1, Ordering::SeqCst);
1425 Ok(batch
1426 .into_iter()
1427 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1428 .collect())
1429 }
1430 };
1431
1432 let mut delivered = 0usize;
1433 let rt = tokio::runtime::Builder::new_multi_thread()
1434 .worker_threads(4)
1435 .enable_all()
1436 .build()
1437 .expect("test runtime");
1438 rt.block_on(run_bounded(
1439 batches,
1440 permits,
1441 dim,
1442 CancellationToken::new(),
1443 work,
1444 &mut |_idx, _v| {
1445 delivered += 1;
1446 Ok(())
1447 },
1448 ))
1449 .expect("fan-out must succeed");
1450
1451 assert_eq!(delivered, permits * 10, "every item must be delivered");
1452 assert!(
1453 peak.load(Ordering::SeqCst) <= permits,
1454 "peak concurrency {} exceeded permits {permits}",
1455 peak.load(Ordering::SeqCst)
1456 );
1457 }
1458
1459 #[test]
1462 fn panicking_task_returns_permit_and_surfaces_error() {
1463 let permits = 2usize;
1464 let batches = test_batches(4);
1465 let dim = crate::constants::embedding_dim();
1466
1467 let work = move |batch: Vec<(usize, String)>| async move {
1468 if batch[0].0 == 1 {
1469 panic!("intentional test panic");
1470 }
1471 Ok(batch
1472 .into_iter()
1473 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1474 .collect())
1475 };
1476
1477 let rt = tokio::runtime::Builder::new_multi_thread()
1478 .worker_threads(2)
1479 .enable_all()
1480 .build()
1481 .expect("test runtime");
1482 let result = rt.block_on(run_bounded(
1483 batches,
1484 permits,
1485 dim,
1486 CancellationToken::new(),
1487 work,
1488 &mut |_idx, _v| Ok(()),
1489 ));
1490
1491 let err = result.expect_err("panic must surface as an error");
1492 assert!(
1493 err.to_string().contains("panicked"),
1494 "error must mention the panic: {err}"
1495 );
1496 }
1497
1498 #[test]
1501 fn cancellation_terminates_fan_out_quickly() {
1502 let permits = 2usize;
1503 let batches = test_batches(8);
1504 let dim = crate::constants::embedding_dim();
1505 let token = CancellationToken::new();
1506
1507 let work = move |batch: Vec<(usize, String)>| async move {
1508 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1510 Ok(batch
1511 .into_iter()
1512 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1513 .collect())
1514 };
1515
1516 let rt = tokio::runtime::Builder::new_multi_thread()
1517 .worker_threads(2)
1518 .enable_all()
1519 .build()
1520 .expect("test runtime");
1521 let cancel = token.clone();
1522 let start = std::time::Instant::now();
1523 let result = rt.block_on(async move {
1524 tokio::spawn(async move {
1525 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1526 cancel.cancel();
1527 });
1528 run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1529 });
1530
1531 assert!(result.is_err(), "cancelled fan-out must report an error");
1532 assert!(
1533 start.elapsed() < std::time::Duration::from_secs(10),
1534 "graceful shutdown must finish well under the work duration"
1535 );
1536 }
1537
1538 #[test]
1541 fn fan_out_rejects_divergent_dim() {
1542 let permits = 2usize;
1543 let batches = test_batches(2);
1544 let dim = crate::constants::embedding_dim();
1545
1546 let work = move |batch: Vec<(usize, String)>| async move {
1547 Ok(batch
1548 .into_iter()
1549 .map(|(i, _)| (i, vec![0.0f32; 3]))
1550 .collect::<Vec<(usize, Vec<f32>)>>())
1551 };
1552
1553 let rt = tokio::runtime::Builder::new_multi_thread()
1554 .worker_threads(2)
1555 .enable_all()
1556 .build()
1557 .expect("test runtime");
1558 let result = rt.block_on(run_bounded(
1559 batches,
1560 permits,
1561 dim,
1562 CancellationToken::new(),
1563 work,
1564 &mut |_idx, _v| Ok(()),
1565 ));
1566
1567 let err = result.expect_err("divergent dim must fail the fan-out");
1568 assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1569 }
1570
1571 #[test]
1573 fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1574 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1575 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1576 }
1577
1578 #[test]
1580 fn adaptive_batch_dim384_shrinks() {
1581 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1582 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1583 }
1584
1585 #[test]
1587 fn adaptive_batch_intermediate_dims() {
1588 assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1589 assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1590 }
1591
1592 #[test]
1594 fn adaptive_batch_small_dim_clamps_to_base() {
1595 assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1596 }
1597
1598 #[test]
1600 fn adaptive_batch_total_function() {
1601 assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1602 assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1603 assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1604 }
1605
1606 #[test]
1608 #[serial_test::serial(env)]
1609 fn adaptive_wrappers_follow_env_dim() {
1610 std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1611 let chunk = chunk_embed_batch_size();
1612 let entity = entity_embed_batch_size();
1613 std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1614 crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1615 assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1616 assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1617 }
1618
1619 #[test]
1627 fn embedding_error_kind_classify_oauth_message() {
1628 assert_eq!(
1629 EmbeddingErrorKind::classify("OAuth token expired for claude"),
1630 EmbeddingErrorKind::OAuth,
1631 );
1632 assert_eq!(
1633 EmbeddingErrorKind::classify("oauth authentication failed"),
1634 EmbeddingErrorKind::OAuth,
1635 );
1636 }
1637
1638 #[test]
1641 fn embedding_error_kind_classify_quota_message() {
1642 assert_eq!(
1643 EmbeddingErrorKind::classify("quota exhausted on backend"),
1644 EmbeddingErrorKind::Quota,
1645 );
1646 assert_eq!(
1647 EmbeddingErrorKind::classify("Usage quota limit reached"),
1648 EmbeddingErrorKind::Quota,
1649 );
1650 }
1651
1652 #[test]
1656 fn embedding_error_kind_classify_slot_exhausted_message() {
1657 assert_eq!(
1658 EmbeddingErrorKind::classify(
1659 "slot exhausted: failed to acquire LLM slot after backoff"
1660 ),
1661 EmbeddingErrorKind::SlotExhausted,
1662 );
1663 }
1664
1665 #[test]
1668 fn embedding_error_kind_classify_zero_dimension_message() {
1669 assert_eq!(
1670 EmbeddingErrorKind::classify("embedding returned dim=zero"),
1671 EmbeddingErrorKind::ZeroDimension,
1672 );
1673 assert_eq!(
1674 EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1675 EmbeddingErrorKind::ZeroDimension,
1676 );
1677 }
1678
1679 #[test]
1683 fn embedding_error_kind_classify_unknown_fallback() {
1684 assert_eq!(
1685 EmbeddingErrorKind::classify("unrelated subprocess error"),
1686 EmbeddingErrorKind::Unknown,
1687 );
1688 assert_eq!(
1689 EmbeddingErrorKind::classify("rate limit hit"),
1690 EmbeddingErrorKind::Unknown,
1691 );
1692 assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1694 assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1695 assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1696 assert_eq!(
1697 EmbeddingErrorKind::BackendMismatch.code(),
1698 "backend-mismatch"
1699 );
1700 assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1701 assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1702 }
1703
1704 #[test]
1706 fn fallback_reason_display_does_not_panic() {
1707 let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1708 let _ = FallbackReason::Cancelled.to_string();
1709 let _ = FallbackReason::Timeout {
1710 operation: "embed_query".into(),
1711 duration_secs: 30,
1712 }
1713 .to_string();
1714 }
1715
1716 #[test]
1719 fn fallback_reason_is_partial_eq() {
1720 assert_eq!(
1721 FallbackReason::EmbeddingFailed("a".into()),
1722 FallbackReason::EmbeddingFailed("a".into())
1723 );
1724 assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1725 assert_ne!(
1726 FallbackReason::EmbeddingFailed("a".into()),
1727 FallbackReason::EmbeddingFailed("b".into())
1728 );
1729 assert_ne!(
1730 FallbackReason::Cancelled,
1731 FallbackReason::Timeout {
1732 operation: "x".into(),
1733 duration_secs: 1
1734 }
1735 );
1736 }
1737
1738 #[test]
1741 fn fallback_reason_timeout_preserves_fields() {
1742 let r = FallbackReason::Timeout {
1743 operation: "embed_query_local".into(),
1744 duration_secs: 300,
1745 };
1746 match r {
1747 FallbackReason::Timeout {
1748 operation,
1749 duration_secs,
1750 } => {
1751 assert_eq!(operation, "embed_query_local");
1752 assert_eq!(duration_secs, 300);
1753 }
1754 other => panic!("expected Timeout, got {other:?}"),
1755 }
1756 }
1757
1758 #[test]
1764 #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1765 fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1766 let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1769 let result = try_embed_query_with_fallback(bogus, "hello world");
1770 match result {
1771 Err(FallbackReason::EmbeddingFailed(msg)) => {
1772 assert!(!msg.is_empty(), "fallback message must not be empty");
1774 }
1775 Err(FallbackReason::Cancelled) => {
1776 panic!("expected EmbeddingFailed, got Cancelled");
1777 }
1778 Err(FallbackReason::Timeout { .. }) => {
1779 panic!("expected EmbeddingFailed, got Timeout");
1780 }
1781 Err(FallbackReason::SlotExhausted) => {
1782 panic!("expected EmbeddingFailed, got SlotExhausted");
1783 }
1784 Err(FallbackReason::OAuthQuota { .. }) => {
1785 panic!("expected EmbeddingFailed, got OAuthQuota");
1786 }
1787 Err(FallbackReason::BackendMismatch { .. }) => {
1788 panic!("expected EmbeddingFailed, got BackendMismatch");
1789 }
1790 Err(FallbackReason::DimZero) => {
1791 panic!("expected EmbeddingFailed, got DimZero");
1792 }
1793 Ok(_) => {
1794 panic!("expected an error, got Ok — embedder must fail for bogus path");
1795 }
1796 }
1797 }
1798
1799 #[test]
1801 fn g56_entity_cache_key_is_stable_and_distinct() {
1802 let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
1803 let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
1804 let k3 = entity_cache_key("codex:default", "claude-code");
1805 let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
1806 assert_eq!(k1, k2, "same model+text must hash identically");
1807 assert_ne!(k1, k3, "different text must hash differently");
1808 assert_ne!(k1, k4, "different model must hash differently");
1809 }
1810
1811 #[test]
1812 fn g56_entity_embed_cache_stats_hit_rate() {
1813 let zero = EmbedCacheStats::default();
1814 assert_eq!(zero.hit_rate(), 0.0);
1815 let half = EmbedCacheStats {
1816 requested: 4,
1817 hits: 2,
1818 misses: 2,
1819 };
1820 assert!((half.hit_rate() - 0.5).abs() < 1e-9);
1821 let all = EmbedCacheStats {
1822 requested: 7,
1823 hits: 7,
1824 misses: 0,
1825 };
1826 assert!((all.hit_rate() - 1.0).abs() < 1e-9);
1827 }
1828
1829 #[test]
1830 fn g56_entity_embed_cache_populates_and_hits() {
1831 let cache = entity_embed_cache();
1835 let model = "test-model";
1836 let text = "sqlite-graphrag";
1837 let key = entity_cache_key(model, text);
1838 let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
1839 cache.lock().insert(key, Arc::clone(&stored));
1840 let guard = cache.lock();
1841 let hit = guard.get(&key).expect("cache must return stored value");
1842 assert_eq!(hit.len(), crate::constants::embedding_dim());
1843 assert!((hit[0] - 0.42).abs() < 1e-6);
1844 }
1845
1846 #[test]
1847 fn g56_empty_texts_short_circuits_with_zero_stats() {
1848 let stats = EmbedCacheStats::default();
1851 assert_eq!(stats.requested, 0);
1852 assert_eq!(stats.hits, 0);
1853 assert_eq!(stats.misses, 0);
1854 assert_eq!(stats.hit_rate(), 0.0);
1855 }
1856}
1857
1858#[cfg(test)]
1862mod embed_with_fallback_tests {
1863 use super::*;
1864 use crate::llm::exit_code_hints::LlmBackendError;
1865
1866 #[test]
1867 fn none_backend_returns_empty_vector_without_calling_llm() {
1868 let (v, kind) = embed_via_backend(
1872 std::path::Path::new("/nonexistent"),
1873 "any text",
1874 &LlmBackendKind::None,
1875 )
1876 .expect("None backend never fails");
1877 assert!(v.is_empty());
1878 assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
1879 }
1880
1881 #[test]
1882 fn empty_chain_defaults_to_codex_claude_none() {
1883 let defaults = [
1887 LlmBackendKind::Codex,
1888 LlmBackendKind::Claude,
1889 LlmBackendKind::None,
1890 ];
1891
1892 #[allow(dead_code)]
1897 fn llm_backend_kind_as_str_is_stable() {
1898 assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
1899 assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
1900 assert_eq!(LlmBackendKind::None.as_str(), "none");
1901 }
1902
1903 #[allow(dead_code)]
1904 fn fallback_reason_reason_code_is_stable() {
1905 assert_eq!(
1906 FallbackReason::EmbeddingFailed("any".into()).reason_code(),
1907 "embedding_failed"
1908 );
1909 assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
1910 assert_eq!(
1911 FallbackReason::Timeout {
1912 operation: "embed_query".into(),
1913 duration_secs: 30
1914 }
1915 .reason_code(),
1916 "timeout"
1917 );
1918 }
1919 assert_eq!(defaults.len(), 3);
1920 }
1921
1922 #[test]
1923 fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
1924 let chain = vec![LlmBackendKind::None];
1936 let err = embed_with_fallback(
1937 std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
1938 "hello",
1939 &chain,
1940 false,
1941 )
1942 .expect_err("chain of only [None] without skip_on_failure MUST abort");
1943 let msg = format!("{err}");
1944 assert!(
1945 msg.contains("no LLM backends available"),
1946 "error must mention exhausted chain, got: {msg}"
1947 );
1948 }
1949 #[test]
1950 fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
1951 let chain = vec![LlmBackendKind::None];
1956 let v = embed_with_fallback(
1957 std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
1958 "hello",
1959 &chain,
1960 true,
1961 )
1962 .expect("None chain is always Ok");
1963 assert!(v.0.is_empty(), "vector must be empty");
1964 assert_eq!(v.1, LlmBackendKind::None);
1965 }
1966 #[allow(dead_code)]
1967 fn llm_backend_error_no_backends_default_message() {
1968 let e = LlmBackendError::NoBackendsAvailable;
1971 let h = e.hint();
1972 assert!(h.contains("--llm-fallback"));
1973 }
1974
1975 #[test]
1976 fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
1977 let e = LlmBackendError::NonZeroExit {
1978 exit_code: Some(137),
1979 signal: Some(9),
1980 stdout_tail: "out".into(),
1981 stderr_tail: "OOM killed".into(),
1982 binary: "codex".into(),
1983 hint: "OOM".into(),
1984 };
1985 let s = e.to_string();
1986 assert!(s.contains("codex"));
1987 assert!(s.contains("OOM killed"));
1988 assert!(s.contains("signal 9") || s.contains("exit 137"));
1989 }
1990}