1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide Claude-backed embedder, created lazily by `get_claude_embedder`.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenCode-backed embedder, created lazily by `get_opencode_embedder`.
static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// OpenRouter embeddings client, set once by `get_openrouter_embedder`.
static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();

/// OpenRouter chat client, set once by `get_openrouter_chat_client`.
static OPENROUTER_CHAT_CLIENT: OnceLock<crate::chat_api::OpenRouterChatClient> = OnceLock::new();
66
67pub fn is_openrouter_initialized() -> bool {
69 OPENROUTER_CLIENT.get().is_some()
70}
/// Auto-detected default embedder, created lazily by `get_embedder`.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Shared multi-threaded tokio runtime (2 worker threads), built lazily by `shared_runtime`.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
79
/// Base (maximum) number of chunk texts per embedding batch. It is scaled down
/// for embedding dimensions above `EMBED_BATCH_CALIBRATION_DIM` by `adaptive_batch_for_dim`.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base (maximum) number of entity texts per embedding batch, scaled the same way.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Reference embedding dimension at which the base batch sizes apply unscaled.
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
92
93fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
101 let base = base.max(1);
102 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
103}
104
105pub fn chunk_embed_batch_size() -> usize {
107 let dim = crate::constants::embedding_dim();
108 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
109 tracing::debug!(
110 dim,
111 base = CHUNK_EMBED_BATCH_SIZE,
112 batch,
113 "adaptive chunk batch size (G44)"
114 );
115 batch
116}
117
118pub fn entity_embed_batch_size() -> usize {
120 let dim = crate::constants::embedding_dim();
121 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
122 tracing::debug!(
123 dim,
124 base = ENTITY_EMBED_BATCH_SIZE,
125 batch,
126 "adaptive entity batch size (G44)"
127 );
128 batch
129}
130
131pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
133 if let Some(rt) = RUNTIME.get() {
134 return Ok(rt);
135 }
136 let rt = tokio::runtime::Builder::new_multi_thread()
137 .worker_threads(2)
138 .enable_all()
139 .build()
140 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
141 let _ = RUNTIME.set(rt);
142 Ok(RUNTIME.get().expect("RUNTIME initialised above"))
143}
144
145pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
147 if let Some(e) = EMBEDDER.get() {
148 return Ok(e);
149 }
150 let backend = LlmEmbedding::detect_available()?;
151 let _ = EMBEDDER.set(Mutex::new(backend));
152 Ok(EMBEDDER.get().expect("EMBEDDER initialised above"))
153}
154
155pub fn get_claude_embedder(
160 claude_binary: Option<&Path>,
161 claude_model: Option<&str>,
162) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
163 if let Some(e) = CLAUDE_EMBEDDER.get() {
164 return Ok(e);
165 }
166 let mut builder = LlmEmbedding::with_claude_builder();
167 if let Some(b) = claude_binary {
168 builder = builder.override_binary(b.to_path_buf());
169 }
170 if let Some(m) = claude_model {
171 builder = builder.override_model(m.to_string());
172 }
173 let backend = builder.build()?;
174 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
175 Ok(CLAUDE_EMBEDDER
176 .get()
177 .expect("CLAUDE_EMBEDDER initialised above"))
178}
179
180pub fn get_opencode_embedder(
185 opencode_binary: Option<&Path>,
186 opencode_model: Option<&str>,
187) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
188 if let Some(e) = OPENCODE_EMBEDDER.get() {
189 return Ok(e);
190 }
191 let mut builder = LlmEmbedding::with_opencode_builder();
192 if let Some(b) = opencode_binary {
193 builder = builder.override_binary(b.to_path_buf());
194 }
195 if let Some(m) = opencode_model {
196 builder = builder.override_model(m.to_string());
197 }
198 let backend = builder.build()?;
199 let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
200 Ok(OPENCODE_EMBEDDER
201 .get()
202 .expect("OPENCODE_EMBEDDER initialised above"))
203}
204
205pub fn get_openrouter_embedder(
206 api_key: secrecy::SecretBox<String>,
207 model: &str,
208 dim: usize,
209) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
210 if let Some(c) = OPENROUTER_CLIENT.get() {
211 return Ok(c);
212 }
213 let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
214 let _ = OPENROUTER_CLIENT.set(client);
215 Ok(OPENROUTER_CLIENT
216 .get()
217 .expect("OPENROUTER_CLIENT initialised above"))
218}
219
220pub fn get_openrouter_chat_client(
224 api_key: secrecy::SecretBox<String>,
225 model: &str,
226 timeout_secs: u64,
227) -> Result<&'static crate::chat_api::OpenRouterChatClient, AppError> {
228 if let Some(c) = OPENROUTER_CHAT_CLIENT.get() {
229 return Ok(c);
230 }
231 let client =
232 crate::chat_api::OpenRouterChatClient::new(api_key, model.to_string(), timeout_secs)?;
233 let _ = OPENROUTER_CHAT_CLIENT.set(client);
234 Ok(OPENROUTER_CHAT_CLIENT
235 .get()
236 .expect("OPENROUTER_CHAT_CLIENT initialised above"))
237}
238
/// Return the OpenRouter chat client if `get_openrouter_chat_client` has already
/// initialised it, otherwise `None`. This accessor never creates the client.
pub fn openrouter_chat_client() -> Option<&'static crate::chat_api::OpenRouterChatClient> {
    OPENROUTER_CHAT_CLIENT.get()
}
246
247pub fn embed_via_claude_local(
251 _models_dir: &Path,
252 text: &str,
253 claude_binary: Option<&Path>,
254 claude_model: Option<&str>,
255) -> Result<Vec<f32>, AppError> {
256 let _slot_guard = acquire_llm_slot_for_embedding()?;
257 let embedder = get_claude_embedder(claude_binary, claude_model)?;
258 embed_passage(embedder, text)
259}
260
261pub fn embed_via_claude_local_resolved(
266 _models_dir: &Path,
267 text: &str,
268 claude_binary: Option<&Path>,
269 claude_model: Option<&str>,
270) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
271 let _slot_guard = acquire_llm_slot_for_embedding()?;
272 let embedder = get_claude_embedder(claude_binary, claude_model)?;
273 let v = embed_passage(embedder, text)?;
274 Ok((v, LlmBackendKind::Claude))
275}
276
277pub fn embed_via_opencode_local_resolved(
282 _models_dir: &Path,
283 text: &str,
284 opencode_binary: Option<&Path>,
285 opencode_model: Option<&str>,
286) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
287 let _slot_guard = acquire_llm_slot_for_embedding()?;
288 let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
289 let v = embed_passage(embedder, text)?;
290 Ok((v, LlmBackendKind::Opencode))
291}
292fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
295 embedder.lock().clone()
296}
297
298pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
302 let client = clone_client(embedder);
303 let result = client.embed_passage(text)?;
304 validate_dim(result)
305}
306
307pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
311 let client = clone_client(embedder);
312 let result = client.embed_query(text)?;
313 validate_dim(result)
314}
315
316pub fn embed_passages_controlled(
321 embedder: &Mutex<LlmEmbedding>,
322 texts: &[&str],
323 _token_counts: &[usize],
324) -> Result<Vec<Vec<f32>>, AppError> {
325 if texts.is_empty() {
326 return Ok(Vec::new());
327 }
328 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
329 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
330}
331
332pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
333 let _slot_guard = acquire_llm_slot_for_embedding()?;
334 let embedder = get_embedder(models_dir)?;
335 embed_passage(embedder, text)
336}
337
/// True when `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` is exactly `"1"` or `"true"`.
///
/// The check is case-sensitive. An unset or non-UTF-8 variable yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE")
        .map(|flag| flag == "1" || flag == "true")
        .unwrap_or(false)
}
347
348pub fn embed_passage_or_skip(
355 models_dir: &Path,
356 text: &str,
357 choice: Option<crate::cli::LlmBackendChoice>,
358) -> Result<Option<Vec<f32>>, AppError> {
359 match embed_passage_with_choice(models_dir, text, choice) {
360 Ok((v, _backend)) => Ok(Some(v)),
361 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
362 Err(e) => {
363 if should_skip_embedding_on_failure() {
364 tracing::warn!(
365 error = %e,
366 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
367 );
368 Ok(None)
369 } else {
370 Err(e)
371 }
372 }
373 }
374}
375
376pub fn embed_passage_local_resolved(
382 models_dir: &Path,
383 text: &str,
384) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
385 let _slot_guard = acquire_llm_slot_for_embedding()?;
386 let embedder = get_embedder(models_dir)?;
387 let v = embed_passage(embedder, text)?;
388 let kind = match embedder.lock().flavour() {
389 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
390 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
391 crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
392 };
393 Ok((v, kind))
394}
395
396pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
397 let _slot_guard = acquire_llm_slot_for_embedding()?;
398 let embedder = get_embedder(models_dir)?;
399 embed_query(embedder, text)
400}
401
402pub fn embed_passage_with_choice(
419 models_dir: &Path,
420 text: &str,
421 choice: Option<crate::cli::LlmBackendChoice>,
422) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
423 let _slot_guard = acquire_llm_slot_for_embedding()?;
424 match choice {
425 None => {
426 let embedder = get_embedder(models_dir)?;
427 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
428 }
429 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
430 }
431}
432
433pub fn embed_passage_with_embedding_choice(
437 models_dir: &Path,
438 text: &str,
439 embedding_backend: crate::cli::EmbeddingBackendChoice,
440 llm_backend: crate::cli::LlmBackendChoice,
441) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
442 let _slot_guard = acquire_llm_slot_for_embedding()?;
443 let chain = embedding_backend.to_chain(llm_backend);
444 embed_with_fallback(models_dir, text, &chain, false)
445}
446
447pub fn try_embed_query_with_choice(
453 models_dir: &Path,
454 text: &str,
455 choice: Option<crate::cli::LlmBackendChoice>,
456) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
457 match embed_passage_with_choice(models_dir, text, choice) {
458 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
471 Ok((v, backend)) => Ok((v, backend)),
472 Err(e) => Err(classify_embedding_error(e)),
473 }
474}
475pub fn try_embed_query_with_embedding_choice(
480 models_dir: &Path,
481 text: &str,
482 embedding_backend: crate::cli::EmbeddingBackendChoice,
483 llm_backend: crate::cli::LlmBackendChoice,
484) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
485 match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
486 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
487 Ok((v, backend)) => Ok((v, backend)),
488 Err(e) => Err(classify_embedding_error(e)),
489 }
490}
491
492fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
504 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
505 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
506 .ok()
507 .and_then(|s| s.parse::<u32>().ok())
508 .filter(|n| *n >= 1)
509 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
510 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
511 0
512 } else {
513 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
514 .ok()
515 .and_then(|s| s.parse::<u64>().ok())
516 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
517 };
518 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
526 Ok(guard) => Ok(guard),
527 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
528 "slot exhausted: {e} (fall back to FTS5)"
529 ))),
530 Err(e) => Err(e),
531 }
532}
/// Coarse category of an embedding error message, derived by substring matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// Message mentions "oauth".
    OAuth,
    /// Message mentions "quota".
    Quota,
    /// Message mentions "slot exhausted".
    SlotExhausted,
    /// Message mentions "backend mismatch".
    BackendMismatch,
    /// Message mentions both "dim" and "zero".
    ZeroDimension,
    /// None of the above.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classify `msg` case-insensitively.
    ///
    /// Needles are checked in a fixed order and the first match wins:
    /// oauth, quota, slot exhausted, backend mismatch, then dim+zero.
    pub fn classify(msg: &str) -> Self {
        const NEEDLES: [(&str, EmbeddingErrorKind); 4] = [
            ("oauth", EmbeddingErrorKind::OAuth),
            ("quota", EmbeddingErrorKind::Quota),
            ("slot exhausted", EmbeddingErrorKind::SlotExhausted),
            ("backend mismatch", EmbeddingErrorKind::BackendMismatch),
        ];
        let lowered = msg.to_lowercase();
        if let Some(&(_, kind)) = NEEDLES
            .iter()
            .find(|(needle, _)| lowered.contains(*needle))
        {
            return kind;
        }
        if lowered.contains("dim") && lowered.contains("zero") {
            EmbeddingErrorKind::ZeroDimension
        } else {
            EmbeddingErrorKind::Unknown
        }
    }

    /// Stable kebab-case identifier for this kind.
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    /// Writes [`EmbeddingErrorKind::code`] verbatim (formatter flags are ignored).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(EmbeddingErrorKind::code(self))
    }
}
604
/// Why a query-embedding attempt failed, so callers can pick a fallback strategy.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Any embedding failure without a more specific variant; carries the message.
    EmbeddingFailed(String),
    /// No host-wide LLM slot could be acquired.
    SlotExhausted,
    /// An OAuth/quota error; `backend` is the backend name sniffed from the message.
    OAuthQuota { backend: &'static str },
    /// A different backend was invoked than the one requested.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The backend returned an empty vector.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// An `AppError::Timeout` surfaced from the embedding path.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case identifier for this reason.
    pub fn reason_code(&self) -> &'static str {
        let code = match self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        };
        code
    }
}

impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FallbackReason::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            FallbackReason::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            FallbackReason::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            FallbackReason::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            FallbackReason::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            FallbackReason::Cancelled => f.write_str("embedding cancelled by external signal"),
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
701
702pub fn try_embed_query_with_fallback(
710 models_dir: &Path,
711 query: &str,
712) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
713 match embed_query_local(models_dir, query) {
714 Ok(v) => Ok((v, LlmBackendKind::None)),
715 Err(e) => Err(classify_embedding_error(e)),
716 }
717}
718
719pub fn try_embed_query_with_deterministic_fallback(
728 models_dir: &Path,
729 query: &str,
730 choice: Option<crate::cli::LlmBackendChoice>,
731) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
732 match try_embed_query_with_choice(models_dir, query, choice) {
733 Ok(t) => Ok(t),
734 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
735 let alt = match backend {
736 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
737 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
738 "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
739 "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
740 _ => None,
741 };
742 if let Some(alt_choice) = alt {
743 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
744 } else {
745 Err(reason)
746 }
747 }
748 Err(reason @ FallbackReason::SlotExhausted) => {
749 std::thread::sleep(std::time::Duration::from_millis(750));
750 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
751 }
752 Err(other) => Err(other),
753 }
754}
755
756pub fn classify_embedding_error(err: AppError) -> FallbackReason {
764 match err {
765 AppError::Timeout {
766 operation,
767 duration_secs,
768 } => FallbackReason::Timeout {
769 operation,
770 duration_secs,
771 },
772 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
773 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
782 EmbeddingErrorKind::OAuth => {
783 let backend = if msg.contains("codex") {
784 "codex"
785 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
786 "claude"
791 } else if msg.contains("opencode") {
792 "opencode"
793 } else {
794 "unknown"
795 };
796 FallbackReason::OAuthQuota { backend }
797 }
798 EmbeddingErrorKind::Quota => {
799 let backend = if msg.contains("codex") {
800 "codex"
801 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
802 "claude"
803 } else if msg.contains("opencode") {
804 "opencode"
805 } else {
806 "unknown"
807 };
808 FallbackReason::OAuthQuota { backend }
809 }
810 EmbeddingErrorKind::BackendMismatch => {
811 let (requested, resolved) =
816 if msg.contains("requested claude") && msg.contains("but codex") {
817 ("claude", "codex")
818 } else if msg.contains("requested codex") && msg.contains("but claude") {
819 ("codex", "claude")
820 } else if msg.contains("requested claude") {
821 ("claude", "unknown")
822 } else if msg.contains("requested codex") {
823 ("codex", "unknown")
824 } else {
825 ("unknown", "unknown")
826 };
827 FallbackReason::BackendMismatch {
828 requested,
829 resolved,
830 }
831 }
832 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
833 EmbeddingErrorKind::Unknown => {
834 if msg.contains("cancelled") {
835 FallbackReason::Cancelled
836 } else {
837 FallbackReason::EmbeddingFailed(msg)
838 }
839 }
840 },
841 e => FallbackReason::EmbeddingFailed(e.to_string()),
842 }
843}
844pub fn embed_with_fallback(
863 models_dir: &Path,
864 text: &str,
865 chain: &[LlmBackendKind],
866 skip_on_failure: bool,
867) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
868 use crate::llm::exit_code_hints::LlmBackendError;
869 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
870 vec![
871 LlmBackendKind::Codex,
872 LlmBackendKind::Claude,
873 LlmBackendKind::Opencode,
874 LlmBackendKind::None,
875 ]
876 } else {
877 chain.to_vec()
878 };
879
880 let mut last_err: Option<AppError> = None;
881 for backend in &effective {
882 match embed_via_backend_strict(
893 models_dir,
894 text,
895 backend,
896 last_err.as_ref(),
897 skip_on_failure,
898 ) {
899 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
900 Err(e) => {
901 if matches!(e, AppError::Validation(_)) {
906 return Err(e);
907 }
908 tracing::warn!(
909 target: "embedding",
910 backend = ?backend,
911 error = %e,
912 "embed_with_fallback: backend failed, trying next"
913 );
914 last_err = Some(e);
915 }
916 }
917 }
918 if skip_on_failure {
919 return Ok((Vec::new(), LlmBackendKind::None));
924 }
925 Err(last_err
926 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
927}
928
/// Embedding backend that was requested or actually used for a request.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// Codex backend (`"codex"`).
    Codex,
    /// Claude backend (`"claude"`).
    Claude,
    /// OpenCode backend (`"opencode"`).
    Opencode,
    /// OpenRouter HTTP API (`"openrouter"`).
    OpenRouter,
    /// No backend / default path (`"none"`).
    None,
}

impl LlmBackendKind {
    /// Lowercase label for this backend.
    pub fn as_str(self) -> &'static str {
        let label = match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::OpenRouter => "openrouter",
            LlmBackendKind::None => "none",
        };
        label
    }
}
959
960pub fn embed_via_backend(
975 models_dir: &Path,
976 text: &str,
977 backend: &LlmBackendKind,
978) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
979 match backend {
980 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
981 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
982 LlmBackendKind::Claude => {
983 tracing::debug!(
987 target: "embedder",
988 backend = "claude",
989 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
990 );
991 embed_via_claude_local_resolved(models_dir, text, None, None)
992 }
993 LlmBackendKind::Opencode => {
994 tracing::debug!(
995 target: "embedder",
996 backend = "opencode",
997 "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
998 );
999 embed_via_opencode_local_resolved(models_dir, text, None, None)
1000 }
1001 LlmBackendKind::OpenRouter => {
1002 tracing::debug!(
1003 target: "embedder",
1004 backend = "openrouter",
1005 "embed_via_backend: using OpenRouter API (v1.0.93)"
1006 );
1007 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1008 AppError::Embedding(
1009 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1010 )
1011 })?;
1012 let rt = shared_runtime()?;
1013 let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
1014 Ok((vec, LlmBackendKind::OpenRouter))
1015 }
1016 }
1017}
1018
1019pub fn embed_via_backend_strict(
1032 models_dir: &Path,
1033 text: &str,
1034 backend: &LlmBackendKind,
1035 last_err: Option<&AppError>,
1036 skip_on_failure: bool,
1037) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1038 use crate::llm::exit_code_hints::LlmBackendError;
1039 match backend {
1040 LlmBackendKind::None => {
1041 if skip_on_failure && last_err.is_none() {
1045 Ok((Vec::new(), LlmBackendKind::None))
1046 } else if last_err.is_some() {
1047 Err(match last_err {
1051 Some(e) => AppError::Embedding(format!("{e}")),
1052 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1053 })
1054 } else {
1055 Err(AppError::Embedding(
1058 LlmBackendError::NoBackendsAvailable.to_string(),
1059 ))
1060 }
1061 }
1062 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1063 LlmBackendKind::Claude => {
1064 tracing::debug!(
1065 target: "embedder",
1066 backend = "claude",
1067 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1068 );
1069 embed_via_claude_local_resolved(models_dir, text, None, None)
1070 }
1071 LlmBackendKind::Opencode => {
1072 tracing::debug!(
1073 target: "embedder",
1074 backend = "opencode",
1075 "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1076 );
1077 embed_via_opencode_local_resolved(models_dir, text, None, None)
1078 }
1079 LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1080 }
1081}
1082
1083pub fn embed_via_backend_legacy(
1088 models_dir: &Path,
1089 text: &str,
1090 backend: &LlmBackendKind,
1091) -> Result<Vec<f32>, AppError> {
1092 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
1093}
1094
1095pub fn embed_passages_controlled_local(
1096 models_dir: &Path,
1097 texts: &[&str],
1098 token_counts: &[usize],
1099) -> Result<Vec<Vec<f32>>, AppError> {
1100 let embedder = get_embedder(models_dir)?;
1101 embed_passages_controlled(embedder, texts, token_counts)
1102}
1103
1104pub fn embed_passages_parallel_local(
1107 models_dir: &Path,
1108 texts: &[String],
1109 parallelism: usize,
1110 batch_size: usize,
1111) -> Result<Vec<Vec<f32>>, AppError> {
1112 let embedder = get_embedder(models_dir)?;
1113 embed_texts_parallel(embedder, texts, parallelism, batch_size)
1114}
1115
/// Result of one OpenRouter fan-out task: the chunk's position paired with that chunk's embeddings or error.
type EmbedChunkResult = (usize, Result<Vec<Vec<f32>>, AppError>);
1120
/// Stitch per-chunk results back into input order.
///
/// Parts are stably sorted by chunk index and their vectors concatenated.
fn reassemble_ordered(mut parts: Vec<(usize, Vec<Vec<f32>>)>) -> Vec<Vec<f32>> {
    parts.sort_by(|left, right| left.0.cmp(&right.0));
    let mut ordered = Vec::new();
    for (_, vectors) in parts {
        ordered.extend(vectors);
    }
    ordered
}
1129
1130pub fn embed_passages_parallel_with_embedding_choice(
1137 models_dir: &Path,
1138 texts: &[String],
1139 parallelism: usize,
1140 batch_size: usize,
1141 embedding_backend: crate::cli::EmbeddingBackendChoice,
1142 llm_backend: crate::cli::LlmBackendChoice,
1143) -> Result<Vec<Vec<f32>>, AppError> {
1144 let chain = embedding_backend.to_chain(llm_backend);
1145 if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1146 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1147 AppError::Embedding(
1148 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1149 )
1150 })?;
1151 let rt = shared_runtime()?;
1152
1153 let k = parallelism.clamp(1, 16);
1158 if texts.len() <= 32 || k == 1 {
1159 let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1160 let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1161 return Ok(vecs);
1162 }
1163
1164 let vecs = rt.block_on(async move {
1169 let mut set: JoinSet<EmbedChunkResult> = JoinSet::new();
1170 let mut parts: Vec<(usize, Vec<Vec<f32>>)> = Vec::new();
1171
1172 for (idx, chunk) in texts.chunks(32).enumerate() {
1173 if set.len() >= k {
1174 if let Some(joined) = set.join_next().await {
1175 let (cidx, res) = joined.map_err(|e| {
1176 AppError::Embedding(format!("embedding task join error: {e}"))
1177 })?;
1178 parts.push((cidx, res?));
1179 }
1180 }
1181 let owned: Vec<String> = chunk.to_vec();
1182 set.spawn(async move {
1183 let refs: Vec<&str> = owned.iter().map(|s| s.as_str()).collect();
1184 let r = client.embed_batch(&refs, client.default_input_type()).await;
1185 (idx, r)
1186 });
1187 }
1188
1189 while let Some(joined) = set.join_next().await {
1190 let (cidx, res) = joined
1191 .map_err(|e| AppError::Embedding(format!("embedding task join error: {e}")))?;
1192 parts.push((cidx, res?));
1193 }
1194
1195 Ok::<Vec<Vec<f32>>, AppError>(reassemble_ordered(parts))
1196 })?;
1197 Ok(vecs)
1198 } else {
1199 embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1200 }
1201}
1202
/// In-process entity embedding cache: `entity_cache_key(model, text)` maps to a shared vector.
/// NOTE(review): entries are never evicted, so memory grows with distinct entity texts.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Lazily created backing store for the entity embedding cache (see `entity_embed_cache`).
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1217
1218fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1219 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1220}
1221
1222fn entity_cache_key(model: &str, text: &str) -> u64 {
1223 let mut hasher = blake3::Hasher::new();
1224 hasher.update(model.as_bytes());
1225 hasher.update(b"\0");
1226 hasher.update(text.as_bytes());
1227 let h = hasher.finalize();
1228 let bytes = h.as_bytes();
1229 u64::from_le_bytes([
1230 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1231 ])
1232}
1233
1234pub fn embed_entity_texts_cached(
1244 models_dir: &Path,
1245 texts: &[String],
1246 parallelism: usize,
1247 embedding_backend: crate::cli::EmbeddingBackendChoice,
1248 llm_backend: crate::cli::LlmBackendChoice,
1249) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1250 if texts.is_empty() {
1251 return Ok((Vec::new(), EmbedCacheStats::default()));
1252 }
1253 let chain = embedding_backend.to_chain(llm_backend);
1257
1258 if chain.as_slice() == [LlmBackendKind::None] {
1264 let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1265 return Ok((
1266 out,
1267 EmbedCacheStats {
1268 requested: texts.len(),
1269 hits: 0,
1270 misses: texts.len(),
1271 },
1272 ));
1273 }
1274
1275 let routed_openrouter =
1281 chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1282 let model = if routed_openrouter {
1283 format!("openrouter:{}", crate::constants::embedding_dim())
1284 } else {
1285 get_embedder(models_dir)?.lock().model_label()
1286 };
1287 let cache = entity_embed_cache();
1288 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1289 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1290 {
1291 let guard = cache.lock();
1292 for (i, text) in texts.iter().enumerate() {
1293 let key = entity_cache_key(&model, text);
1294 if let Some(v) = guard.get(&key) {
1295 hits[i] = Some(Arc::clone(v));
1296 } else {
1297 miss_indices.push(i);
1298 }
1299 }
1300 }
1301 let miss_count = miss_indices.len();
1302 if miss_count > 0 {
1303 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1304 let miss_vecs = embed_passages_parallel_with_embedding_choice(
1308 models_dir,
1309 &miss_texts,
1310 parallelism,
1311 entity_embed_batch_size(),
1312 embedding_backend,
1313 llm_backend,
1314 )?;
1315 let mut guard = cache.lock();
1316 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1317 let vec = Arc::new(miss_vecs[slot].clone());
1318 let key = entity_cache_key(&model, &texts[orig_idx]);
1319 guard.insert(key, Arc::clone(&vec));
1320 hits[orig_idx] = Some(vec);
1321 }
1322 }
1323 let mut out = Vec::with_capacity(texts.len());
1324 for hit in hits.into_iter() {
1325 let v = hit.ok_or_else(|| {
1326 AppError::Embedding("entity embed cache produced null result".to_string())
1327 })?;
1328 out.push((*v).clone());
1329 }
1330 Ok((
1331 out,
1332 EmbedCacheStats {
1333 requested: texts.len(),
1334 hits: texts.len() - miss_count,
1335 misses: miss_count,
1336 },
1337 ))
1338}
1339
1340#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1342pub struct EmbedCacheStats {
1343 pub requested: usize,
1344 pub hits: usize,
1345 pub misses: usize,
1346}
1347
1348impl EmbedCacheStats {
1349 pub fn hit_rate(&self) -> f64 {
1351 if self.requested == 0 {
1352 0.0
1353 } else {
1354 self.hits as f64 / self.requested as f64
1355 }
1356 }
1357}
1358
1359pub fn embed_texts_parallel(
1372 embedder: &Mutex<LlmEmbedding>,
1373 texts: &[String],
1374 parallelism: usize,
1375 batch_size: usize,
1376) -> Result<Vec<Vec<f32>>, AppError> {
1377 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1378 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1379 slots[idx] = Some(v.to_vec());
1380 Ok(())
1381 })?;
1382 let mut out = Vec::with_capacity(slots.len());
1383 for (idx, slot) in slots.into_iter().enumerate() {
1384 out.push(slot.ok_or_else(|| {
1385 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1386 })?);
1387 }
1388 Ok(out)
1389}
1390
1391pub fn embed_texts_parallel_with(
1395 embedder: &Mutex<LlmEmbedding>,
1396 texts: &[String],
1397 parallelism: usize,
1398 batch_size: usize,
1399 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1400) -> Result<(), AppError> {
1401 if texts.is_empty() {
1402 return Ok(());
1403 }
1404 let dim = crate::constants::embedding_dim();
1405 if texts.len() == 1 {
1406 let v = embed_passage(embedder, &texts[0])?;
1407 return on_result(0, &v);
1408 }
1409
1410 let client = clone_client(embedder);
1411 let permits = effective_permits(parallelism);
1412 let batches = build_batches(texts, batch_size.max(1));
1413 let token = crate::cancel_token().clone();
1414
1415 let work = move |batch: Vec<(usize, String)>| {
1416 let client = client.clone();
1417 async move {
1418 client
1419 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1420 .await
1421 }
1422 };
1423
1424 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1425 match tokio::runtime::Handle::try_current() {
1426 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1427 Err(_) => shared_runtime()?.block_on(fan_out),
1428 }
1429}
1430
/// Split `texts` into batches of at most `batch_size` items.
///
/// Each item is tagged with its index in `texts`. Each string is cloned exactly
/// once; the old version cloned everything into an intermediate `Vec` and then
/// cloned again per chunk. A `batch_size` of 0 is treated as 1 instead of
/// panicking inside `slice::chunks`.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let size = batch_size.max(1);
    texts
        .chunks(size)
        .enumerate()
        .map(|(chunk_idx, chunk)| {
            let base = chunk_idx * size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1442
1443pub fn effective_permits(requested: usize) -> usize {
1448 let cpus = std::thread::available_parallelism()
1449 .map(|n| n.get())
1450 .unwrap_or(4);
1451 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1452 / crate::constants::LLM_WORKER_RSS_MB)
1453 .max(1) as usize;
1454 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1455}
1456
1457async fn run_bounded<F, Fut>(
1467 batches: Vec<Vec<(usize, String)>>,
1468 permits: usize,
1469 dim: usize,
1470 token: CancellationToken,
1471 work: F,
1472 on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1473) -> Result<(), AppError>
1474where
1475 F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
1476 Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
1477{
1478 let total_batches = batches.len();
1479 let semaphore = Arc::new(Semaphore::new(permits));
1480 let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
1483 let mut set: JoinSet<()> = JoinSet::new();
1484
1485 for (batch_idx, batch) in batches.into_iter().enumerate() {
1486 let sem = Arc::clone(&semaphore);
1487 let token = token.clone();
1488 let tx = tx.clone();
1489 let work = work.clone();
1490 set.spawn(async move {
1491 let wait_start = std::time::Instant::now();
1492 let Ok(_permit) = sem.acquire_owned().await else {
1495 let _ = tx
1496 .send(Err(AppError::Embedding("semaphore closed".to_string())))
1497 .await;
1498 return;
1499 };
1500 let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
1501 let work_start = std::time::Instant::now();
1502 let outcome = if crate::should_obey_shutdown() {
1508 tokio::select! {
1509 res = work(batch) => res,
1510 _ = token.cancelled() => Err(AppError::Embedding(
1511 "embedding cancelled by shutdown signal".to_string(),
1512 )),
1513 }
1514 } else {
1515 work(batch).await
1516 };
1517 tracing::debug!(
1519 target: "embedding",
1520 batch_idx,
1521 permit_wait_ms,
1522 work_ms = work_start.elapsed().as_millis() as u64,
1523 ok = outcome.is_ok(),
1524 "embedding batch finished"
1525 );
1526 let _ = tx.send(outcome).await;
1527 });
1528 }
1529 drop(tx);
1530
1531 let mut completed = 0usize;
1532 let mut failed = 0usize;
1533 let mut cancelled = 0usize;
1534 let mut first_error: Option<AppError> = None;
1535
1536 while let Some(message) = rx.recv().await {
1537 match message {
1538 Ok(items) => {
1539 completed += 1;
1540 if first_error.is_none() {
1541 for (idx, v) in items {
1542 if v.len() != dim {
1543 first_error = Some(AppError::Embedding(format!(
1544 "LLM returned {} dims for item {idx}, expected {dim}; \
1545 refusing to truncate or pad silently (G42/C5)",
1546 v.len()
1547 )));
1548 break;
1549 }
1550 if let Err(e) = on_result(idx, &v) {
1551 first_error = Some(e);
1552 break;
1553 }
1554 }
1555 if first_error.is_some() {
1556 set.shutdown().await;
1559 }
1560 }
1561 }
1562 Err(e) => {
1563 if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1564 cancelled += 1;
1565 } else {
1566 failed += 1;
1567 }
1568 if first_error.is_none() {
1569 first_error = Some(e);
1570 set.shutdown().await;
1571 }
1572 }
1573 }
1574 }
1575
1576 while let Some(join_result) = set.join_next().await {
1579 if let Err(join_err) = join_result {
1580 if join_err.is_panic() {
1581 failed += 1;
1582 if first_error.is_none() {
1583 first_error = Some(AppError::Embedding(format!(
1584 "embedding task panicked: {join_err}"
1585 )));
1586 }
1587 } else {
1588 cancelled += 1;
1589 }
1590 }
1591 }
1592
1593 tracing::debug!(
1603 target: "embedding",
1604 total_batches,
1605 completed,
1606 failed,
1607 cancelled,
1608 "embedding fan-out finished"
1609 );
1610
1611 match first_error {
1612 Some(e) => Err(e),
1613 None => Ok(()),
1614 }
1615}
1616
/// Serializes `v` as consecutive little-endian IEEE-754 `f32` values, 4 bytes
/// per element.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    let byte_len = v.len() * std::mem::size_of::<f32>();
    v.iter().fold(Vec::with_capacity(byte_len), |mut buf, value| {
        buf.extend_from_slice(&value.to_le_bytes());
        buf
    })
}
1624
/// Decodes consecutive little-endian IEEE-754 `f32` values from `bytes`.
///
/// The input is read in 4-byte groups. Any trailing 1–3 bytes that do not form
/// a complete value are ignored without an error.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|quad| f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]))
        .collect()
}
1632
/// Returns the active embedding dimension.
///
/// Forwards directly to `crate::constants::embedding_dim()`. This module's
/// dimension checks (`validate_dim` and the fan-out) read the same source.
pub fn embedding_dim() -> usize {
    crate::constants::embedding_dim()
}
1638
1639fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1643 let dim = crate::constants::embedding_dim();
1644 if v.len() != dim {
1645 return Err(AppError::Embedding(format!(
1646 "embedding has {} dims, expected {dim}; \
1647 refusing to truncate or pad silently (G42/C5)",
1648 v.len()
1649 )));
1650 }
1651 Ok(v)
1652}
1653
// Unit tests for batching, the bounded fan-out, byte/vector conversion,
// adaptive batch sizing, error classification and the entity embed cache.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn reassemble_ordered_restores_input_order() {
        let parts = vec![
            (2, vec![vec![2.0_f32], vec![2.1]]),
            (0, vec![vec![0.0], vec![0.1]]),
            (1, vec![vec![1.0], vec![1.1]]),
        ];
        let out = reassemble_ordered(parts);
        assert_eq!(
            out,
            vec![
                vec![0.0_f32],
                vec![0.1],
                vec![1.0],
                vec![1.1],
                vec![2.0],
                vec![2.1],
            ]
        );
    }

    #[test]
    fn f32_to_bytes_roundtrip() {
        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
        let bytes = f32_to_bytes(&input);
        assert_eq!(bytes.len(), input.len() * 4);
        let out = bytes_to_f32(&bytes);
        assert_eq!(out, input);
    }

    #[test]
    fn validate_dim_rejects_divergent_vectors() {
        let dim = crate::constants::embedding_dim();
        let long = vec![0.0; dim + 10];
        assert!(validate_dim(long).is_err(), "longer vector must error");
        let short = vec![0.0; dim.saturating_sub(1).max(1)];
        assert!(validate_dim(short).is_err(), "shorter vector must error");
        let exact = vec![0.0; dim];
        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
    }

    #[test]
    fn embedding_dim_matches_constants_source() {
        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
    }

    #[test]
    fn build_batches_preserves_global_indices() {
        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
        let batches = build_batches(&texts, 4);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 4);
        assert_eq!(batches[2].len(), 2);
        assert_eq!(batches[2][1].0, 9);
        assert_eq!(batches[2][1].1, "t9");
    }

    #[test]
    fn effective_permits_clamps_to_bounds() {
        assert!(effective_permits(0) >= 1);
        assert!(effective_permits(1000) <= 32);
    }

    /// Builds `n` single-item batches whose global indices are `0..n`.
    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
    }

    /// Returns a zero vector of length `dim`.
    fn dummy_vec(dim: usize) -> Vec<f32> {
        vec![0.0; dim]
    }

    #[test]
    fn concurrency_peak_never_exceeds_permits() {
        let permits = 4usize;
        let batches = test_batches(permits * 10);
        let dim = crate::constants::embedding_dim();
        let current = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let current_c = Arc::clone(&current);
        let peak_c = Arc::clone(&peak);
        let work = move |batch: Vec<(usize, String)>| {
            let current = Arc::clone(&current_c);
            let peak = Arc::clone(&peak_c);
            async move {
                // Track how many batches are inside `work` at the same time.
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                current.fetch_sub(1, Ordering::SeqCst);
                Ok(batch
                    .into_iter()
                    .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
                    .collect())
            }
        };

        let mut delivered = 0usize;
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .expect("test runtime");
        rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| {
                delivered += 1;
                Ok(())
            },
        ))
        .expect("fan-out must succeed");

        assert_eq!(delivered, permits * 10, "every item must be delivered");
        assert!(
            peak.load(Ordering::SeqCst) <= permits,
            "peak concurrency {} exceeded permits {permits}",
            peak.load(Ordering::SeqCst)
        );
    }

    #[test]
    fn panicking_task_returns_permit_and_surfaces_error() {
        let permits = 2usize;
        let batches = test_batches(4);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            if batch[0].0 == 1 {
                panic!("intentional test panic");
            }
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("panic must surface as an error");
        assert!(
            err.to_string().contains("panicked"),
            "error must mention the panic: {err}"
        );
    }

    #[test]
    fn cancellation_terminates_fan_out_quickly() {
        let permits = 2usize;
        let batches = test_batches(8);
        let dim = crate::constants::embedding_dim();
        let token = CancellationToken::new();

        let work = move |batch: Vec<(usize, String)>| async move {
            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(crate::constants::embedding_dim())))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let cancel = token.clone();
        let start = std::time::Instant::now();
        let result = rt.block_on(async move {
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                cancel.cancel();
            });
            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
        });

        assert!(result.is_err(), "cancelled fan-out must report an error");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(10),
            "graceful shutdown must finish well under the work duration"
        );
    }

    #[test]
    fn fan_out_rejects_divergent_dim() {
        let permits = 2usize;
        let batches = test_batches(2);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, vec![0.0f32; 3]))
                .collect::<Vec<(usize, Vec<f32>)>>())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("divergent dim must fail the fan-out");
        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
    }

    #[test]
    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
    }

    #[test]
    fn adaptive_batch_dim384_shrinks() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
    }

    #[test]
    fn adaptive_batch_intermediate_dims() {
        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
    }

    #[test]
    fn adaptive_batch_small_dim_clamps_to_base() {
        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
    }

    #[test]
    fn adaptive_batch_total_function() {
        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
    }

    #[test]
    #[serial_test::serial(env)]
    fn adaptive_wrappers_follow_env_dim() {
        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
        let chunk = chunk_embed_batch_size();
        let entity = entity_embed_batch_size();
        // Restore global state before asserting so a failure cannot leak the override.
        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
    }

    #[test]
    fn embedding_error_kind_classify_oauth_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("OAuth token expired for claude"),
            EmbeddingErrorKind::OAuth,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("oauth authentication failed"),
            EmbeddingErrorKind::OAuth,
        );
    }

    #[test]
    fn embedding_error_kind_classify_quota_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("quota exhausted on backend"),
            EmbeddingErrorKind::Quota,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("Usage quota limit reached"),
            EmbeddingErrorKind::Quota,
        );
    }

    #[test]
    fn embedding_error_kind_classify_slot_exhausted_message() {
        assert_eq!(
            EmbeddingErrorKind::classify(
                "slot exhausted: failed to acquire LLM slot after backoff"
            ),
            EmbeddingErrorKind::SlotExhausted,
        );
    }

    #[test]
    fn embedding_error_kind_classify_zero_dimension_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("embedding returned dim=zero"),
            EmbeddingErrorKind::ZeroDimension,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
            EmbeddingErrorKind::ZeroDimension,
        );
    }

    #[test]
    fn embedding_error_kind_classify_unknown_fallback() {
        assert_eq!(
            EmbeddingErrorKind::classify("unrelated subprocess error"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("rate limit hit"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
        assert_eq!(
            EmbeddingErrorKind::BackendMismatch.code(),
            "backend-mismatch"
        );
        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
    }

    #[test]
    fn fallback_reason_display_does_not_panic() {
        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
        let _ = FallbackReason::Cancelled.to_string();
        let _ = FallbackReason::Timeout {
            operation: "embed_query".into(),
            duration_secs: 30,
        }
        .to_string();
    }

    #[test]
    fn fallback_reason_is_partial_eq() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("a".into())
        );
        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
        assert_ne!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("b".into())
        );
        assert_ne!(
            FallbackReason::Cancelled,
            FallbackReason::Timeout {
                operation: "x".into(),
                duration_secs: 1
            }
        );
    }

    #[test]
    fn fallback_reason_timeout_preserves_fields() {
        let r = FallbackReason::Timeout {
            operation: "embed_query_local".into(),
            duration_secs: 300,
        };
        match r {
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => {
                assert_eq!(operation, "embed_query_local");
                assert_eq!(duration_secs, 300);
            }
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    #[test]
    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
        let result = try_embed_query_with_fallback(bogus, "hello world");
        match result {
            Err(FallbackReason::EmbeddingFailed(msg)) => {
                assert!(!msg.is_empty(), "fallback message must not be empty");
            }
            Err(FallbackReason::Cancelled) => {
                panic!("expected EmbeddingFailed, got Cancelled");
            }
            Err(FallbackReason::Timeout { .. }) => {
                panic!("expected EmbeddingFailed, got Timeout");
            }
            Err(FallbackReason::SlotExhausted) => {
                panic!("expected EmbeddingFailed, got SlotExhausted");
            }
            Err(FallbackReason::OAuthQuota { .. }) => {
                panic!("expected EmbeddingFailed, got OAuthQuota");
            }
            Err(FallbackReason::BackendMismatch { .. }) => {
                panic!("expected EmbeddingFailed, got BackendMismatch");
            }
            Err(FallbackReason::DimZero) => {
                panic!("expected EmbeddingFailed, got DimZero");
            }
            Ok(_) => {
                panic!("expected an error, got Ok — embedder must fail for bogus path");
            }
        }
    }

    #[test]
    fn g56_entity_cache_key_is_stable_and_distinct() {
        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k3 = entity_cache_key("codex:default", "claude-code");
        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
        assert_eq!(k1, k2, "same model+text must hash identically");
        assert_ne!(k1, k3, "different text must hash differently");
        assert_ne!(k1, k4, "different model must hash differently");
    }

    #[test]
    fn g56_entity_embed_cache_stats_hit_rate() {
        let zero = EmbedCacheStats::default();
        assert_eq!(zero.hit_rate(), 0.0);
        let half = EmbedCacheStats {
            requested: 4,
            hits: 2,
            misses: 2,
        };
        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
        let all = EmbedCacheStats {
            requested: 7,
            hits: 7,
            misses: 0,
        };
        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
    }

    #[test]
    fn g56_entity_embed_cache_populates_and_hits() {
        let cache = entity_embed_cache();
        let model = "test-model";
        let text = "sqlite-graphrag";
        let key = entity_cache_key(model, text);
        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
        cache.lock().insert(key, Arc::clone(&stored));
        let guard = cache.lock();
        let hit = guard.get(&key).expect("cache must return stored value");
        assert_eq!(hit.len(), crate::constants::embedding_dim());
        assert!((hit[0] - 0.42).abs() < 1e-6);
    }

    #[test]
    fn g56_empty_texts_short_circuits_with_zero_stats() {
        // NOTE(review): this only inspects `EmbedCacheStats::default()`; it does not
        // drive the cached embedding path with empty input. TODO call the real entry point.
        let stats = EmbedCacheStats::default();
        assert_eq!(stats.requested, 0);
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.hit_rate(), 0.0);
    }
}
2187
// Tests for the backend fallback chain and the LLM backend error types.
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        // NOTE(review): this only checks a local array; it does not call the code
        // that builds the default backend chain. TODO wire it to the real resolver.
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];
        assert_eq!(defaults.len(), 3);
    }

    // Previously nested inside another test body under `#[allow(dead_code)]`,
    // so it never ran. It is now a real test.
    #[test]
    fn llm_backend_kind_as_str_is_stable() {
        assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
        assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
        assert_eq!(LlmBackendKind::None.as_str(), "none");
    }

    // Previously nested inside another test body under `#[allow(dead_code)]`,
    // so it never ran. It is now a real test.
    #[test]
    fn fallback_reason_reason_code_is_stable() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("any".into()).reason_code(),
            "embedding_failed"
        );
        assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
        assert_eq!(
            FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30
            }
            .reason_code(),
            "timeout"
        );
    }

    #[test]
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        let chain = vec![LlmBackendKind::None];
        let err = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = format!("{err}");
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }

    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }

    // Previously marked `#[allow(dead_code)]` instead of `#[test]`, so it never ran.
    #[test]
    fn llm_backend_error_no_backends_default_message() {
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}
2320}