1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
/// Process-wide Claude embedding backend; filled on first successful call to
/// `get_claude_embedder`.
static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenCode embedding backend; filled on first successful call to
/// `get_opencode_embedder`.
static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
/// Process-wide OpenRouter embedding client; filled by `get_openrouter_embedder`.
static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();

/// Process-wide OpenRouter chat client; filled by `get_openrouter_chat_client`.
static OPENROUTER_CHAT_CLIENT: OnceLock<crate::chat_api::OpenRouterChatClient> = OnceLock::new();
66
/// Returns `true` once the shared OpenRouter embedding client has been stored
/// in `OPENROUTER_CLIENT`.
pub fn is_openrouter_initialized() -> bool {
    OPENROUTER_CLIENT.get().is_some()
}
/// Process-wide auto-detected embedding backend; filled by `get_embedder`.
static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();

/// Lazily built Tokio runtime shared by the blocking entry points; see `shared_runtime`.
static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
79
/// Base (maximum) number of chunk texts per embedding batch, before the
/// dimension-based scaling applied by `chunk_embed_batch_size`.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base (maximum) number of entity texts per embedding batch, before the
/// dimension-based scaling applied by `entity_embed_batch_size`.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;

/// Embedding dimension at (or below) which the base batch sizes are used
/// unchanged; larger dimensions shrink the batch proportionally.
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;
92
93fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
101 let base = base.max(1);
102 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
103}
104
105pub fn chunk_embed_batch_size() -> usize {
107 let dim = crate::constants::embedding_dim();
108 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
109 tracing::debug!(
110 dim,
111 base = CHUNK_EMBED_BATCH_SIZE,
112 batch,
113 "adaptive chunk batch size (G44)"
114 );
115 batch
116}
117
118pub fn entity_embed_batch_size() -> usize {
120 let dim = crate::constants::embedding_dim();
121 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
122 tracing::debug!(
123 dim,
124 base = ENTITY_EMBED_BATCH_SIZE,
125 batch,
126 "adaptive entity batch size (G44)"
127 );
128 batch
129}
130
131pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
133 if let Some(rt) = RUNTIME.get() {
134 return Ok(rt);
135 }
136 let rt = tokio::runtime::Builder::new_multi_thread()
137 .worker_threads(2)
138 .enable_all()
139 .build()
140 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
141 let _ = RUNTIME.set(rt);
142 RUNTIME.get().ok_or_else(|| {
143 AppError::Embedding("tokio runtime unavailable after initialisation".to_string())
144 })
145}
146
147pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
149 if let Some(e) = EMBEDDER.get() {
150 return Ok(e);
151 }
152 let backend = LlmEmbedding::detect_available()?;
153 let _ = EMBEDDER.set(Mutex::new(backend));
154 EMBEDDER
155 .get()
156 .ok_or_else(|| AppError::Embedding("embedder unavailable after initialisation".to_string()))
157}
158
159pub fn get_claude_embedder(
164 claude_binary: Option<&Path>,
165 claude_model: Option<&str>,
166) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
167 if let Some(e) = CLAUDE_EMBEDDER.get() {
168 return Ok(e);
169 }
170 let mut builder = LlmEmbedding::with_claude_builder();
171 if let Some(b) = claude_binary {
172 builder = builder.override_binary(b.to_path_buf());
173 }
174 if let Some(m) = claude_model {
175 builder = builder.override_model(m.to_string());
176 }
177 let backend = builder.build()?;
178 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
179 CLAUDE_EMBEDDER.get().ok_or_else(|| {
180 AppError::Embedding("claude embedder unavailable after initialisation".to_string())
181 })
182}
183
184pub fn get_opencode_embedder(
189 opencode_binary: Option<&Path>,
190 opencode_model: Option<&str>,
191) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
192 if let Some(e) = OPENCODE_EMBEDDER.get() {
193 return Ok(e);
194 }
195 let mut builder = LlmEmbedding::with_opencode_builder();
196 if let Some(b) = opencode_binary {
197 builder = builder.override_binary(b.to_path_buf());
198 }
199 if let Some(m) = opencode_model {
200 builder = builder.override_model(m.to_string());
201 }
202 let backend = builder.build()?;
203 let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
204 OPENCODE_EMBEDDER.get().ok_or_else(|| {
205 AppError::Embedding("opencode embedder unavailable after initialisation".to_string())
206 })
207}
208
209pub fn get_openrouter_embedder(
210 api_key: secrecy::SecretBox<String>,
211 model: &str,
212 dim: usize,
213) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
214 if let Some(c) = OPENROUTER_CLIENT.get() {
215 return Ok(c);
216 }
217 let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
218 let _ = OPENROUTER_CLIENT.set(client);
219 OPENROUTER_CLIENT.get().ok_or_else(|| {
220 AppError::Embedding("openrouter client unavailable after initialisation".to_string())
221 })
222}
223
224pub fn get_openrouter_chat_client(
228 api_key: secrecy::SecretBox<String>,
229 model: &str,
230 timeout_secs: u64,
231) -> Result<&'static crate::chat_api::OpenRouterChatClient, AppError> {
232 if let Some(c) = OPENROUTER_CHAT_CLIENT.get() {
233 return Ok(c);
234 }
235 let client =
236 crate::chat_api::OpenRouterChatClient::new(api_key, model.to_string(), timeout_secs)?;
237 let _ = OPENROUTER_CHAT_CLIENT.set(client);
238 OPENROUTER_CHAT_CLIENT.get().ok_or_else(|| {
239 AppError::Embedding("openrouter chat client unavailable after initialisation".to_string())
240 })
241}
242
/// Returns the shared OpenRouter chat client if one has been stored by
/// `get_openrouter_chat_client`, or `None` otherwise. Never initialises it.
pub fn openrouter_chat_client() -> Option<&'static crate::chat_api::OpenRouterChatClient> {
    OPENROUTER_CHAT_CLIENT.get()
}
250
251pub fn embed_via_claude_local(
255 _models_dir: &Path,
256 text: &str,
257 claude_binary: Option<&Path>,
258 claude_model: Option<&str>,
259) -> Result<Vec<f32>, AppError> {
260 let _slot_guard = acquire_llm_slot_for_embedding()?;
261 let embedder = get_claude_embedder(claude_binary, claude_model)?;
262 embed_passage(embedder, text)
263}
264
265pub fn embed_via_claude_local_resolved(
270 _models_dir: &Path,
271 text: &str,
272 claude_binary: Option<&Path>,
273 claude_model: Option<&str>,
274) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
275 let _slot_guard = acquire_llm_slot_for_embedding()?;
276 let embedder = get_claude_embedder(claude_binary, claude_model)?;
277 let v = embed_passage(embedder, text)?;
278 Ok((v, LlmBackendKind::Claude))
279}
280
281pub fn embed_via_opencode_local_resolved(
286 _models_dir: &Path,
287 text: &str,
288 opencode_binary: Option<&Path>,
289 opencode_model: Option<&str>,
290) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
291 let _slot_guard = acquire_llm_slot_for_embedding()?;
292 let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
293 let v = embed_passage(embedder, text)?;
294 Ok((v, LlmBackendKind::Opencode))
295}
296fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
299 embedder.lock().clone()
300}
301
302pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
306 let client = clone_client(embedder);
307 let result = client.embed_passage(text)?;
308 validate_dim(result)
309}
310
311pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
315 let client = clone_client(embedder);
316 let result = client.embed_query(text)?;
317 validate_dim(result)
318}
319
320pub fn embed_passages_controlled(
325 embedder: &Mutex<LlmEmbedding>,
326 texts: &[&str],
327 _token_counts: &[usize],
328) -> Result<Vec<Vec<f32>>, AppError> {
329 if texts.is_empty() {
330 return Ok(Vec::new());
331 }
332 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
333 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
334}
335
336pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
337 let _slot_guard = acquire_llm_slot_for_embedding()?;
338 let embedder = get_embedder(models_dir)?;
339 embed_passage(embedder, text)
340}
341
/// Reports whether `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` is set to
/// exactly `1` or `true`. Any other value, or an unset/invalid variable,
/// yields `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    match std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE") {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
351
352pub fn embed_passage_or_skip(
359 models_dir: &Path,
360 text: &str,
361 choice: Option<crate::cli::LlmBackendChoice>,
362) -> Result<Option<Vec<f32>>, AppError> {
363 match embed_passage_with_choice(models_dir, text, choice) {
364 Ok((v, _backend)) => Ok(Some(v)),
365 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
366 Err(e) => {
367 if should_skip_embedding_on_failure() {
368 tracing::warn!(
369 error = %e,
370 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
371 );
372 Ok(None)
373 } else {
374 Err(e)
375 }
376 }
377 }
378}
379
380pub fn embed_passage_local_resolved(
386 models_dir: &Path,
387 text: &str,
388) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
389 let _slot_guard = acquire_llm_slot_for_embedding()?;
390 let embedder = get_embedder(models_dir)?;
391 let v = embed_passage(embedder, text)?;
392 let kind = match embedder.lock().flavour() {
393 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
394 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
395 crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
396 };
397 Ok((v, kind))
398}
399
400pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
401 let _slot_guard = acquire_llm_slot_for_embedding()?;
402 let embedder = get_embedder(models_dir)?;
403 embed_query(embedder, text)
404}
405
406pub fn embed_passage_with_choice(
423 models_dir: &Path,
424 text: &str,
425 choice: Option<crate::cli::LlmBackendChoice>,
426) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
427 let _slot_guard = acquire_llm_slot_for_embedding()?;
428 match choice {
429 None => {
430 let embedder = get_embedder(models_dir)?;
431 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
432 }
433 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
434 }
435}
436
437pub fn embed_passage_with_embedding_choice(
441 models_dir: &Path,
442 text: &str,
443 embedding_backend: crate::cli::EmbeddingBackendChoice,
444 llm_backend: crate::cli::LlmBackendChoice,
445) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
446 let _slot_guard = acquire_llm_slot_for_embedding()?;
447 let chain = embedding_backend.to_chain(llm_backend);
448 embed_with_fallback(models_dir, text, &chain, false)
449}
450
451pub fn try_embed_query_with_choice(
457 models_dir: &Path,
458 text: &str,
459 choice: Option<crate::cli::LlmBackendChoice>,
460) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
461 match embed_passage_with_choice(models_dir, text, choice) {
462 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
475 Ok((v, backend)) => Ok((v, backend)),
476 Err(e) => Err(classify_embedding_error(e)),
477 }
478}
479pub fn try_embed_query_with_embedding_choice(
484 models_dir: &Path,
485 text: &str,
486 embedding_backend: crate::cli::EmbeddingBackendChoice,
487 llm_backend: crate::cli::LlmBackendChoice,
488) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
489 match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
490 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
491 Ok((v, backend)) => Ok((v, backend)),
492 Err(e) => Err(classify_embedding_error(e)),
493 }
494}
495
496fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
508 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
509 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
510 .ok()
511 .and_then(|s| s.parse::<u32>().ok())
512 .filter(|n| *n >= 1)
513 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
514 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
515 0
516 } else {
517 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
518 .ok()
519 .and_then(|s| s.parse::<u64>().ok())
520 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
521 };
522 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
530 Ok(guard) => Ok(guard),
531 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
532 "slot exhausted: {e} (fall back to FTS5)"
533 ))),
534 Err(e) => Err(e),
535 }
536}
/// Coarse category of an embedding error, derived from its message text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    /// The message mentions `oauth`.
    OAuth,
    /// The message mentions `quota`.
    Quota,
    /// The message mentions `slot exhausted`.
    SlotExhausted,
    /// The message mentions `backend mismatch`.
    BackendMismatch,
    /// The message mentions both `dim` and `zero`.
    ZeroDimension,
    /// None of the known markers were found.
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies an error message by case-insensitive substring search.
    ///
    /// Markers are checked in a fixed order and the first match wins:
    /// `oauth`, `quota`, `slot exhausted`, `backend mismatch`, then
    /// `dim` together with `zero`.
    pub fn classify(msg: &str) -> Self {
        let lowered = msg.to_lowercase();
        let single_marker_rules = [
            ("oauth", Self::OAuth),
            ("quota", Self::Quota),
            ("slot exhausted", Self::SlotExhausted),
            ("backend mismatch", Self::BackendMismatch),
        ];
        for (marker, kind) in single_marker_rules {
            if lowered.contains(marker) {
                return kind;
            }
        }
        if lowered.contains("dim") && lowered.contains("zero") {
            return Self::ZeroDimension;
        }
        Self::Unknown
    }

    /// Stable kebab-case identifier for this kind.
    pub fn code(&self) -> &'static str {
        match *self {
            EmbeddingErrorKind::OAuth => "oauth",
            EmbeddingErrorKind::Quota => "quota",
            EmbeddingErrorKind::SlotExhausted => "slot-exhausted",
            EmbeddingErrorKind::BackendMismatch => "backend-mismatch",
            EmbeddingErrorKind::ZeroDimension => "zero-dimension",
            EmbeddingErrorKind::Unknown => "unknown",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    /// Writes the same text as [`EmbeddingErrorKind::code`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let code = self.code();
        f.write_str(code)
    }
}
608
/// Why an embedding attempt did not produce a usable vector.
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Generic failure carrying the underlying error text.
    EmbeddingFailed(String),
    /// No LLM slot could be acquired.
    SlotExhausted,
    /// OAuth or quota failure attributed to `backend`.
    OAuthQuota { backend: &'static str },
    /// A different backend than the requested one was invoked.
    BackendMismatch {
        /// Backend the user asked for.
        requested: &'static str,
        /// Backend that was actually invoked.
        resolved: &'static str,
    },
    /// The backend returned an empty (zero-dimensional) vector.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// The embedding timed out.
    Timeout {
        /// Operation that timed out.
        operation: String,
        /// Elapsed time in seconds.
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case identifier for this reason.
    pub fn reason_code(&self) -> &'static str {
        match *self {
            FallbackReason::EmbeddingFailed(_) => "embedding_failed",
            FallbackReason::SlotExhausted => "slot_exhausted",
            FallbackReason::OAuthQuota { .. } => "oauth_quota",
            FallbackReason::BackendMismatch { .. } => "backend_mismatch",
            FallbackReason::DimZero => "dim_zero",
            FallbackReason::Cancelled => "cancelled",
            FallbackReason::Timeout { .. } => "timeout",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    /// Writes a human-readable description of the reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            FallbackReason::EmbeddingFailed(detail) => write!(f, "embedding failed: {detail}"),
            FallbackReason::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            FallbackReason::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            FallbackReason::BackendMismatch {
                requested,
                resolved,
            } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            FallbackReason::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            FallbackReason::Cancelled => f.write_str("embedding cancelled by external signal"),
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
705
706pub fn try_embed_query_with_fallback(
714 models_dir: &Path,
715 query: &str,
716) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
717 match embed_query_local(models_dir, query) {
718 Ok(v) => Ok((v, LlmBackendKind::None)),
719 Err(e) => Err(classify_embedding_error(e)),
720 }
721}
722
723pub fn try_embed_query_with_deterministic_fallback(
732 models_dir: &Path,
733 query: &str,
734 choice: Option<crate::cli::LlmBackendChoice>,
735) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
736 match try_embed_query_with_choice(models_dir, query, choice) {
737 Ok(t) => Ok(t),
738 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
739 let alt = match backend {
740 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
741 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
742 "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
743 "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
744 _ => None,
745 };
746 if let Some(alt_choice) = alt {
747 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
748 } else {
749 Err(reason)
750 }
751 }
752 Err(reason @ FallbackReason::SlotExhausted) => {
753 std::thread::sleep(std::time::Duration::from_millis(750));
754 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
755 }
756 Err(other) => Err(other),
757 }
758}
759
760pub fn classify_embedding_error(err: AppError) -> FallbackReason {
768 match err {
769 AppError::Timeout {
770 operation,
771 duration_secs,
772 } => FallbackReason::Timeout {
773 operation,
774 duration_secs,
775 },
776 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
777 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
786 EmbeddingErrorKind::OAuth => {
787 let backend = if msg.contains("codex") {
788 "codex"
789 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
790 "claude"
795 } else if msg.contains("opencode") {
796 "opencode"
797 } else {
798 "unknown"
799 };
800 FallbackReason::OAuthQuota { backend }
801 }
802 EmbeddingErrorKind::Quota => {
803 let backend = if msg.contains("codex") {
804 "codex"
805 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
806 "claude"
807 } else if msg.contains("opencode") {
808 "opencode"
809 } else {
810 "unknown"
811 };
812 FallbackReason::OAuthQuota { backend }
813 }
814 EmbeddingErrorKind::BackendMismatch => {
815 let (requested, resolved) =
820 if msg.contains("requested claude") && msg.contains("but codex") {
821 ("claude", "codex")
822 } else if msg.contains("requested codex") && msg.contains("but claude") {
823 ("codex", "claude")
824 } else if msg.contains("requested claude") {
825 ("claude", "unknown")
826 } else if msg.contains("requested codex") {
827 ("codex", "unknown")
828 } else {
829 ("unknown", "unknown")
830 };
831 FallbackReason::BackendMismatch {
832 requested,
833 resolved,
834 }
835 }
836 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
837 EmbeddingErrorKind::Unknown => {
838 if msg.contains("cancelled") {
839 FallbackReason::Cancelled
840 } else {
841 FallbackReason::EmbeddingFailed(msg)
842 }
843 }
844 },
845 e => FallbackReason::EmbeddingFailed(e.to_string()),
846 }
847}
848pub fn embed_with_fallback(
867 models_dir: &Path,
868 text: &str,
869 chain: &[LlmBackendKind],
870 skip_on_failure: bool,
871) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
872 use crate::llm::exit_code_hints::LlmBackendError;
873 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
874 vec![
875 LlmBackendKind::Codex,
876 LlmBackendKind::Claude,
877 LlmBackendKind::Opencode,
878 LlmBackendKind::None,
879 ]
880 } else {
881 chain.to_vec()
882 };
883
884 let mut last_err: Option<AppError> = None;
885 for backend in &effective {
886 match embed_via_backend_strict(
897 models_dir,
898 text,
899 backend,
900 last_err.as_ref(),
901 skip_on_failure,
902 ) {
903 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
904 Err(e) => {
905 if matches!(e, AppError::Validation(_)) {
910 return Err(e);
911 }
912 tracing::warn!(
913 target: "embedding",
914 backend = ?backend,
915 error = %e,
916 "embed_with_fallback: backend failed, trying next"
917 );
918 last_err = Some(e);
919 }
920 }
921 }
922 if skip_on_failure {
923 return Ok((Vec::new(), LlmBackendKind::None));
928 }
929 Err(last_err
930 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
931}
932
/// Identifies which backend produced (or should produce) an embedding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// Codex backend.
    Codex,
    /// Claude backend.
    Claude,
    /// OpenCode backend.
    Opencode,
    /// OpenRouter HTTP API.
    OpenRouter,
    /// No backend.
    None,
}

impl LlmBackendKind {
    /// Lowercase label for this backend kind.
    pub fn as_str(self) -> &'static str {
        match self {
            LlmBackendKind::Codex => "codex",
            LlmBackendKind::Claude => "claude",
            LlmBackendKind::Opencode => "opencode",
            LlmBackendKind::OpenRouter => "openrouter",
            LlmBackendKind::None => "none",
        }
    }
}
963
964pub fn embed_via_backend(
979 models_dir: &Path,
980 text: &str,
981 backend: &LlmBackendKind,
982) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
983 match backend {
984 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
985 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
986 LlmBackendKind::Claude => {
987 tracing::debug!(
991 target: "embedder",
992 backend = "claude",
993 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
994 );
995 embed_via_claude_local_resolved(models_dir, text, None, None)
996 }
997 LlmBackendKind::Opencode => {
998 tracing::debug!(
999 target: "embedder",
1000 backend = "opencode",
1001 "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
1002 );
1003 embed_via_opencode_local_resolved(models_dir, text, None, None)
1004 }
1005 LlmBackendKind::OpenRouter => {
1006 tracing::debug!(
1007 target: "embedder",
1008 backend = "openrouter",
1009 "embed_via_backend: using OpenRouter API (v1.0.93)"
1010 );
1011 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1012 AppError::Embedding(
1013 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1014 )
1015 })?;
1016 let rt = shared_runtime()?;
1017 let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
1018 Ok((vec, LlmBackendKind::OpenRouter))
1019 }
1020 }
1021}
1022
1023pub fn embed_via_backend_strict(
1036 models_dir: &Path,
1037 text: &str,
1038 backend: &LlmBackendKind,
1039 last_err: Option<&AppError>,
1040 skip_on_failure: bool,
1041) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1042 use crate::llm::exit_code_hints::LlmBackendError;
1043 match backend {
1044 LlmBackendKind::None => {
1045 if skip_on_failure && last_err.is_none() {
1049 Ok((Vec::new(), LlmBackendKind::None))
1050 } else if last_err.is_some() {
1051 Err(match last_err {
1055 Some(e) => AppError::Embedding(format!("{e}")),
1056 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1057 })
1058 } else {
1059 Err(AppError::Embedding(
1062 LlmBackendError::NoBackendsAvailable.to_string(),
1063 ))
1064 }
1065 }
1066 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1067 LlmBackendKind::Claude => {
1068 tracing::debug!(
1069 target: "embedder",
1070 backend = "claude",
1071 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1072 );
1073 embed_via_claude_local_resolved(models_dir, text, None, None)
1074 }
1075 LlmBackendKind::Opencode => {
1076 tracing::debug!(
1077 target: "embedder",
1078 backend = "opencode",
1079 "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1080 );
1081 embed_via_opencode_local_resolved(models_dir, text, None, None)
1082 }
1083 LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1084 }
1085}
1086
1087pub fn embed_via_backend_legacy(
1092 models_dir: &Path,
1093 text: &str,
1094 backend: &LlmBackendKind,
1095) -> Result<Vec<f32>, AppError> {
1096 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
1097}
1098
1099pub fn embed_passages_controlled_local(
1100 models_dir: &Path,
1101 texts: &[&str],
1102 token_counts: &[usize],
1103) -> Result<Vec<Vec<f32>>, AppError> {
1104 let embedder = get_embedder(models_dir)?;
1105 embed_passages_controlled(embedder, texts, token_counts)
1106}
1107
1108pub fn embed_passages_parallel_local(
1111 models_dir: &Path,
1112 texts: &[String],
1113 parallelism: usize,
1114 batch_size: usize,
1115) -> Result<Vec<Vec<f32>>, AppError> {
1116 let embedder = get_embedder(models_dir)?;
1117 embed_texts_parallel(embedder, texts, parallelism, batch_size)
1118}
1119
/// Result of one spawned OpenRouter chunk task: the chunk's position in the
/// input together with its vectors or the error that occurred.
type EmbedChunkResult = (usize, Result<Vec<Vec<f32>>, AppError>);
1124
/// Sorts chunk results by chunk index and concatenates their vectors into
/// one flat list.
fn reassemble_ordered(mut parts: Vec<(usize, Vec<Vec<f32>>)>) -> Vec<Vec<f32>> {
    // Stable sort, so equal indices keep their arrival order.
    parts.sort_by(|left, right| left.0.cmp(&right.0));
    let total: usize = parts.iter().map(|(_, vectors)| vectors.len()).sum();
    let mut ordered = Vec::with_capacity(total);
    for (_, vectors) in parts {
        ordered.extend(vectors);
    }
    ordered
}
1133
1134pub fn embed_passages_parallel_with_embedding_choice(
1141 models_dir: &Path,
1142 texts: &[String],
1143 parallelism: usize,
1144 batch_size: usize,
1145 embedding_backend: crate::cli::EmbeddingBackendChoice,
1146 llm_backend: crate::cli::LlmBackendChoice,
1147) -> Result<Vec<Vec<f32>>, AppError> {
1148 let chain = embedding_backend.to_chain(llm_backend);
1149 if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1150 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1151 AppError::Embedding(
1152 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1153 )
1154 })?;
1155 let rt = shared_runtime()?;
1156
1157 let k = parallelism.clamp(1, 16);
1162 if texts.len() <= 32 || k == 1 {
1163 let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1164 let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1165 return Ok(vecs);
1166 }
1167
1168 let vecs = rt.block_on(async move {
1173 let mut set: JoinSet<EmbedChunkResult> = JoinSet::new();
1174 let mut parts: Vec<(usize, Vec<Vec<f32>>)> = Vec::new();
1175
1176 for (idx, chunk) in texts.chunks(32).enumerate() {
1177 if set.len() >= k {
1178 if let Some(joined) = set.join_next().await {
1179 let (cidx, res) = joined.map_err(|e| {
1180 AppError::Embedding(format!("embedding task join error: {e}"))
1181 })?;
1182 parts.push((cidx, res?));
1183 }
1184 }
1185 let owned: Vec<String> = chunk.to_vec();
1186 set.spawn(async move {
1187 let refs: Vec<&str> = owned.iter().map(|s| s.as_str()).collect();
1188 let r = client
1192 .embed_batch(&refs, client.default_input_type())
1193 .await
1194 .map_err(AppError::from);
1195 (idx, r)
1196 });
1197 }
1198
1199 while let Some(joined) = set.join_next().await {
1200 let (cidx, res) = joined
1201 .map_err(|e| AppError::Embedding(format!("embedding task join error: {e}")))?;
1202 parts.push((cidx, res?));
1203 }
1204
1205 Ok::<Vec<Vec<f32>>, AppError>(reassemble_ordered(parts))
1206 })?;
1207 Ok(vecs)
1208 } else {
1209 embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1210 }
1211}
1212
/// Map from a 64-bit cache key (see `entity_cache_key`) to a shared
/// embedding vector.
type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;

/// Process-wide in-memory cache of entity embeddings, created on first
/// access via `entity_embed_cache`.
static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1227
1228fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1229 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1230}
1231
1232fn entity_cache_key(model: &str, text: &str) -> u64 {
1233 let mut hasher = blake3::Hasher::new();
1234 hasher.update(model.as_bytes());
1235 hasher.update(b"\0");
1236 hasher.update(text.as_bytes());
1237 let h = hasher.finalize();
1238 let bytes = h.as_bytes();
1239 u64::from_le_bytes([
1240 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1241 ])
1242}
1243
1244pub fn embed_entity_texts_cached(
1254 models_dir: &Path,
1255 texts: &[String],
1256 parallelism: usize,
1257 embedding_backend: crate::cli::EmbeddingBackendChoice,
1258 llm_backend: crate::cli::LlmBackendChoice,
1259) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1260 if texts.is_empty() {
1261 return Ok((Vec::new(), EmbedCacheStats::default()));
1262 }
1263 let chain = embedding_backend.to_chain(llm_backend);
1267
1268 if chain.as_slice() == [LlmBackendKind::None] {
1274 let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1275 return Ok((
1276 out,
1277 EmbedCacheStats {
1278 requested: texts.len(),
1279 hits: 0,
1280 misses: texts.len(),
1281 },
1282 ));
1283 }
1284
1285 let routed_openrouter =
1291 chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1292 let model = if routed_openrouter {
1293 format!("openrouter:{}", crate::constants::embedding_dim())
1294 } else {
1295 get_embedder(models_dir)?.lock().model_label()
1296 };
1297 let cache = entity_embed_cache();
1298 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1299 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1300 {
1301 let guard = cache.lock();
1302 for (i, text) in texts.iter().enumerate() {
1303 let key = entity_cache_key(&model, text);
1304 if let Some(v) = guard.get(&key) {
1305 hits[i] = Some(Arc::clone(v));
1306 } else {
1307 miss_indices.push(i);
1308 }
1309 }
1310 }
1311 let miss_count = miss_indices.len();
1312 if miss_count > 0 {
1313 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1314 let miss_vecs = embed_passages_parallel_with_embedding_choice(
1318 models_dir,
1319 &miss_texts,
1320 parallelism,
1321 entity_embed_batch_size(),
1322 embedding_backend,
1323 llm_backend,
1324 )?;
1325 let mut guard = cache.lock();
1326 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1327 let vec = Arc::new(miss_vecs[slot].clone());
1328 let key = entity_cache_key(&model, &texts[orig_idx]);
1329 guard.insert(key, Arc::clone(&vec));
1330 hits[orig_idx] = Some(vec);
1331 }
1332 }
1333 let mut out = Vec::with_capacity(texts.len());
1334 for hit in hits.into_iter() {
1335 let v = hit.ok_or_else(|| {
1336 AppError::Embedding("entity embed cache produced null result".to_string())
1337 })?;
1338 out.push((*v).clone());
1339 }
1340 Ok((
1341 out,
1342 EmbedCacheStats {
1343 requested: texts.len(),
1344 hits: texts.len() - miss_count,
1345 misses: miss_count,
1346 },
1347 ))
1348}
1349
/// Hit/miss counters reported by `embed_entity_texts_cached`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
pub struct EmbedCacheStats {
    /// Number of texts requested.
    pub requested: usize,
    /// Number of texts answered from the cache.
    pub hits: usize,
    /// Number of texts that were not found in the cache.
    pub misses: usize,
}
1357
1358impl EmbedCacheStats {
1359 pub fn hit_rate(&self) -> f64 {
1361 if self.requested == 0 {
1362 0.0
1363 } else {
1364 self.hits as f64 / self.requested as f64
1365 }
1366 }
1367}
1368
1369pub fn embed_texts_parallel(
1382 embedder: &Mutex<LlmEmbedding>,
1383 texts: &[String],
1384 parallelism: usize,
1385 batch_size: usize,
1386) -> Result<Vec<Vec<f32>>, AppError> {
1387 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1388 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1389 slots[idx] = Some(v.to_vec());
1390 Ok(())
1391 })?;
1392 let mut out = Vec::with_capacity(slots.len());
1393 for (idx, slot) in slots.into_iter().enumerate() {
1394 out.push(slot.ok_or_else(|| {
1395 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1396 })?);
1397 }
1398 Ok(out)
1399}
1400
1401pub fn embed_texts_parallel_with(
1405 embedder: &Mutex<LlmEmbedding>,
1406 texts: &[String],
1407 parallelism: usize,
1408 batch_size: usize,
1409 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1410) -> Result<(), AppError> {
1411 if texts.is_empty() {
1412 return Ok(());
1413 }
1414 let dim = crate::constants::embedding_dim();
1415 if texts.len() == 1 {
1416 let v = embed_passage(embedder, &texts[0])?;
1417 return on_result(0, &v);
1418 }
1419
1420 let client = clone_client(embedder);
1421 let permits = effective_permits(parallelism);
1422 let batches = build_batches(texts, batch_size.max(1));
1423 let token = crate::cancel_token().clone();
1424
1425 let work = move |batch: Vec<(usize, String)>| {
1426 let client = client.clone();
1427 async move {
1428 client
1429 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1430 .await
1431 }
1432 };
1433
1434 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1435 match tokio::runtime::Handle::try_current() {
1436 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1437 Err(_) => shared_runtime()?.block_on(fan_out),
1438 }
1439}
1440
/// Splits `texts` into consecutive batches of at most `batch_size` items,
/// pairing each text with its index in the original slice.
///
/// # Panics
/// Panics if `batch_size` is zero, because `chunks` rejects a zero size.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let mut batches = Vec::new();
    let mut next_index = 0usize;
    for group in texts.chunks(batch_size) {
        let mut batch = Vec::with_capacity(group.len());
        for text in group {
            batch.push((next_index, text.clone()));
            next_index += 1;
        }
        batches.push(batch);
    }
    batches
}
1452
1453pub fn effective_permits(requested: usize) -> usize {
1458 let cpus = std::thread::available_parallelism()
1459 .map(|n| n.get())
1460 .unwrap_or(4);
1461 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1462 / crate::constants::LLM_WORKER_RSS_MB)
1463 .max(1) as usize;
1464 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1465}
1466
/// Runs `work` over every batch with at most `permits` batches in flight and
/// feeds each produced vector to `on_result`.
///
/// One task per batch is spawned into a `JoinSet`; a task only calls `work`
/// after acquiring a permit from a semaphore created with `permits` slots.
/// Task outcomes travel back through a bounded channel and are consumed on the
/// calling task, which is the only place `on_result` is invoked.
///
/// # Parameters
///
/// * `batches` - work items; each entry pairs a caller-defined index with its text.
/// * `permits` - semaphore size; also sizes the result channel (`permits * 2`).
/// * `dim` - length every returned vector must have.
/// * `token` - cancellation token raced against `work` when
///   `crate::should_obey_shutdown()` returns true.
/// * `work` - produces `(index, vector)` pairs for one batch.
/// * `on_result` - sink called once per `(index, vector)` until the first error.
///
/// # Errors
///
/// Returns the first error observed: a failed or cancelled batch, a vector
/// whose length differs from `dim`, an error from `on_result`, or a panicking
/// task. After the first error the remaining tasks are aborted via
/// `JoinSet::shutdown`, and later successful batches are counted but not delivered.
///
/// NOTE(review): `permits == 0` would request a zero-capacity channel, which
/// tokio documents as a panic; callers in this file pass `effective_permits`,
/// which is at least 1.
async fn run_bounded<F, Fut>(
    batches: Vec<Vec<(usize, String)>>,
    permits: usize,
    dim: usize,
    token: CancellationToken,
    work: F,
    on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
) -> Result<(), AppError>
where
    F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
    Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
{
    let total_batches = batches.len();
    let semaphore = Arc::new(Semaphore::new(permits));
    let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
    let mut set: JoinSet<()> = JoinSet::new();

    // Producer side: every batch becomes a task that waits for a permit first.
    for (batch_idx, batch) in batches.into_iter().enumerate() {
        let sem = Arc::clone(&semaphore);
        let token = token.clone();
        let tx = tx.clone();
        let work = work.clone();
        set.spawn(async move {
            let wait_start = std::time::Instant::now();
            // The permit is held in `_permit` until the task ends.
            let Ok(_permit) = sem.acquire_owned().await else {
                let _ = tx
                    .send(Err(AppError::Embedding("semaphore closed".to_string())))
                    .await;
                return;
            };
            let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
            let work_start = std::time::Instant::now();
            // Race the work against cancellation only when shutdown must be obeyed.
            let outcome = if crate::should_obey_shutdown() {
                tokio::select! {
                    res = work(batch) => res,
                    _ = token.cancelled() => Err(AppError::Embedding(
                        "embedding cancelled by shutdown signal".to_string(),
                    )),
                }
            } else {
                work(batch).await
            };
            tracing::debug!(
                target: "embedding",
                batch_idx,
                permit_wait_ms,
                work_ms = work_start.elapsed().as_millis() as u64,
                ok = outcome.is_ok(),
                "embedding batch finished"
            );
            // The receiver may already be gone; a failed send is ignored.
            let _ = tx.send(outcome).await;
        });
    }
    // Drop the original sender so `rx.recv()` ends once every task has finished.
    drop(tx);

    let mut completed = 0usize;
    let mut failed = 0usize;
    let mut cancelled = 0usize;
    let mut first_error: Option<AppError> = None;

    // Consumer side: drain results until every sender clone has been dropped.
    while let Some(message) = rx.recv().await {
        match message {
            Ok(items) => {
                completed += 1;
                // Once an error is recorded, later batches are no longer delivered.
                if first_error.is_none() {
                    for (idx, v) in items {
                        if v.len() != dim {
                            first_error = Some(AppError::Embedding(format!(
                                "LLM returned {} dims for item {idx}, expected {dim}; \
                                 refusing to truncate or pad silently (G42/C5)",
                                v.len()
                            )));
                            break;
                        }
                        if let Err(e) = on_result(idx, &v) {
                            first_error = Some(e);
                            break;
                        }
                    }
                    if first_error.is_some() {
                        // Abort everything still queued or running.
                        set.shutdown().await;
                    }
                }
            }
            Err(e) => {
                // Cancellation is recognised by its message text.
                if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
                    cancelled += 1;
                } else {
                    failed += 1;
                }
                if first_error.is_none() {
                    first_error = Some(e);
                    set.shutdown().await;
                }
            }
        }
    }

    // A panicking task drops its sender without reporting, so panics are
    // only visible through the join results.
    while let Some(join_result) = set.join_next().await {
        if let Err(join_err) = join_result {
            if join_err.is_panic() {
                failed += 1;
                if first_error.is_none() {
                    first_error = Some(AppError::Embedding(format!(
                        "embedding task panicked: {join_err}"
                    )));
                }
            } else {
                cancelled += 1;
            }
        }
    }

    tracing::debug!(
        target: "embedding",
        total_batches,
        completed,
        failed,
        cancelled,
        "embedding fan-out finished"
    );

    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
1626
/// Serialises a slice of `f32` values into little-endian bytes.
///
/// The output holds exactly four bytes per input value, in input order.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    // `size_of_val` on the slice equals `v.len() * 4`.
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
1634
/// Decodes little-endian bytes back into `f32` values.
///
/// Bytes are consumed four at a time; a trailing group of fewer than four
/// bytes is ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
1642
/// Returns the embedding dimension by delegating to
/// `crate::constants::embedding_dim()`.
pub fn embedding_dim() -> usize {
    crate::constants::embedding_dim()
}
1648
1649fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1653 let dim = crate::constants::embedding_dim();
1654 if v.len() != dim {
1655 return Err(AppError::Embedding(format!(
1656 "embedding has {} dims, expected {dim}; \
1657 refusing to truncate or pad silently (G42/C5)",
1658 v.len()
1659 )));
1660 }
1661 Ok(v)
1662}
1663
/// Unit tests for batching, the bounded fan-out, byte conversion, dimension
/// validation, error classification, fallback reasons and the entity cache.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Parts supplied out of order must come back flattened in index order.
    #[test]
    fn reassemble_ordered_restores_input_order() {
        let parts = vec![
            (2, vec![vec![2.0_f32], vec![2.1]]),
            (0, vec![vec![0.0], vec![0.1]]),
            (1, vec![vec![1.0], vec![1.1]]),
        ];
        let out = reassemble_ordered(parts);
        assert_eq!(
            out,
            vec![
                vec![0.0_f32],
                vec![0.1],
                vec![1.0],
                vec![1.1],
                vec![2.0],
                vec![2.1],
            ]
        );
    }

    /// Encoding then decoding must reproduce the input, extremes included.
    #[test]
    fn f32_to_bytes_roundtrip() {
        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
        let bytes = f32_to_bytes(&input);
        assert_eq!(bytes.len(), input.len() * 4);
        let out = bytes_to_f32(&bytes);
        assert_eq!(out, input);
    }

    /// Vectors longer or shorter than the configured dimension are rejected;
    /// an exact-length vector passes through.
    #[test]
    fn validate_dim_rejects_divergent_vectors() {
        let dim = crate::constants::embedding_dim();
        let long = vec![0.0; dim + 10];
        assert!(validate_dim(long).is_err(), "longer vector must error");
        let short = vec![0.0; dim.saturating_sub(1).max(1)];
        assert!(validate_dim(short).is_err(), "shorter vector must error");
        let exact = vec![0.0; dim];
        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
    }

    /// The public wrapper must agree with the constants module.
    #[test]
    fn embedding_dim_matches_constants_source() {
        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
    }

    /// Indices stored in a batch refer to positions in the original slice.
    #[test]
    fn build_batches_preserves_global_indices() {
        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
        let batches = build_batches(&texts, 4);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 4);
        assert_eq!(batches[2].len(), 2);
        assert_eq!(batches[2][1].0, 9);
        assert_eq!(batches[2][1].1, "t9");
    }

    /// The permit count stays within `1..=32` for extreme requests.
    #[test]
    fn effective_permits_clamps_to_bounds() {
        assert!(effective_permits(0) >= 1);
        assert!(effective_permits(1000) <= 32);
    }

    /// Builds `n` single-item batches whose index equals the batch number.
    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
    }

    /// Zero-filled vector of the requested length.
    fn dummy_vec(dim: usize) -> Vec<f32> {
        vec![0.0; dim]
    }

    /// Tracks the number of simultaneously running work futures and checks
    /// that the observed peak never exceeds the permit count.
    #[test]
    fn concurrency_peak_never_exceeds_permits() {
        let permits = 4usize;
        let batches = test_batches(permits * 10);
        let dim = crate::constants::embedding_dim();
        let current = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let current_c = Arc::clone(&current);
        let peak_c = Arc::clone(&peak);
        let work = move |batch: Vec<(usize, String)>| {
            let current = Arc::clone(&current_c);
            let peak = Arc::clone(&peak_c);
            async move {
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                // Stay "in flight" long enough for overlap to be observable.
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                current.fetch_sub(1, Ordering::SeqCst);
                Ok(batch
                    .into_iter()
                    .map(|(i, _)| (i, dummy_vec(dim)))
                    .collect())
            }
        };

        let mut delivered = 0usize;
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .expect("test runtime");
        rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| {
                delivered += 1;
                Ok(())
            },
        ))
        .expect("fan-out must succeed");

        assert_eq!(delivered, permits * 10, "every item must be delivered");
        assert!(
            peak.load(Ordering::SeqCst) <= permits,
            "peak concurrency {} exceeded permits {permits}",
            peak.load(Ordering::SeqCst)
        );
    }

    /// A panic inside one batch must not hang the fan-out and must be
    /// reported as an error mentioning the panic.
    #[test]
    fn panicking_task_returns_permit_and_surfaces_error() {
        let permits = 2usize;
        let batches = test_batches(4);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            if batch[0].0 == 1 {
                panic!("intentional test panic");
            }
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(dim)))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("panic must surface as an error");
        assert!(
            err.to_string().contains("panicked"),
            "error must mention the panic: {err}"
        );
    }

    /// Cancelling the token while work sleeps for 30 s must end the fan-out
    /// with an error well before the work would have finished.
    ///
    /// NOTE(review): `run_bounded` only races work against the token when
    /// `crate::should_obey_shutdown()` is true; confirm that holds under test.
    #[test]
    fn cancellation_terminates_fan_out_quickly() {
        let permits = 2usize;
        let batches = test_batches(8);
        let dim = crate::constants::embedding_dim();
        let token = CancellationToken::new();

        let work = move |batch: Vec<(usize, String)>| async move {
            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(dim)))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let cancel = token.clone();
        let start = std::time::Instant::now();
        let result = rt.block_on(async move {
            // Fire the cancellation shortly after the fan-out has started.
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                cancel.cancel();
            });
            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
        });

        assert!(result.is_err(), "cancelled fan-out must report an error");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(10),
            "graceful shutdown must finish well under the work duration"
        );
    }

    /// Vectors of the wrong length must fail the fan-out with the G42/C5 error.
    #[test]
    fn fan_out_rejects_divergent_dim() {
        let permits = 2usize;
        let batches = test_batches(2);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, vec![0.0f32; 3]))
                .collect::<Vec<(usize, Vec<f32>)>>())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("divergent dim must fail the fan-out");
        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
    }

    /// At the calibration dimension the base batch sizes are returned as-is.
    #[test]
    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
    }

    /// At 384 dimensions both batch sizes shrink.
    #[test]
    fn adaptive_batch_dim384_shrinks() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
    }

    /// Doubling the dimension halves the batch size.
    #[test]
    fn adaptive_batch_intermediate_dims() {
        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
    }

    /// A dimension below the calibration point never grows the batch past `base`.
    #[test]
    fn adaptive_batch_small_dim_clamps_to_base() {
        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
    }

    /// Degenerate inputs (huge dim, zero dim, zero base) still yield a valid size.
    #[test]
    fn adaptive_batch_total_function() {
        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
    }

    /// The public wrappers must pick up the dimension from the environment.
    /// The environment and active dimension are restored before asserting so
    /// a failure does not leak state into other tests.
    #[test]
    #[serial_test::serial(env)]
    fn adaptive_wrappers_follow_env_dim() {
        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
        let chunk = chunk_embed_batch_size();
        let entity = entity_embed_batch_size();
        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
    }

    /// OAuth-related messages classify as `OAuth` regardless of letter case.
    #[test]
    fn embedding_error_kind_classify_oauth_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("OAuth token expired for claude"),
            EmbeddingErrorKind::OAuth,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("oauth authentication failed"),
            EmbeddingErrorKind::OAuth,
        );
    }

    /// Quota-related messages classify as `Quota`.
    #[test]
    fn embedding_error_kind_classify_quota_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("quota exhausted on backend"),
            EmbeddingErrorKind::Quota,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("Usage quota limit reached"),
            EmbeddingErrorKind::Quota,
        );
    }

    /// Slot-exhaustion messages classify as `SlotExhausted`.
    #[test]
    fn embedding_error_kind_classify_slot_exhausted_message() {
        assert_eq!(
            EmbeddingErrorKind::classify(
                "slot exhausted: failed to acquire LLM slot after backoff"
            ),
            EmbeddingErrorKind::SlotExhausted,
        );
    }

    /// Zero-dimension messages classify as `ZeroDimension`.
    #[test]
    fn embedding_error_kind_classify_zero_dimension_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("embedding returned dim=zero"),
            EmbeddingErrorKind::ZeroDimension,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
            EmbeddingErrorKind::ZeroDimension,
        );
    }

    /// Unrecognised messages fall back to `Unknown`; the stable string code
    /// of every kind is pinned as well.
    #[test]
    fn embedding_error_kind_classify_unknown_fallback() {
        assert_eq!(
            EmbeddingErrorKind::classify("unrelated subprocess error"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("rate limit hit"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
        assert_eq!(
            EmbeddingErrorKind::BackendMismatch.code(),
            "backend-mismatch"
        );
        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
    }

    /// Formatting each exercised variant must not panic.
    #[test]
    fn fallback_reason_display_does_not_panic() {
        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
        let _ = FallbackReason::Cancelled.to_string();
        let _ = FallbackReason::Timeout {
            operation: "embed_query".into(),
            duration_secs: 30,
        }
        .to_string();
    }

    /// Equality compares both the variant and its payload.
    #[test]
    fn fallback_reason_is_partial_eq() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("a".into())
        );
        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
        assert_ne!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("b".into())
        );
        assert_ne!(
            FallbackReason::Cancelled,
            FallbackReason::Timeout {
                operation: "x".into(),
                duration_secs: 1
            }
        );
    }

    /// The `Timeout` variant keeps the operation name and duration it was built with.
    #[test]
    fn fallback_reason_timeout_preserves_fields() {
        let r = FallbackReason::Timeout {
            operation: "embed_query_local".into(),
            duration_secs: 300,
        };
        match r {
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => {
                assert_eq!(operation, "embed_query_local");
                assert_eq!(duration_secs, 300);
            }
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    /// Ignored by default (see the `ignore` reason). Expects a bogus models
    /// directory to surface `EmbeddingFailed` with a non-empty message. The
    /// match is exhaustive on purpose so a new variant forces a review here.
    #[test]
    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
        let result = try_embed_query_with_fallback(bogus, "hello world");
        match result {
            Err(FallbackReason::EmbeddingFailed(msg)) => {
                assert!(!msg.is_empty(), "fallback message must not be empty");
            }
            Err(FallbackReason::Cancelled) => {
                panic!("expected EmbeddingFailed, got Cancelled");
            }
            Err(FallbackReason::Timeout { .. }) => {
                panic!("expected EmbeddingFailed, got Timeout");
            }
            Err(FallbackReason::SlotExhausted) => {
                panic!("expected EmbeddingFailed, got SlotExhausted");
            }
            Err(FallbackReason::OAuthQuota { .. }) => {
                panic!("expected EmbeddingFailed, got OAuthQuota");
            }
            Err(FallbackReason::BackendMismatch { .. }) => {
                panic!("expected EmbeddingFailed, got BackendMismatch");
            }
            Err(FallbackReason::DimZero) => {
                panic!("expected EmbeddingFailed, got DimZero");
            }
            Ok(_) => {
                panic!("expected an error, got Ok — embedder must fail for bogus path");
            }
        }
    }

    /// Cache keys are deterministic and depend on both model and text.
    #[test]
    fn g56_entity_cache_key_is_stable_and_distinct() {
        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k3 = entity_cache_key("codex:default", "claude-code");
        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
        assert_eq!(k1, k2, "same model+text must hash identically");
        assert_ne!(k1, k3, "different text must hash differently");
        assert_ne!(k1, k4, "different model must hash differently");
    }

    /// `hit_rate` is 0 for empty stats and `hits / requested` otherwise.
    #[test]
    fn g56_entity_embed_cache_stats_hit_rate() {
        let zero = EmbedCacheStats::default();
        assert_eq!(zero.hit_rate(), 0.0);
        let half = EmbedCacheStats {
            requested: 4,
            hits: 2,
            misses: 2,
        };
        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
        let all = EmbedCacheStats {
            requested: 7,
            hits: 7,
            misses: 0,
        };
        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
    }

    /// A value inserted into the shared entity cache can be read back by key.
    #[test]
    fn g56_entity_embed_cache_populates_and_hits() {
        let cache = entity_embed_cache();
        let model = "test-model";
        let text = "sqlite-graphrag";
        let key = entity_cache_key(model, text);
        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
        // The temporary guard from this `lock()` is released at the end of
        // the statement, before the second `lock()` below.
        cache.lock().insert(key, Arc::clone(&stored));
        let guard = cache.lock();
        let hit = guard.get(&key).expect("cache must return stored value");
        assert_eq!(hit.len(), crate::constants::embedding_dim());
        assert!((hit[0] - 0.42).abs() < 1e-6);
    }

    /// Default stats are all zero and report a zero hit rate.
    #[test]
    fn g56_empty_texts_short_circuits_with_zero_stats() {
        let stats = EmbedCacheStats::default();
        assert_eq!(stats.requested, 0);
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.hit_rate(), 0.0);
    }
}
2197
/// Tests for backend selection and the fallback chain used when embedding.
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    /// The `None` backend yields an empty vector and reports itself as the
    /// backend that was used.
    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        let models_dir = std::path::Path::new("/nonexistent");
        let outcome = embed_via_backend(models_dir, "any text", &LlmBackendKind::None);
        let (vector, backend) = outcome.expect("None backend never fails");
        assert!(vector.is_empty());
        assert_eq!(
            backend,
            LlmBackendKind::None,
            "None backend must report None"
        );
    }

    /// Pins the size of the expected default chain.
    ///
    /// NOTE(review): the two nested functions below are never called and are
    /// not `#[test]` items, so their assertions do not run. They are kept
    /// as-is because they appear to be disabled on purpose; confirm.
    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        let expected_chain = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];

        #[allow(dead_code)]
        fn llm_backend_kind_as_str_is_stable() {
            let pairs = [
                (LlmBackendKind::Codex, "codex"),
                (LlmBackendKind::Claude, "claude"),
                (LlmBackendKind::None, "none"),
            ];
            for (kind, label) in pairs {
                assert_eq!(kind.as_str(), label);
            }
        }

        #[allow(dead_code)]
        fn fallback_reason_reason_code_is_stable() {
            let failed = FallbackReason::EmbeddingFailed("any".into());
            assert_eq!(failed.reason_code(), "embedding_failed");

            assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");

            let timed_out = FallbackReason::Timeout {
                operation: "embed_query".into(),
                duration_secs: 30,
            };
            assert_eq!(timed_out.reason_code(), "timeout");
        }

        assert_eq!(expected_chain.len(), 3);
    }

    /// A chain of only `None` must abort when failures may not be skipped.
    #[test]
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        let only_none = vec![LlmBackendKind::None];
        let models_dir = std::path::Path::new("/nonexistent-models-dir-for-gap005-test");
        let failure = embed_with_fallback(models_dir, "hello", &only_none, false)
            .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = failure.to_string();
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }

    /// With `skip_on_failure` set, the same chain yields an empty vector
    /// attributed to the `None` backend.
    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        let only_none = vec![LlmBackendKind::None];
        let models_dir = std::path::Path::new("/nonexistent-models-dir-for-gap005-test");
        let outcome = embed_with_fallback(models_dir, "hello", &only_none, true)
            .expect("None chain is always Ok");
        assert!(outcome.0.is_empty(), "vector must be empty");
        assert_eq!(outcome.1, LlmBackendKind::None);
    }

    // NOTE(review): not a `#[test]`, so this check never runs; it looks
    // deliberately disabled. Confirm before re-enabling.
    #[allow(dead_code)]
    fn llm_backend_error_no_backends_default_message() {
        let error = LlmBackendError::NoBackendsAvailable;
        let hint = error.hint();
        assert!(hint.contains("--llm-fallback"));
    }

    /// The rendered error names the binary, carries the stderr tail, and
    /// mentions either the signal or the exit code.
    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let error = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let rendered = error.to_string();
        assert!(rendered.contains("codex"));
        assert!(rendered.contains("OOM killed"));
        assert!(rendered.contains("signal 9") || rendered.contains("exit 137"));
    }
}