1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide Claude embedder; populated on the first successful call to
/// [`get_claude_embedder`] and never replaced afterwards.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenCode embedder; populated on the first successful call to
/// [`get_opencode_embedder`] and never replaced afterwards.
static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenRouter API client; populated by [`get_openrouter_embedder`].
static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();
61
/// Returns `true` once the process-wide OpenRouter client has been installed.
pub fn is_openrouter_initialized() -> bool {
    OPENROUTER_CLIENT.get().is_some()
}
/// Process-wide auto-detected embedder; populated on the first successful call to
/// [`get_embedder`].
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Shared Tokio runtime, built lazily by [`shared_runtime`].
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
74
/// Base (maximum) number of chunk texts per embedding batch; scaled down by
/// [`chunk_embed_batch_size`] for larger embedding dimensions.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base (maximum) number of entity texts per embedding batch; scaled down by
/// [`entity_embed_batch_size`] for larger embedding dimensions.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Embedding dimension at which the base batch sizes apply unchanged; larger
/// dimensions shrink the batch proportionally.
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
87
88fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
96 let base = base.max(1);
97 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
98}
99
/// Returns the chunk embedding batch size for the currently configured embedding
/// dimension.
///
/// Scales [`CHUNK_EMBED_BATCH_SIZE`] through [`adaptive_batch_for_dim`] and emits a
/// debug event carrying the dimension, the base size and the resulting batch size.
pub fn chunk_embed_batch_size() -> usize {
    let dim = crate::constants::embedding_dim();
    let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
    // `dim` and `batch` double as tracing field names; renaming them changes log output.
    tracing::debug!(
        dim,
        base = CHUNK_EMBED_BATCH_SIZE,
        batch,
        "adaptive chunk batch size (G44)"
    );
    batch
}
112
/// Returns the entity embedding batch size for the currently configured embedding
/// dimension.
///
/// Scales [`ENTITY_EMBED_BATCH_SIZE`] through [`adaptive_batch_for_dim`] and emits a
/// debug event carrying the dimension, the base size and the resulting batch size.
pub fn entity_embed_batch_size() -> usize {
    let dim = crate::constants::embedding_dim();
    let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
    // `dim` and `batch` double as tracing field names; renaming them changes log output.
    tracing::debug!(
        dim,
        base = ENTITY_EMBED_BATCH_SIZE,
        batch,
        "adaptive entity batch size (G44)"
    );
    batch
}
125
126pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
128 if let Some(rt) = RUNTIME.get() {
129 return Ok(rt);
130 }
131 let rt = tokio::runtime::Builder::new_multi_thread()
132 .worker_threads(2)
133 .enable_all()
134 .build()
135 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
136 let _ = RUNTIME.set(rt);
137 Ok(RUNTIME.get().expect("RUNTIME initialised above"))
138}
139
140pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
142 if let Some(e) = EMBEDDER.get() {
143 return Ok(e);
144 }
145 let backend = LlmEmbedding::detect_available()?;
146 let _ = EMBEDDER.set(Mutex::new(backend));
147 Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
148}
149
150pub fn get_claude_embedder(
155 claude_binary: Option<&Path>,
156 claude_model: Option<&str>,
157) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
158 if let Some(e) = CLAUDE_EMBEDDER.get() {
159 return Ok(e);
160 }
161 let mut builder = LlmEmbedding::with_claude_builder();
162 if let Some(b) = claude_binary {
163 builder = builder.override_binary(b.to_path_buf());
164 }
165 if let Some(m) = claude_model {
166 builder = builder.override_model(m.to_string());
167 }
168 let backend = builder.build()?;
169 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
170 Ok(CLAUDE_EMBEDDER
171 .get()
172 .expect("CLAUDE_EMBEDDER initialised above"))
173}
174
175pub fn get_opencode_embedder(
180 opencode_binary: Option<&Path>,
181 opencode_model: Option<&str>,
182) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
183 if let Some(e) = OPENCODE_EMBEDDER.get() {
184 return Ok(e);
185 }
186 let mut builder = LlmEmbedding::with_opencode_builder();
187 if let Some(b) = opencode_binary {
188 builder = builder.override_binary(b.to_path_buf());
189 }
190 if let Some(m) = opencode_model {
191 builder = builder.override_model(m.to_string());
192 }
193 let backend = builder.build()?;
194 let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
195 Ok(OPENCODE_EMBEDDER
196 .get()
197 .expect("OPENCODE_EMBEDDER initialised above"))
198}
199
200pub fn get_openrouter_embedder(
201 api_key: secrecy::SecretBox<String>,
202 model: &str,
203 dim: usize,
204) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
205 if let Some(c) = OPENROUTER_CLIENT.get() {
206 return Ok(c);
207 }
208 let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
209 let _ = OPENROUTER_CLIENT.set(client);
210 Ok(OPENROUTER_CLIENT
211 .get()
212 .expect("OPENROUTER_CLIENT initialised above"))
213}
214
/// Embeds `text` as a passage using the Claude embedder.
///
/// Acquires an LLM slot (the guard is kept alive until the function returns),
/// resolves the process-wide Claude embedder via [`get_claude_embedder`] and
/// delegates to [`embed_passage`]. `_models_dir` is accepted but not used.
///
/// # Errors
/// Propagates slot-acquisition, embedder-construction and embedding errors.
pub fn embed_via_claude_local(
    _models_dir: &Path,
    text: &str,
    claude_binary: Option<&Path>,
    claude_model: Option<&str>,
) -> Result<Vec<f32>, AppError> {
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let embedder = get_claude_embedder(claude_binary, claude_model)?;
    embed_passage(embedder, text)
}
228
/// Embeds `text` as a passage using the Claude embedder and reports the backend used.
///
/// Same steps as [`embed_via_claude_local`], but the vector is paired with
/// `LlmBackendKind::Claude`. `_models_dir` is accepted but not used.
///
/// # Errors
/// Propagates slot-acquisition, embedder-construction and embedding errors.
pub fn embed_via_claude_local_resolved(
    _models_dir: &Path,
    text: &str,
    claude_binary: Option<&Path>,
    claude_model: Option<&str>,
) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let embedder = get_claude_embedder(claude_binary, claude_model)?;
    let v = embed_passage(embedder, text)?;
    Ok((v, LlmBackendKind::Claude))
}
244
/// Embeds `text` as a passage using the OpenCode embedder and reports the backend used.
///
/// Acquires an LLM slot (the guard is kept alive until the function returns),
/// resolves the process-wide OpenCode embedder via [`get_opencode_embedder`] and
/// pairs the vector with `LlmBackendKind::Opencode`. `_models_dir` is accepted but
/// not used.
///
/// # Errors
/// Propagates slot-acquisition, embedder-construction and embedding errors.
pub fn embed_via_opencode_local_resolved(
    _models_dir: &Path,
    text: &str,
    opencode_binary: Option<&Path>,
    opencode_model: Option<&str>,
) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
    let v = embed_passage(embedder, text)?;
    Ok((v, LlmBackendKind::Opencode))
}
/// Returns a clone of the embedder behind the mutex.
///
/// The lock is only held for the duration of the clone, so callers can run the
/// (potentially slow) embedding call without holding it.
fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
    embedder.lock().clone()
}
265
/// Embeds `text` as a passage and checks the vector's dimension.
///
/// Works on a clone of the embedder (see [`clone_client`]) and passes the result
/// through [`validate_dim`].
///
/// # Errors
/// Propagates the embedding error, or the dimension-mismatch error from
/// `validate_dim`.
pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
    let client = clone_client(embedder);
    let result = client.embed_passage(text)?;
    validate_dim(result)
}
274
/// Embeds `text` as a query and checks the vector's dimension.
///
/// Works on a clone of the embedder (see [`clone_client`]) and passes the result
/// through [`validate_dim`].
///
/// # Errors
/// Propagates the embedding error, or the dimension-mismatch error from
/// `validate_dim`.
pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
    let client = clone_client(embedder);
    let result = client.embed_query(text)?;
    validate_dim(result)
}
283
284pub fn embed_passages_controlled(
289 embedder: &Mutex<LlmEmbedding>,
290 texts: &[&str],
291 _token_counts: &[usize],
292) -> Result<Vec<Vec<f32>>, AppError> {
293 if texts.is_empty() {
294 return Ok(Vec::new());
295 }
296 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
297 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
298}
299
/// Embeds `text` as a passage with the auto-detected embedder.
///
/// Acquires an LLM slot (the guard is kept alive until the function returns),
/// resolves the embedder via [`get_embedder`] and delegates to [`embed_passage`].
///
/// # Errors
/// Propagates slot-acquisition, backend-detection and embedding errors.
pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let embedder = get_embedder(models_dir)?;
    embed_passage(embedder, text)
}
305
/// Reports whether embedding failures should be tolerated.
///
/// True only when the `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` environment
/// variable is set to exactly `1` or `true` (case-sensitive). An unset or
/// non-Unicode value yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE") {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
315
316pub fn embed_passage_or_skip(
323 models_dir: &Path,
324 text: &str,
325 choice: Option<crate::cli::LlmBackendChoice>,
326) -> Result<Option<Vec<f32>>, AppError> {
327 match embed_passage_with_choice(models_dir, text, choice) {
328 Ok((v, _backend)) => Ok(Some(v)),
329 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
330 Err(e) => {
331 if should_skip_embedding_on_failure() {
332 tracing::warn!(
333 error = %e,
334 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
335 );
336 Ok(None)
337 } else {
338 Err(e)
339 }
340 }
341 }
342}
343
344pub fn embed_passage_local_resolved(
350 models_dir: &Path,
351 text: &str,
352) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
353 let _slot_guard = acquire_llm_slot_for_embedding()?;
354 let embedder = get_embedder(models_dir)?;
355 let v = embed_passage(embedder, text)?;
356 let kind = match embedder.lock().flavour() {
357 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
358 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
359 crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
360 };
361 Ok((v, kind))
362}
363
/// Embeds `text` as a query with the auto-detected embedder.
///
/// Acquires an LLM slot (the guard is kept alive until the function returns),
/// resolves the embedder via [`get_embedder`] and delegates to [`embed_query`].
///
/// # Errors
/// Propagates slot-acquisition, backend-detection and embedding errors.
pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let embedder = get_embedder(models_dir)?;
    embed_query(embedder, text)
}
369
370pub fn embed_passage_with_choice(
387 models_dir: &Path,
388 text: &str,
389 choice: Option<crate::cli::LlmBackendChoice>,
390) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
391 let _slot_guard = acquire_llm_slot_for_embedding()?;
392 match choice {
393 None => {
394 let embedder = get_embedder(models_dir)?;
395 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
396 }
397 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
398 }
399}
400
/// Embeds `text` as a passage using the chain derived from an embedding-backend
/// choice combined with an LLM-backend choice.
///
/// The chain comes from `embedding_backend.to_chain(llm_backend)` and is walked by
/// [`embed_with_fallback`] with `skip_on_failure = false`.
///
/// # Errors
/// Propagates slot-acquisition errors and the error from `embed_with_fallback`.
pub fn embed_passage_with_embedding_choice(
    models_dir: &Path,
    text: &str,
    embedding_backend: crate::cli::EmbeddingBackendChoice,
    llm_backend: crate::cli::LlmBackendChoice,
) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
    // NOTE(review): the backends reached through `embed_with_fallback` acquire their
    // own slot while this guard is still held; confirm nested acquisition is intended.
    let _slot_guard = acquire_llm_slot_for_embedding()?;
    let chain = embedding_backend.to_chain(llm_backend);
    embed_with_fallback(models_dir, text, &chain, false)
}
414
415pub fn try_embed_query_with_choice(
421 models_dir: &Path,
422 text: &str,
423 choice: Option<crate::cli::LlmBackendChoice>,
424) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
425 match embed_passage_with_choice(models_dir, text, choice) {
426 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
439 Ok((v, backend)) => Ok((v, backend)),
440 Err(e) => Err(classify_embedding_error(e)),
441 }
442}
443pub fn try_embed_query_with_embedding_choice(
448 models_dir: &Path,
449 text: &str,
450 embedding_backend: crate::cli::EmbeddingBackendChoice,
451 llm_backend: crate::cli::LlmBackendChoice,
452) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
453 match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
454 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
455 Ok((v, backend)) => Ok((v, backend)),
456 Err(e) => Err(classify_embedding_error(e)),
457 }
458}
459
460fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
472 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
473 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
474 .ok()
475 .and_then(|s| s.parse::<u32>().ok())
476 .filter(|n| *n >= 1)
477 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
478 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
479 0
480 } else {
481 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
482 .ok()
483 .and_then(|s| s.parse::<u64>().ok())
484 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
485 };
486 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
494 Ok(guard) => Ok(guard),
495 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
496 "slot exhausted: {e} (fall back to FTS5)"
497 ))),
498 Err(e) => Err(e),
499 }
500}
/// Coarse category of an embedding failure, derived from its error message.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// The message mentions `oauth`.
    OAuth,
    /// The message mentions `quota`.
    Quota,
    /// The message mentions `slot exhausted`.
    SlotExhausted,
    /// The message mentions `backend mismatch`.
    BackendMismatch,
    /// The message mentions both `dim` and `zero`.
    ZeroDimension,
    /// None of the known markers were found.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies an error message by case-insensitive substring search.
    ///
    /// Markers are checked in a fixed order and the first hit wins:
    /// `oauth`, `quota`, `slot exhausted`, `backend mismatch`, then `dim` + `zero`.
    pub fn classify(msg: &str) -> Self {
        let lowered = msg.to_lowercase();
        let has = |needle: &str| lowered.contains(needle);

        if has("oauth") {
            return Self::OAuth;
        }
        if has("quota") {
            return Self::Quota;
        }
        if has("slot exhausted") {
            return Self::SlotExhausted;
        }
        if has("backend mismatch") {
            return Self::BackendMismatch;
        }
        if has("dim") && has("zero") {
            return Self::ZeroDimension;
        }
        Self::Unknown
    }

    /// Returns the stable kebab-case code for this kind.
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    /// Writes the same text as [`EmbeddingErrorKind::code`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = self.code();
        f.write_str(code)
    }
}
572
/// Why an embedding attempt did not yield a usable vector.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Unclassified failure; carries the original error text.
    EmbeddingFailed(String),
    /// No LLM slot could be acquired.
    SlotExhausted,
    /// OAuth or quota failure attributed to `backend`.
    OAuthQuota { backend: &'static str },
    /// A different backend was invoked than the one requested.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The backend returned an empty vector.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// The named operation exceeded its time budget.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Returns the stable snake_case code for this reason.
    pub fn reason_code(&self) -> &'static str {
        match self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    /// Writes a human-readable description of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Fixed messages are written directly.
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            // Parameterised messages.
            Self::EmbeddingFailed(detail) => write!(f, "embedding failed: {detail}"),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            Self::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            Self::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
669
670pub fn try_embed_query_with_fallback(
678 models_dir: &Path,
679 query: &str,
680) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
681 match embed_query_local(models_dir, query) {
682 Ok(v) => Ok((v, LlmBackendKind::None)),
683 Err(e) => Err(classify_embedding_error(e)),
684 }
685}
686
687pub fn try_embed_query_with_deterministic_fallback(
696 models_dir: &Path,
697 query: &str,
698 choice: Option<crate::cli::LlmBackendChoice>,
699) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
700 match try_embed_query_with_choice(models_dir, query, choice) {
701 Ok(t) => Ok(t),
702 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
703 let alt = match backend {
704 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
705 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
706 "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
707 "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
708 _ => None,
709 };
710 if let Some(alt_choice) = alt {
711 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
712 } else {
713 Err(reason)
714 }
715 }
716 Err(reason @ FallbackReason::SlotExhausted) => {
717 std::thread::sleep(std::time::Duration::from_millis(750));
718 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
719 }
720 Err(other) => Err(other),
721 }
722}
723
724pub fn classify_embedding_error(err: AppError) -> FallbackReason {
732 match err {
733 AppError::Timeout {
734 operation,
735 duration_secs,
736 } => FallbackReason::Timeout {
737 operation,
738 duration_secs,
739 },
740 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
741 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
750 EmbeddingErrorKind::OAuth => {
751 let backend = if msg.contains("codex") {
752 "codex"
753 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
754 "claude"
759 } else if msg.contains("opencode") {
760 "opencode"
761 } else {
762 "unknown"
763 };
764 FallbackReason::OAuthQuota { backend }
765 }
766 EmbeddingErrorKind::Quota => {
767 let backend = if msg.contains("codex") {
768 "codex"
769 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
770 "claude"
771 } else if msg.contains("opencode") {
772 "opencode"
773 } else {
774 "unknown"
775 };
776 FallbackReason::OAuthQuota { backend }
777 }
778 EmbeddingErrorKind::BackendMismatch => {
779 let (requested, resolved) =
784 if msg.contains("requested claude") && msg.contains("but codex") {
785 ("claude", "codex")
786 } else if msg.contains("requested codex") && msg.contains("but claude") {
787 ("codex", "claude")
788 } else if msg.contains("requested claude") {
789 ("claude", "unknown")
790 } else if msg.contains("requested codex") {
791 ("codex", "unknown")
792 } else {
793 ("unknown", "unknown")
794 };
795 FallbackReason::BackendMismatch {
796 requested,
797 resolved,
798 }
799 }
800 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
801 EmbeddingErrorKind::Unknown => {
802 if msg.contains("cancelled") {
803 FallbackReason::Cancelled
804 } else {
805 FallbackReason::EmbeddingFailed(msg)
806 }
807 }
808 },
809 e => FallbackReason::EmbeddingFailed(e.to_string()),
810 }
811}
812pub fn embed_with_fallback(
831 models_dir: &Path,
832 text: &str,
833 chain: &[LlmBackendKind],
834 skip_on_failure: bool,
835) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
836 use crate::llm::exit_code_hints::LlmBackendError;
837 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
838 vec![
839 LlmBackendKind::Codex,
840 LlmBackendKind::Claude,
841 LlmBackendKind::Opencode,
842 LlmBackendKind::None,
843 ]
844 } else {
845 chain.to_vec()
846 };
847
848 let mut last_err: Option<AppError> = None;
849 for backend in &effective {
850 match embed_via_backend_strict(
861 models_dir,
862 text,
863 backend,
864 last_err.as_ref(),
865 skip_on_failure,
866 ) {
867 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
868 Err(e) => {
869 if matches!(e, AppError::Validation(_)) {
874 return Err(e);
875 }
876 tracing::warn!(
877 target: "embedding",
878 backend = ?backend,
879 error = %e,
880 "embed_with_fallback: backend failed, trying next"
881 );
882 last_err = Some(e);
883 }
884 }
885 }
886 if skip_on_failure {
887 return Ok((Vec::new(), LlmBackendKind::None));
892 }
893 Err(last_err
894 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
895}
896
/// Identifies which backend produced (or should produce) an embedding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// The Codex backend.
    Codex,
    /// The Claude backend.
    Claude,
    /// The OpenCode backend.
    Opencode,
    /// The OpenRouter HTTP API.
    OpenRouter,
    /// No backend.
    None,
}

impl LlmBackendKind {
    /// Returns the lowercase identifier for this backend.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::None => "none",
            LlmBackendKind::OpenRouter => "openrouter",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Codex => "codex",
        }
    }
}
927
928pub fn embed_via_backend(
943 models_dir: &Path,
944 text: &str,
945 backend: &LlmBackendKind,
946) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
947 match backend {
948 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
949 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
950 LlmBackendKind::Claude => {
951 tracing::debug!(
955 target: "embedder",
956 backend = "claude",
957 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
958 );
959 embed_via_claude_local_resolved(models_dir, text, None, None)
960 }
961 LlmBackendKind::Opencode => {
962 tracing::debug!(
963 target: "embedder",
964 backend = "opencode",
965 "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
966 );
967 embed_via_opencode_local_resolved(models_dir, text, None, None)
968 }
969 LlmBackendKind::OpenRouter => {
970 tracing::debug!(
971 target: "embedder",
972 backend = "openrouter",
973 "embed_via_backend: using OpenRouter API (v1.0.93)"
974 );
975 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
976 AppError::Embedding(
977 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
978 )
979 })?;
980 let rt = shared_runtime()?;
981 let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
982 Ok((vec, LlmBackendKind::OpenRouter))
983 }
984 }
985}
986
987pub fn embed_via_backend_strict(
1000 models_dir: &Path,
1001 text: &str,
1002 backend: &LlmBackendKind,
1003 last_err: Option<&AppError>,
1004 skip_on_failure: bool,
1005) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1006 use crate::llm::exit_code_hints::LlmBackendError;
1007 match backend {
1008 LlmBackendKind::None => {
1009 if skip_on_failure && last_err.is_none() {
1013 Ok((Vec::new(), LlmBackendKind::None))
1014 } else if last_err.is_some() {
1015 Err(match last_err {
1019 Some(e) => AppError::Embedding(format!("{e}")),
1020 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1021 })
1022 } else {
1023 Err(AppError::Embedding(
1026 LlmBackendError::NoBackendsAvailable.to_string(),
1027 ))
1028 }
1029 }
1030 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1031 LlmBackendKind::Claude => {
1032 tracing::debug!(
1033 target: "embedder",
1034 backend = "claude",
1035 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1036 );
1037 embed_via_claude_local_resolved(models_dir, text, None, None)
1038 }
1039 LlmBackendKind::Opencode => {
1040 tracing::debug!(
1041 target: "embedder",
1042 backend = "opencode",
1043 "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1044 );
1045 embed_via_opencode_local_resolved(models_dir, text, None, None)
1046 }
1047 LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1048 }
1049}
1050
/// Same as [`embed_via_backend`], but returns only the vector and discards the
/// resolved backend kind.
pub fn embed_via_backend_legacy(
    models_dir: &Path,
    text: &str,
    backend: &LlmBackendKind,
) -> Result<Vec<f32>, AppError> {
    embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
}
1062
/// Embeds a slice of passages with the auto-detected embedder.
///
/// Resolves the embedder via [`get_embedder`] and delegates to
/// [`embed_passages_controlled`]. No LLM slot is acquired by this function itself.
pub fn embed_passages_controlled_local(
    models_dir: &Path,
    texts: &[&str],
    token_counts: &[usize],
) -> Result<Vec<Vec<f32>>, AppError> {
    let embedder = get_embedder(models_dir)?;
    embed_passages_controlled(embedder, texts, token_counts)
}
1071
/// Embeds `texts` in parallel batches with the auto-detected embedder.
///
/// Resolves the embedder via [`get_embedder`] and delegates to
/// [`embed_texts_parallel`] with the given `parallelism` and `batch_size`.
pub fn embed_passages_parallel_local(
    models_dir: &Path,
    texts: &[String],
    parallelism: usize,
    batch_size: usize,
) -> Result<Vec<Vec<f32>>, AppError> {
    let embedder = get_embedder(models_dir)?;
    embed_texts_parallel(embedder, texts, parallelism, batch_size)
}
1083
1084pub fn embed_passages_parallel_with_embedding_choice(
1091 models_dir: &Path,
1092 texts: &[String],
1093 parallelism: usize,
1094 batch_size: usize,
1095 embedding_backend: crate::cli::EmbeddingBackendChoice,
1096 llm_backend: crate::cli::LlmBackendChoice,
1097) -> Result<Vec<Vec<f32>>, AppError> {
1098 let chain = embedding_backend.to_chain(llm_backend);
1099 if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1100 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1101 AppError::Embedding(
1102 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1103 )
1104 })?;
1105 let rt = shared_runtime()?;
1106 let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1107 let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1108 Ok(vecs)
1109 } else {
1110 embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1111 }
1112}
1113
/// Map from a 64-bit cache key (see [`entity_cache_key`]) to a shared embedding vector.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Process-wide entity embedding cache; created lazily by [`entity_embed_cache`].
/// Nothing in this module evicts entries from it.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1128
/// Returns the process-wide entity embedding cache, creating an empty map on first use.
fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
    ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
}
1132
1133fn entity_cache_key(model: &str, text: &str) -> u64 {
1134 let mut hasher = blake3::Hasher::new();
1135 hasher.update(model.as_bytes());
1136 hasher.update(b"\0");
1137 hasher.update(text.as_bytes());
1138 let h = hasher.finalize();
1139 let bytes = h.as_bytes();
1140 u64::from_le_bytes([
1141 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1142 ])
1143}
1144
1145pub fn embed_entity_texts_cached(
1155 models_dir: &Path,
1156 texts: &[String],
1157 parallelism: usize,
1158 embedding_backend: crate::cli::EmbeddingBackendChoice,
1159 llm_backend: crate::cli::LlmBackendChoice,
1160) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1161 if texts.is_empty() {
1162 return Ok((Vec::new(), EmbedCacheStats::default()));
1163 }
1164 let chain = embedding_backend.to_chain(llm_backend);
1168
1169 if chain.as_slice() == [LlmBackendKind::None] {
1175 let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1176 return Ok((
1177 out,
1178 EmbedCacheStats {
1179 requested: texts.len(),
1180 hits: 0,
1181 misses: texts.len(),
1182 },
1183 ));
1184 }
1185
1186 let routed_openrouter =
1192 chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1193 let model = if routed_openrouter {
1194 format!("openrouter:{}", crate::constants::embedding_dim())
1195 } else {
1196 get_embedder(models_dir)?.lock().model_label()
1197 };
1198 let cache = entity_embed_cache();
1199 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1200 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1201 {
1202 let guard = cache.lock();
1203 for (i, text) in texts.iter().enumerate() {
1204 let key = entity_cache_key(&model, text);
1205 if let Some(v) = guard.get(&key) {
1206 hits[i] = Some(Arc::clone(v));
1207 } else {
1208 miss_indices.push(i);
1209 }
1210 }
1211 }
1212 let miss_count = miss_indices.len();
1213 if miss_count > 0 {
1214 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1215 let miss_vecs = embed_passages_parallel_with_embedding_choice(
1219 models_dir,
1220 &miss_texts,
1221 parallelism,
1222 entity_embed_batch_size(),
1223 embedding_backend,
1224 llm_backend,
1225 )?;
1226 let mut guard = cache.lock();
1227 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1228 let vec = Arc::new(miss_vecs[slot].clone());
1229 let key = entity_cache_key(&model, &texts[orig_idx]);
1230 guard.insert(key, Arc::clone(&vec));
1231 hits[orig_idx] = Some(vec);
1232 }
1233 }
1234 let mut out = Vec::with_capacity(texts.len());
1235 for hit in hits.into_iter() {
1236 let v = hit.ok_or_else(|| {
1237 AppError::Embedding("entity embed cache produced null result".to_string())
1238 })?;
1239 out.push((*v).clone());
1240 }
1241 Ok((
1242 out,
1243 EmbedCacheStats {
1244 requested: texts.len(),
1245 hits: texts.len() - miss_count,
1246 misses: miss_count,
1247 },
1248 ))
1249}
1250
/// Hit/miss counters for one cached embedding request.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub struct EmbedCacheStats {
    /// Number of texts requested.
    pub requested: usize,
    /// Number of texts served from the cache.
    pub hits: usize,
    /// Number of texts not found in the cache.
    pub misses: usize,
}
1258
1259impl EmbedCacheStats {
1260 pub fn hit_rate(&self) -> f64 {
1262 if self.requested == 0 {
1263 0.0
1264 } else {
1265 self.hits as f64 / self.requested as f64
1266 }
1267 }
1268}
1269
1270pub fn embed_texts_parallel(
1283 embedder: &Mutex<LlmEmbedding>,
1284 texts: &[String],
1285 parallelism: usize,
1286 batch_size: usize,
1287) -> Result<Vec<Vec<f32>>, AppError> {
1288 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1289 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1290 slots[idx] = Some(v.to_vec());
1291 Ok(())
1292 })?;
1293 let mut out = Vec::with_capacity(slots.len());
1294 for (idx, slot) in slots.into_iter().enumerate() {
1295 out.push(slot.ok_or_else(|| {
1296 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1297 })?);
1298 }
1299 Ok(out)
1300}
1301
/// Embeds `texts` in parallel batches, handing each vector to `on_result`.
///
/// * Empty input returns immediately.
/// * A single text is embedded synchronously via [`embed_passage`] and reported
///   with index 0.
/// * Otherwise the texts are split into batches of `batch_size` (at least 1) and
///   run through [`run_bounded`] with the permit count from [`effective_permits`].
///
/// `on_result` receives the item's index in `texts` and its vector; an error
/// returned by it aborts the fan-out.
///
/// # Errors
/// Propagates embedding, dimension-mismatch, cancellation and callback errors.
pub fn embed_texts_parallel_with(
    embedder: &Mutex<LlmEmbedding>,
    texts: &[String],
    parallelism: usize,
    batch_size: usize,
    mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError> {
    if texts.is_empty() {
        return Ok(());
    }
    let dim = crate::constants::embedding_dim();
    if texts.len() == 1 {
        let v = embed_passage(embedder, &texts[0])?;
        return on_result(0, &v);
    }

    // The client is cloned once here; each batch task then gets its own clone.
    let client = clone_client(embedder);
    let permits = effective_permits(parallelism);
    let batches = build_batches(texts, batch_size.max(1));
    let token = crate::cancel_token().clone();

    let work = move |batch: Vec<(usize, String)>| {
        let client = client.clone();
        async move {
            client
                .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
                .await
        }
    };

    let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
    // Inside a Tokio runtime, block on the current handle; otherwise use the shared runtime.
    // NOTE(review): `block_in_place` panics on a current-thread runtime — confirm
    // callers only reach this from a multi-threaded runtime.
    match tokio::runtime::Handle::try_current() {
        Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
        Err(_) => shared_runtime()?.block_on(fan_out),
    }
}
1341
/// Splits `texts` into consecutive batches of at most `batch_size` items.
///
/// Each item is paired with its index in `texts`, so results can be matched back
/// to their input regardless of the order batches finish in. Every text is cloned
/// exactly once. A `batch_size` of zero is treated as one.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    // `chunks` panics on a zero chunk size.
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(batch_no, chunk)| {
            let first_index = batch_no * size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (first_index + offset, text.clone()))
                .collect()
        })
        .collect()
}
1353
1354pub fn effective_permits(requested: usize) -> usize {
1359 let cpus = std::thread::available_parallelism()
1360 .map(|n| n.get())
1361 .unwrap_or(4);
1362 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1363 / crate::constants::LLM_WORKER_RSS_MB)
1364 .max(1) as usize;
1365 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1366}
1367
/// Runs `work` over every batch with at most `permits` batches in flight and feeds
/// the resulting vectors to `on_result`.
///
/// One task is spawned per batch. Each task waits for a semaphore permit, runs
/// `work`, and sends its outcome over a bounded channel. This function drains the
/// channel, checks every vector against `dim`, and calls `on_result(idx, &vector)`.
/// The first error (backend failure, dimension mismatch, callback error,
/// cancellation or task panic) shuts the task set down and is returned once all
/// tasks have been reaped.
///
/// `permits` must be at least 1: it sizes both the semaphore and the channel.
///
/// # Errors
/// Returns the first error observed; later errors are only counted for the final
/// debug event.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    // Bounded channel: producers wait when the consumer falls behind.
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // The permit is held until the task ends, which bounds concurrent `work` calls.
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            // Race the work against the cancellation token only when shutdown is obeyed.
            let outcome = if crate::should_obey_shutdown() {
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // A send error means the receiver is gone; there is nobody left to tell.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` ends once every task has finished.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                completed += 1;
                // After the first error, later successes are counted but not delivered.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Cancellation is recognised by the message text.
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // Reap remaining tasks: panics count as failures, other join errors as cancellations.
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                cancelled += 1;
            }
        }
    }

    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1527
/// Serializes a slice of `f32` values into one flat byte buffer.
///
/// Every value contributes its four little-endian bytes, in slice order, so
/// the result is exactly `v.len() * 4` bytes long.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    // `size_of_val` on the slice is `len * 4`, so the buffer is sized once.
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1535
/// Decodes a byte buffer into `f32` values, four little-endian bytes each.
///
/// Only complete four-byte groups are decoded; any trailing bytes that do not
/// fill a group are ignored rather than reported.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1543
/// Returns the embedding dimension reported by
/// `crate::constants::embedding_dim`.
///
/// This function delegates directly and adds no logic of its own.
pub fn embedding_dim() -> usize {
    crate::constants::embedding_dim()
}
1549
1550fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1554 let dim = crate::constants::embedding_dim();
1555 if v.len() != dim {
1556 return Err(AppError::Embedding(format!(
1557 "embedding has {} dims, expected {dim}; \
1558 refusing to truncate or pad silently (G42/C5)",
1559 v.len()
1560 )));
1561 }
1562 Ok(v)
1563}
1564
1565#[cfg(test)]
1566mod tests {
1567 use super::*;
1568 use std::sync::atomic::{AtomicUsize, Ordering};
1569
1570 #[test]
1571 fn f32_to_bytes_roundtrip() {
1572 let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
1573 let bytes = f32_to_bytes(&input);
1574 assert_eq!(bytes.len(), input.len() * 4);
1575 let out = bytes_to_f32(&bytes);
1576 assert_eq!(out, input);
1577 }
1578
1579 #[test]
1580 fn validate_dim_rejects_divergent_vectors() {
1581 let dim = crate::constants::embedding_dim();
1584 let long = vec![0.0; dim + 10];
1585 assert!(validate_dim(long).is_err(), "longer vector must error");
1586 let short = vec![0.0; dim.saturating_sub(1).max(1)];
1587 assert!(validate_dim(short).is_err(), "shorter vector must error");
1588 let exact = vec![0.0; dim];
1589 assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
1590 }
1591
1592 #[test]
1593 fn embedding_dim_matches_constants_source() {
1594 assert_eq!(embedding_dim(), crate::constants::embedding_dim());
1595 }
1596
1597 #[test]
1598 fn build_batches_preserves_global_indices() {
1599 let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
1600 let batches = build_batches(&texts, 4);
1601 assert_eq!(batches.len(), 3);
1602 assert_eq!(batches[0].len(), 4);
1603 assert_eq!(batches[2].len(), 2);
1604 assert_eq!(batches[2][1].0, 9);
1605 assert_eq!(batches[2][1].1, "t9");
1606 }
1607
1608 #[test]
1609 fn effective_permits_clamps_to_bounds() {
1610 assert!(effective_permits(0) >= 1);
1611 assert!(effective_permits(1000) <= 32);
1612 }
1613
1614 fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
1615 (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
1616 }
1617
1618 fn dummy_vec(dim: usize) -> Vec<f32> {
1619 vec![0.0; dim]
1620 }
1621
1622 #[test]
1625 fn concurrency_peak_never_exceeds_permits() {
1626 let permits = 4usize;
1627 let batches = test_batches(permits * 10);
1628 let dim = crate::constants::embedding_dim();
1629 let current = Arc::new(AtomicUsize::new(0));
1630 let peak = Arc::new(AtomicUsize::new(0));
1631
1632 let current_c = Arc::clone(¤t);
1633 let peak_c = Arc::clone(&peak);
1634 let work = move |batch: Vec<(usize, String)>| {
1635 let current = Arc::clone(¤t_c);
1636 let peak = Arc::clone(&peak_c);
1637 async move {
1638 let now = current.fetch_add(1, Ordering::SeqCst) + 1;
1639 peak.fetch_max(now, Ordering::SeqCst);
1640 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1641 current.fetch_sub(1, Ordering::SeqCst);
1642 Ok(batch
1643 .into_iter()
1644 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1645 .collect())
1646 }
1647 };
1648
1649 let mut delivered = 0usize;
1650 let rt = tokio::runtime::Builder::new_multi_thread()
1651 .worker_threads(4)
1652 .enable_all()
1653 .build()
1654 .expect("test runtime");
1655 rt.block_on(run_bounded(
1656 batches,
1657 permits,
1658 dim,
1659 CancellationToken::new(),
1660 work,
1661 &mut |_idx, _v| {
1662 delivered += 1;
1663 Ok(())
1664 },
1665 ))
1666 .expect("fan-out must succeed");
1667
1668 assert_eq!(delivered, permits * 10, "every item must be delivered");
1669 assert!(
1670 peak.load(Ordering::SeqCst) <= permits,
1671 "peak concurrency {} exceeded permits {permits}",
1672 peak.load(Ordering::SeqCst)
1673 );
1674 }
1675
1676 #[test]
1679 fn panicking_task_returns_permit_and_surfaces_error() {
1680 let permits = 2usize;
1681 let batches = test_batches(4);
1682 let dim = crate::constants::embedding_dim();
1683
1684 let work = move |batch: Vec<(usize, String)>| async move {
1685 if batch[0].0 == 1 {
1686 panic!("intentional test panic");
1687 }
1688 Ok(batch
1689 .into_iter()
1690 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1691 .collect())
1692 };
1693
1694 let rt = tokio::runtime::Builder::new_multi_thread()
1695 .worker_threads(2)
1696 .enable_all()
1697 .build()
1698 .expect("test runtime");
1699 let result = rt.block_on(run_bounded(
1700 batches,
1701 permits,
1702 dim,
1703 CancellationToken::new(),
1704 work,
1705 &mut |_idx, _v| Ok(()),
1706 ));
1707
1708 let err = result.expect_err("panic must surface as an error");
1709 assert!(
1710 err.to_string().contains("panicked"),
1711 "error must mention the panic: {err}"
1712 );
1713 }
1714
1715 #[test]
1718 fn cancellation_terminates_fan_out_quickly() {
1719 let permits = 2usize;
1720 let batches = test_batches(8);
1721 let dim = crate::constants::embedding_dim();
1722 let token = CancellationToken::new();
1723
1724 let work = move |batch: Vec<(usize, String)>| async move {
1725 tokio::time::sleep(std::time::Duration::from_secs(30)).await;
1727 Ok(batch
1728 .into_iter()
1729 .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
1730 .collect())
1731 };
1732
1733 let rt = tokio::runtime::Builder::new_multi_thread()
1734 .worker_threads(2)
1735 .enable_all()
1736 .build()
1737 .expect("test runtime");
1738 let cancel = token.clone();
1739 let start = std::time::Instant::now();
1740 let result = rt.block_on(async move {
1741 tokio::spawn(async move {
1742 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1743 cancel.cancel();
1744 });
1745 run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
1746 });
1747
1748 assert!(result.is_err(), "cancelled fan-out must report an error");
1749 assert!(
1750 start.elapsed() < std::time::Duration::from_secs(10),
1751 "graceful shutdown must finish well under the work duration"
1752 );
1753 }
1754
1755 #[test]
1758 fn fan_out_rejects_divergent_dim() {
1759 let permits = 2usize;
1760 let batches = test_batches(2);
1761 let dim = crate::constants::embedding_dim();
1762
1763 let work = move |batch: Vec<(usize, String)>| async move {
1764 Ok(batch
1765 .into_iter()
1766 .map(|(i, _)| (i, vec![0.0f32; 3]))
1767 .collect::<Vec<(usize, Vec<f32>)>>())
1768 };
1769
1770 let rt = tokio::runtime::Builder::new_multi_thread()
1771 .worker_threads(2)
1772 .enable_all()
1773 .build()
1774 .expect("test runtime");
1775 let result = rt.block_on(run_bounded(
1776 batches,
1777 permits,
1778 dim,
1779 CancellationToken::new(),
1780 work,
1781 &mut |_idx, _v| Ok(()),
1782 ));
1783
1784 let err = result.expect_err("divergent dim must fail the fan-out");
1785 assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
1786 }
1787
1788 #[test]
1790 fn adaptive_batch_dim64_keeps_calibrated_sizes() {
1791 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
1792 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
1793 }
1794
1795 #[test]
1797 fn adaptive_batch_dim384_shrinks() {
1798 assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
1799 assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
1800 }
1801
1802 #[test]
1804 fn adaptive_batch_intermediate_dims() {
1805 assert_eq!(adaptive_batch_for_dim(8, 128), 4);
1806 assert_eq!(adaptive_batch_for_dim(8, 256), 2);
1807 }
1808
1809 #[test]
1811 fn adaptive_batch_small_dim_clamps_to_base() {
1812 assert_eq!(adaptive_batch_for_dim(8, 8), 8);
1813 }
1814
1815 #[test]
1817 fn adaptive_batch_total_function() {
1818 assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
1819 assert_eq!(adaptive_batch_for_dim(8, 0), 8);
1820 assert_eq!(adaptive_batch_for_dim(0, 64), 1);
1821 }
1822
1823 #[test]
1825 #[serial_test::serial(env)]
1826 fn adaptive_wrappers_follow_env_dim() {
1827 std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
1828 let chunk = chunk_embed_batch_size();
1829 let entity = entity_embed_batch_size();
1830 std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
1831 crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
1832 assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
1833 assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
1834 }
1835
1836 #[test]
1844 fn embedding_error_kind_classify_oauth_message() {
1845 assert_eq!(
1846 EmbeddingErrorKind::classify("OAuth token expired for claude"),
1847 EmbeddingErrorKind::OAuth,
1848 );
1849 assert_eq!(
1850 EmbeddingErrorKind::classify("oauth authentication failed"),
1851 EmbeddingErrorKind::OAuth,
1852 );
1853 }
1854
1855 #[test]
1858 fn embedding_error_kind_classify_quota_message() {
1859 assert_eq!(
1860 EmbeddingErrorKind::classify("quota exhausted on backend"),
1861 EmbeddingErrorKind::Quota,
1862 );
1863 assert_eq!(
1864 EmbeddingErrorKind::classify("Usage quota limit reached"),
1865 EmbeddingErrorKind::Quota,
1866 );
1867 }
1868
1869 #[test]
1873 fn embedding_error_kind_classify_slot_exhausted_message() {
1874 assert_eq!(
1875 EmbeddingErrorKind::classify(
1876 "slot exhausted: failed to acquire LLM slot after backoff"
1877 ),
1878 EmbeddingErrorKind::SlotExhausted,
1879 );
1880 }
1881
1882 #[test]
1885 fn embedding_error_kind_classify_zero_dimension_message() {
1886 assert_eq!(
1887 EmbeddingErrorKind::classify("embedding returned dim=zero"),
1888 EmbeddingErrorKind::ZeroDimension,
1889 );
1890 assert_eq!(
1891 EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
1892 EmbeddingErrorKind::ZeroDimension,
1893 );
1894 }
1895
1896 #[test]
1900 fn embedding_error_kind_classify_unknown_fallback() {
1901 assert_eq!(
1902 EmbeddingErrorKind::classify("unrelated subprocess error"),
1903 EmbeddingErrorKind::Unknown,
1904 );
1905 assert_eq!(
1906 EmbeddingErrorKind::classify("rate limit hit"),
1907 EmbeddingErrorKind::Unknown,
1908 );
1909 assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
1911 assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
1912 assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
1913 assert_eq!(
1914 EmbeddingErrorKind::BackendMismatch.code(),
1915 "backend-mismatch"
1916 );
1917 assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
1918 assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
1919 }
1920
1921 #[test]
1923 fn fallback_reason_display_does_not_panic() {
1924 let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
1925 let _ = FallbackReason::Cancelled.to_string();
1926 let _ = FallbackReason::Timeout {
1927 operation: "embed_query".into(),
1928 duration_secs: 30,
1929 }
1930 .to_string();
1931 }
1932
1933 #[test]
1936 fn fallback_reason_is_partial_eq() {
1937 assert_eq!(
1938 FallbackReason::EmbeddingFailed("a".into()),
1939 FallbackReason::EmbeddingFailed("a".into())
1940 );
1941 assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
1942 assert_ne!(
1943 FallbackReason::EmbeddingFailed("a".into()),
1944 FallbackReason::EmbeddingFailed("b".into())
1945 );
1946 assert_ne!(
1947 FallbackReason::Cancelled,
1948 FallbackReason::Timeout {
1949 operation: "x".into(),
1950 duration_secs: 1
1951 }
1952 );
1953 }
1954
1955 #[test]
1958 fn fallback_reason_timeout_preserves_fields() {
1959 let r = FallbackReason::Timeout {
1960 operation: "embed_query_local".into(),
1961 duration_secs: 300,
1962 };
1963 match r {
1964 FallbackReason::Timeout {
1965 operation,
1966 duration_secs,
1967 } => {
1968 assert_eq!(operation, "embed_query_local");
1969 assert_eq!(duration_secs, 300);
1970 }
1971 other => panic!("expected Timeout, got {other:?}"),
1972 }
1973 }
1974
1975 #[test]
1981 #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
1982 fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
1983 let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
1986 let result = try_embed_query_with_fallback(bogus, "hello world");
1987 match result {
1988 Err(FallbackReason::EmbeddingFailed(msg)) => {
1989 assert!(!msg.is_empty(), "fallback message must not be empty");
1991 }
1992 Err(FallbackReason::Cancelled) => {
1993 panic!("expected EmbeddingFailed, got Cancelled");
1994 }
1995 Err(FallbackReason::Timeout { .. }) => {
1996 panic!("expected EmbeddingFailed, got Timeout");
1997 }
1998 Err(FallbackReason::SlotExhausted) => {
1999 panic!("expected EmbeddingFailed, got SlotExhausted");
2000 }
2001 Err(FallbackReason::OAuthQuota { .. }) => {
2002 panic!("expected EmbeddingFailed, got OAuthQuota");
2003 }
2004 Err(FallbackReason::BackendMismatch { .. }) => {
2005 panic!("expected EmbeddingFailed, got BackendMismatch");
2006 }
2007 Err(FallbackReason::DimZero) => {
2008 panic!("expected EmbeddingFailed, got DimZero");
2009 }
2010 Ok(_) => {
2011 panic!("expected an error, got Ok — embedder must fail for bogus path");
2012 }
2013 }
2014 }
2015
2016 #[test]
2018 fn g56_entity_cache_key_is_stable_and_distinct() {
2019 let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
2020 let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
2021 let k3 = entity_cache_key("codex:default", "claude-code");
2022 let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
2023 assert_eq!(k1, k2, "same model+text must hash identically");
2024 assert_ne!(k1, k3, "different text must hash differently");
2025 assert_ne!(k1, k4, "different model must hash differently");
2026 }
2027
2028 #[test]
2029 fn g56_entity_embed_cache_stats_hit_rate() {
2030 let zero = EmbedCacheStats::default();
2031 assert_eq!(zero.hit_rate(), 0.0);
2032 let half = EmbedCacheStats {
2033 requested: 4,
2034 hits: 2,
2035 misses: 2,
2036 };
2037 assert!((half.hit_rate() - 0.5).abs() < 1e-9);
2038 let all = EmbedCacheStats {
2039 requested: 7,
2040 hits: 7,
2041 misses: 0,
2042 };
2043 assert!((all.hit_rate() - 1.0).abs() < 1e-9);
2044 }
2045
2046 #[test]
2047 fn g56_entity_embed_cache_populates_and_hits() {
2048 let cache = entity_embed_cache();
2052 let model = "test-model";
2053 let text = "sqlite-graphrag";
2054 let key = entity_cache_key(model, text);
2055 let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
2056 cache.lock().insert(key, Arc::clone(&stored));
2057 let guard = cache.lock();
2058 let hit = guard.get(&key).expect("cache must return stored value");
2059 assert_eq!(hit.len(), crate::constants::embedding_dim());
2060 assert!((hit[0] - 0.42).abs() < 1e-6);
2061 }
2062
2063 #[test]
2064 fn g56_empty_texts_short_circuits_with_zero_stats() {
2065 let stats = EmbedCacheStats::default();
2068 assert_eq!(stats.requested, 0);
2069 assert_eq!(stats.hits, 0);
2070 assert_eq!(stats.misses, 0);
2071 assert_eq!(stats.hit_rate(), 0.0);
2072 }
2073}
2074
2075#[cfg(test)]
2079mod embed_with_fallback_tests {
2080 use super::*;
2081 use crate::llm::exit_code_hints::LlmBackendError;
2082
2083 #[test]
2084 fn none_backend_returns_empty_vector_without_calling_llm() {
2085 let (v, kind) = embed_via_backend(
2089 std::path::Path::new("/nonexistent"),
2090 "any text",
2091 &LlmBackendKind::None,
2092 )
2093 .expect("None backend never fails");
2094 assert!(v.is_empty());
2095 assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
2096 }
2097
2098 #[test]
2099 fn empty_chain_defaults_to_codex_claude_none() {
2100 let defaults = [
2104 LlmBackendKind::Codex,
2105 LlmBackendKind::Claude,
2106 LlmBackendKind::None,
2107 ];
2108
2109 #[allow(dead_code)]
2114 fn llm_backend_kind_as_str_is_stable() {
2115 assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
2116 assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
2117 assert_eq!(LlmBackendKind::None.as_str(), "none");
2118 }
2119
2120 #[allow(dead_code)]
2121 fn fallback_reason_reason_code_is_stable() {
2122 assert_eq!(
2123 FallbackReason::EmbeddingFailed("any".into()).reason_code(),
2124 "embedding_failed"
2125 );
2126 assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
2127 assert_eq!(
2128 FallbackReason::Timeout {
2129 operation: "embed_query".into(),
2130 duration_secs: 30
2131 }
2132 .reason_code(),
2133 "timeout"
2134 );
2135 }
2136 assert_eq!(defaults.len(), 3);
2137 }
2138
2139 #[test]
2140 fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
2141 let chain = vec![LlmBackendKind::None];
2153 let err = embed_with_fallback(
2154 std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2155 "hello",
2156 &chain,
2157 false,
2158 )
2159 .expect_err("chain of only [None] without skip_on_failure MUST abort");
2160 let msg = format!("{err}");
2161 assert!(
2162 msg.contains("no LLM backends available"),
2163 "error must mention exhausted chain, got: {msg}"
2164 );
2165 }
2166 #[test]
2167 fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
2168 let chain = vec![LlmBackendKind::None];
2173 let v = embed_with_fallback(
2174 std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
2175 "hello",
2176 &chain,
2177 true,
2178 )
2179 .expect("None chain is always Ok");
2180 assert!(v.0.is_empty(), "vector must be empty");
2181 assert_eq!(v.1, LlmBackendKind::None);
2182 }
2183 #[allow(dead_code)]
2184 fn llm_backend_error_no_backends_default_message() {
2185 let e = LlmBackendError::NoBackendsAvailable;
2188 let h = e.hint();
2189 assert!(h.contains("--llm-fallback"));
2190 }
2191
2192 #[test]
2193 fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
2194 let e = LlmBackendError::NonZeroExit {
2195 exit_code: Some(137),
2196 signal: Some(9),
2197 stdout_tail: "out".into(),
2198 stderr_tail: "OOM killed".into(),
2199 binary: "codex".into(),
2200 hint: "OOM".into(),
2201 };
2202 let s = e.to_string();
2203 assert!(s.contains("codex"));
2204 assert!(s.contains("OOM killed"));
2205 assert!(s.contains("signal 9") || s.contains("exit 137"));
2206 }
2207}