1use crate::errors::AppError;
40use crate::extract::llm_embedding::LlmEmbedding;
41use parking_lot::Mutex;
42use std::path::Path;
43use std::sync::Arc;
44use std::sync::OnceLock;
45use tokio::sync::{mpsc, Semaphore};
46use tokio::task::JoinSet;
47use tokio_util::sync::CancellationToken;
48
49static CLAUDE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
59static OPENCODE_EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
60static OPENROUTER_CLIENT: OnceLock<crate::embedding_api::OpenRouterClient> = OnceLock::new();
61
62static OPENROUTER_CHAT_CLIENT: OnceLock<crate::chat_api::OpenRouterChatClient> = OnceLock::new();
66
67pub fn is_openrouter_initialized() -> bool {
69 OPENROUTER_CLIENT.get().is_some()
70}
71static EMBEDDER: OnceLock<Mutex<LlmEmbedding>> = OnceLock::new();
72
73static RUNTIME: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
79
/// Embedding dimension at which the base batch sizes below apply unchanged;
/// `adaptive_batch_for_dim` scales them down for larger dimensions.
pub const EMBED_BATCH_CALIBRATION_DIM: usize = 64;

/// Base number of passage chunks per embedding batch.
pub const CHUNK_EMBED_BATCH_SIZE: usize = 8;

/// Base number of entity texts per embedding batch.
pub const ENTITY_EMBED_BATCH_SIZE: usize = 25;
92
93fn adaptive_batch_for_dim(base: usize, dim: usize) -> usize {
101 let base = base.max(1);
102 (base * EMBED_BATCH_CALIBRATION_DIM / dim.max(1)).clamp(1, base)
103}
104
105pub fn chunk_embed_batch_size() -> usize {
107 let dim = crate::constants::embedding_dim();
108 let batch = adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, dim);
109 tracing::debug!(
110 dim,
111 base = CHUNK_EMBED_BATCH_SIZE,
112 batch,
113 "adaptive chunk batch size (G44)"
114 );
115 batch
116}
117
118pub fn entity_embed_batch_size() -> usize {
120 let dim = crate::constants::embedding_dim();
121 let batch = adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, dim);
122 tracing::debug!(
123 dim,
124 base = ENTITY_EMBED_BATCH_SIZE,
125 batch,
126 "adaptive entity batch size (G44)"
127 );
128 batch
129}
130
131pub(crate) fn shared_runtime() -> Result<&'static tokio::runtime::Runtime, AppError> {
133 if let Some(rt) = RUNTIME.get() {
134 return Ok(rt);
135 }
136 let rt = tokio::runtime::Builder::new_multi_thread()
137 .worker_threads(2)
138 .enable_all()
139 .build()
140 .map_err(|e| AppError::Embedding(format!("tokio runtime init failed: {e}")))?;
141 let _ = RUNTIME.set(rt);
142 RUNTIME.get().ok_or_else(|| {
143 AppError::Embedding("tokio runtime unavailable after initialisation".to_string())
144 })
145}
146
147pub fn get_embedder(_models_dir: &Path) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
149 if let Some(e) = EMBEDDER.get() {
150 return Ok(e);
151 }
152 let backend = LlmEmbedding::detect_available()?;
153 let _ = EMBEDDER.set(Mutex::new(backend));
154 EMBEDDER
155 .get()
156 .ok_or_else(|| AppError::Embedding("embedder unavailable after initialisation".to_string()))
157}
158
159pub fn get_claude_embedder(
164 claude_binary: Option<&Path>,
165 claude_model: Option<&str>,
166) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
167 if let Some(e) = CLAUDE_EMBEDDER.get() {
168 return Ok(e);
169 }
170 let mut builder = LlmEmbedding::with_claude_builder();
171 if let Some(b) = claude_binary {
172 builder = builder.override_binary(b.to_path_buf());
173 }
174 if let Some(m) = claude_model {
175 builder = builder.override_model(m.to_string());
176 }
177 let backend = builder.build()?;
178 let _ = CLAUDE_EMBEDDER.set(Mutex::new(backend));
179 CLAUDE_EMBEDDER.get().ok_or_else(|| {
180 AppError::Embedding("claude embedder unavailable after initialisation".to_string())
181 })
182}
183
184pub fn get_opencode_embedder(
189 opencode_binary: Option<&Path>,
190 opencode_model: Option<&str>,
191) -> Result<&'static Mutex<LlmEmbedding>, AppError> {
192 if let Some(e) = OPENCODE_EMBEDDER.get() {
193 return Ok(e);
194 }
195 let mut builder = LlmEmbedding::with_opencode_builder();
196 if let Some(b) = opencode_binary {
197 builder = builder.override_binary(b.to_path_buf());
198 }
199 if let Some(m) = opencode_model {
200 builder = builder.override_model(m.to_string());
201 }
202 let backend = builder.build()?;
203 let _ = OPENCODE_EMBEDDER.set(Mutex::new(backend));
204 OPENCODE_EMBEDDER.get().ok_or_else(|| {
205 AppError::Embedding("opencode embedder unavailable after initialisation".to_string())
206 })
207}
208
209pub fn get_openrouter_embedder(
210 api_key: secrecy::SecretBox<String>,
211 model: &str,
212 dim: usize,
213) -> Result<&'static crate::embedding_api::OpenRouterClient, AppError> {
214 if let Some(c) = OPENROUTER_CLIENT.get() {
215 return Ok(c);
216 }
217 let client = crate::embedding_api::OpenRouterClient::new(api_key, model.to_string(), dim)?;
218 let _ = OPENROUTER_CLIENT.set(client);
219 OPENROUTER_CLIENT.get().ok_or_else(|| {
220 AppError::Embedding("openrouter client unavailable after initialisation".to_string())
221 })
222}
223
224pub fn get_openrouter_chat_client(
228 api_key: secrecy::SecretBox<String>,
229 model: &str,
230 timeout_secs: u64,
231) -> Result<&'static crate::chat_api::OpenRouterChatClient, AppError> {
232 if let Some(c) = OPENROUTER_CHAT_CLIENT.get() {
233 return Ok(c);
234 }
235 let client =
236 crate::chat_api::OpenRouterChatClient::new(api_key, model.to_string(), timeout_secs)?;
237 let _ = OPENROUTER_CHAT_CLIENT.set(client);
238 OPENROUTER_CHAT_CLIENT.get().ok_or_else(|| {
239 AppError::Embedding("openrouter chat client unavailable after initialisation".to_string())
240 })
241}
242
243pub fn openrouter_chat_client() -> Option<&'static crate::chat_api::OpenRouterChatClient> {
248 OPENROUTER_CHAT_CLIENT.get()
249}
250
251pub fn embed_via_claude_local(
255 _models_dir: &Path,
256 text: &str,
257 claude_binary: Option<&Path>,
258 claude_model: Option<&str>,
259) -> Result<Vec<f32>, AppError> {
260 let _slot_guard = acquire_llm_slot_for_embedding()?;
261 let embedder = get_claude_embedder(claude_binary, claude_model)?;
262 embed_passage(embedder, text)
263}
264
265pub fn embed_via_claude_local_resolved(
270 _models_dir: &Path,
271 text: &str,
272 claude_binary: Option<&Path>,
273 claude_model: Option<&str>,
274) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
275 let _slot_guard = acquire_llm_slot_for_embedding()?;
276 let embedder = get_claude_embedder(claude_binary, claude_model)?;
277 let v = embed_passage(embedder, text)?;
278 Ok((v, LlmBackendKind::Claude))
279}
280
281pub fn embed_via_opencode_local_resolved(
286 _models_dir: &Path,
287 text: &str,
288 opencode_binary: Option<&Path>,
289 opencode_model: Option<&str>,
290) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
291 let _slot_guard = acquire_llm_slot_for_embedding()?;
292 let embedder = get_opencode_embedder(opencode_binary, opencode_model)?;
293 let v = embed_passage(embedder, text)?;
294 Ok((v, LlmBackendKind::Opencode))
295}
296fn clone_client(embedder: &Mutex<LlmEmbedding>) -> LlmEmbedding {
299 embedder.lock().clone()
300}
301
302pub fn embed_passage(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
306 let client = clone_client(embedder);
307 let result = client.embed_passage(text)?;
308 validate_dim(result)
309}
310
311pub fn embed_query(embedder: &Mutex<LlmEmbedding>, text: &str) -> Result<Vec<f32>, AppError> {
315 let client = clone_client(embedder);
316 let result = client.embed_query(text)?;
317 validate_dim(result)
318}
319
320pub fn embed_passages_controlled(
325 embedder: &Mutex<LlmEmbedding>,
326 texts: &[&str],
327 _token_counts: &[usize],
328) -> Result<Vec<Vec<f32>>, AppError> {
329 if texts.is_empty() {
330 return Ok(Vec::new());
331 }
332 let owned: Vec<String> = texts.iter().map(|t| t.to_string()).collect();
333 embed_texts_parallel(embedder, &owned, 1, chunk_embed_batch_size())
334}
335
336pub fn embed_passage_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
337 let _slot_guard = acquire_llm_slot_for_embedding()?;
338 let embedder = get_embedder(models_dir)?;
339 embed_passage(embedder, text)
340}
341
/// Returns `true` when `SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE` is exactly
/// `"1"` or `"true"`. The comparison is case-sensitive; unset or non-Unicode
/// values yield `false`.
pub fn should_skip_embedding_on_failure() -> bool {
    std::env::var("SQLITE_GRAPHRAG_SKIP_EMBEDDING_ON_FAILURE")
        .map(|value| value == "1" || value == "true")
        .unwrap_or(false)
}
351
352pub fn embed_passage_or_skip(
359 models_dir: &Path,
360 text: &str,
361 choice: Option<crate::cli::LlmBackendChoice>,
362) -> Result<Option<Vec<f32>>, AppError> {
363 match embed_passage_with_choice(models_dir, text, choice) {
364 Ok((v, _backend)) => Ok(Some(v)),
365 Err(AppError::Validation(msg)) => Err(AppError::Validation(msg)),
366 Err(e) => {
367 if should_skip_embedding_on_failure() {
368 tracing::warn!(
369 error = %e,
370 "embedding failed but --skip-embedding-on-failure is active; persisting with NULL embedding"
371 );
372 Ok(None)
373 } else {
374 Err(e)
375 }
376 }
377 }
378}
379
380pub fn embed_passage_local_resolved(
386 models_dir: &Path,
387 text: &str,
388) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
389 let _slot_guard = acquire_llm_slot_for_embedding()?;
390 let embedder = get_embedder(models_dir)?;
391 let v = embed_passage(embedder, text)?;
392 let kind = match embedder.lock().flavour() {
393 crate::extract::llm_embedding::EmbeddingFlavour::Codex => LlmBackendKind::Codex,
394 crate::extract::llm_embedding::EmbeddingFlavour::Claude => LlmBackendKind::Claude,
395 crate::extract::llm_embedding::EmbeddingFlavour::Opencode => LlmBackendKind::Opencode,
396 };
397 Ok((v, kind))
398}
399
400pub fn embed_query_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
401 let _slot_guard = acquire_llm_slot_for_embedding()?;
402 let embedder = get_embedder(models_dir)?;
403 embed_query(embedder, text)
404}
405
406pub fn embed_passage_with_choice(
423 models_dir: &Path,
424 text: &str,
425 choice: Option<crate::cli::LlmBackendChoice>,
426) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
427 let _slot_guard = acquire_llm_slot_for_embedding()?;
428 match choice {
429 None => {
430 let embedder = get_embedder(models_dir)?;
431 embed_passage(embedder, text).map(|v| (v, LlmBackendKind::None))
432 }
433 Some(choice) => embed_with_fallback(models_dir, text, &choice.to_chain(), false),
434 }
435}
436
437pub fn embed_passage_with_embedding_choice(
441 models_dir: &Path,
442 text: &str,
443 embedding_backend: crate::cli::EmbeddingBackendChoice,
444 llm_backend: crate::cli::LlmBackendChoice,
445) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
446 let _slot_guard = acquire_llm_slot_for_embedding()?;
447 let chain = embedding_backend.to_chain(llm_backend);
448 embed_with_fallback(models_dir, text, &chain, false)
449}
450
451pub fn try_embed_query_with_choice(
457 models_dir: &Path,
458 text: &str,
459 choice: Option<crate::cli::LlmBackendChoice>,
460) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
461 match embed_passage_with_choice(models_dir, text, choice) {
462 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
475 Ok((v, backend)) => Ok((v, backend)),
476 Err(e) => Err(classify_embedding_error(e)),
477 }
478}
479pub fn try_embed_query_with_embedding_choice(
484 models_dir: &Path,
485 text: &str,
486 embedding_backend: crate::cli::EmbeddingBackendChoice,
487 llm_backend: crate::cli::LlmBackendChoice,
488) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
489 match embed_passage_with_embedding_choice(models_dir, text, embedding_backend, llm_backend) {
490 Ok((v, _backend)) if v.is_empty() => Err(FallbackReason::DimZero),
491 Ok((v, backend)) => Ok((v, backend)),
492 Err(e) => Err(classify_embedding_error(e)),
493 }
494}
495
496fn acquire_llm_slot_for_embedding() -> Result<crate::llm_slots::LlmSlotGuard, AppError> {
508 use crate::constants::{CLI_LOCK_DEFAULT_WAIT_SECS, LLM_WORKER_RSS_MB};
509 let max = std::env::var("SQLITE_GRAPHRAG_LLM_MAX_HOST_CONCURRENCY")
510 .ok()
511 .and_then(|s| s.parse::<u32>().ok())
512 .filter(|n| *n >= 1)
513 .unwrap_or_else(crate::llm_slots::default_max_concurrency);
514 let wait_secs = if std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_NO_WAIT").is_ok() {
515 0
516 } else {
517 std::env::var("SQLITE_GRAPHRAG_LLM_SLOT_WAIT_SECS")
518 .ok()
519 .and_then(|s| s.parse::<u64>().ok())
520 .unwrap_or(CLI_LOCK_DEFAULT_WAIT_SECS)
521 };
522 let _ = LLM_WORKER_RSS_MB; match crate::llm_slots::acquire_llm_slot(max, wait_secs) {
530 Ok(guard) => Ok(guard),
531 Err(e @ AppError::LockBusy { .. }) if wait_secs > 0 => Err(AppError::Embedding(format!(
532 "slot exhausted: {e} (fall back to FTS5)"
533 ))),
534 Err(e) => Err(e),
535 }
536}
/// Coarse category of an embedding failure, derived from its message text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EmbeddingErrorKind {
    OAuth,
    Quota,
    SlotExhausted,
    BackendMismatch,
    ZeroDimension,
    Unknown,
}

impl EmbeddingErrorKind {
    /// Classifies `msg` case-insensitively. The first matching rule wins, in
    /// this order: "oauth", "quota", "slot exhausted", "backend mismatch", then
    /// "dim" together with "zero". Anything else is `Unknown`.
    pub fn classify(msg: &str) -> Self {
        let lowered = msg.to_lowercase();
        let rules: [(&str, Self); 4] = [
            ("oauth", Self::OAuth),
            ("quota", Self::Quota),
            ("slot exhausted", Self::SlotExhausted),
            ("backend mismatch", Self::BackendMismatch),
        ];
        if let Some(&(_, kind)) = rules.iter().find(|(needle, _)| lowered.contains(*needle)) {
            return kind;
        }
        if lowered.contains("dim") && lowered.contains("zero") {
            Self::ZeroDimension
        } else {
            Self::Unknown
        }
    }

    /// Stable kebab-case identifier for this kind.
    pub fn code(&self) -> &'static str {
        match *self {
            Self::Unknown => "unknown",
            Self::ZeroDimension => "zero-dimension",
            Self::BackendMismatch => "backend-mismatch",
            Self::SlotExhausted => "slot-exhausted",
            Self::Quota => "quota",
            Self::OAuth => "oauth",
        }
    }
}

impl std::fmt::Display for EmbeddingErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.code())
    }
}
608
/// Reason an embedding attempt failed, used by callers to decide whether and
/// how to fall back (see `try_embed_query_with_deterministic_fallback`).
#[derive(Debug, Clone, PartialEq)]
pub enum FallbackReason {
    /// Any failure not covered by a more specific variant; carries the message.
    EmbeddingFailed(String),
    /// No LLM host slot could be acquired.
    SlotExhausted,
    /// OAuth or quota exhaustion on the named backend.
    OAuthQuota { backend: &'static str },
    /// A different backend than the requested one was invoked.
    BackendMismatch {
        requested: &'static str,
        resolved: &'static str,
    },
    /// The backend returned an empty vector.
    DimZero,
    /// The embedding was cancelled.
    Cancelled,
    /// The embedding exceeded its time budget.
    Timeout {
        operation: String,
        duration_secs: u64,
    },
}

impl FallbackReason {
    /// Stable snake_case code for this reason.
    pub fn reason_code(&self) -> &'static str {
        match *self {
            Self::Timeout { .. } => "timeout",
            Self::Cancelled => "cancelled",
            Self::DimZero => "dim_zero",
            Self::BackendMismatch { .. } => "backend_mismatch",
            Self::OAuthQuota { .. } => "oauth_quota",
            Self::SlotExhausted => "slot_exhausted",
            Self::EmbeddingFailed(_) => "embedding_failed",
        }
    }
}

impl std::fmt::Display for FallbackReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::EmbeddingFailed(msg) => write!(f, "embedding failed: {msg}"),
            Self::SlotExhausted => f.write_str(
                "slot exhausted: failed to acquire LLM slot after backoff window (max=8 concurrent, total backoff=750ms)",
            ),
            Self::OAuthQuota { backend } => {
                write!(f, "OAuth usage quota exhausted on backend '{backend}'")
            }
            Self::BackendMismatch { requested, resolved } => write!(
                f,
                "backend mismatch: user requested '{requested}' but '{resolved}' was invoked"
            ),
            Self::DimZero => f.write_str("embedding returned zero-dimensional vector"),
            Self::Cancelled => f.write_str("embedding cancelled by external signal"),
            Self::Timeout { operation, duration_secs } => write!(
                f,
                "embedding timed out after {duration_secs}s during {operation}"
            ),
        }
    }
}

impl std::error::Error for FallbackReason {}
705
706pub fn try_embed_query_with_fallback(
714 models_dir: &Path,
715 query: &str,
716) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
717 match embed_query_local(models_dir, query) {
718 Ok(v) => Ok((v, LlmBackendKind::None)),
719 Err(e) => Err(classify_embedding_error(e)),
720 }
721}
722
723pub fn try_embed_query_with_deterministic_fallback(
732 models_dir: &Path,
733 query: &str,
734 choice: Option<crate::cli::LlmBackendChoice>,
735) -> Result<(Vec<f32>, LlmBackendKind), FallbackReason> {
736 match try_embed_query_with_choice(models_dir, query, choice) {
737 Ok(t) => Ok(t),
738 Err(reason @ FallbackReason::OAuthQuota { backend }) => {
739 let alt = match backend {
740 "codex" => Some(crate::cli::LlmBackendChoice::Claude),
741 "claude" => Some(crate::cli::LlmBackendChoice::Codex),
742 "opencode" => Some(crate::cli::LlmBackendChoice::Codex),
743 "openrouter" => Some(crate::cli::LlmBackendChoice::Codex),
744 _ => None,
745 };
746 if let Some(alt_choice) = alt {
747 try_embed_query_with_choice(models_dir, query, Some(alt_choice))
748 } else {
749 Err(reason)
750 }
751 }
752 Err(reason @ FallbackReason::SlotExhausted) => {
753 std::thread::sleep(std::time::Duration::from_millis(750));
754 try_embed_query_with_choice(models_dir, query, choice).or(Err(reason))
755 }
756 Err(other) => Err(other),
757 }
758}
759
760pub fn classify_embedding_error(err: AppError) -> FallbackReason {
768 match err {
769 AppError::Timeout {
770 operation,
771 duration_secs,
772 } => FallbackReason::Timeout {
773 operation,
774 duration_secs,
775 },
776 AppError::Embedding(msg) => match EmbeddingErrorKind::classify(&msg) {
777 EmbeddingErrorKind::SlotExhausted => FallbackReason::SlotExhausted,
786 EmbeddingErrorKind::OAuth => {
787 let backend = if msg.contains("codex") {
788 "codex"
789 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
790 "claude"
795 } else if msg.contains("opencode") {
796 "opencode"
797 } else {
798 "unknown"
799 };
800 FallbackReason::OAuthQuota { backend }
801 }
802 EmbeddingErrorKind::Quota => {
803 let backend = if msg.contains("codex") {
804 "codex"
805 } else if msg.contains("claude") || msg.contains("anthropic-ratelimit") {
806 "claude"
807 } else if msg.contains("opencode") {
808 "opencode"
809 } else {
810 "unknown"
811 };
812 FallbackReason::OAuthQuota { backend }
813 }
814 EmbeddingErrorKind::BackendMismatch => {
815 let (requested, resolved) =
820 if msg.contains("requested claude") && msg.contains("but codex") {
821 ("claude", "codex")
822 } else if msg.contains("requested codex") && msg.contains("but claude") {
823 ("codex", "claude")
824 } else if msg.contains("requested claude") {
825 ("claude", "unknown")
826 } else if msg.contains("requested codex") {
827 ("codex", "unknown")
828 } else {
829 ("unknown", "unknown")
830 };
831 FallbackReason::BackendMismatch {
832 requested,
833 resolved,
834 }
835 }
836 EmbeddingErrorKind::ZeroDimension => FallbackReason::DimZero,
837 EmbeddingErrorKind::Unknown => {
838 if msg.contains("cancelled") {
839 FallbackReason::Cancelled
840 } else {
841 FallbackReason::EmbeddingFailed(msg)
842 }
843 }
844 },
845 e => FallbackReason::EmbeddingFailed(e.to_string()),
846 }
847}
848pub fn embed_with_fallback(
867 models_dir: &Path,
868 text: &str,
869 chain: &[LlmBackendKind],
870 skip_on_failure: bool,
871) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
872 use crate::llm::exit_code_hints::LlmBackendError;
873 let effective: Vec<LlmBackendKind> = if chain.is_empty() {
874 vec![
875 LlmBackendKind::Codex,
876 LlmBackendKind::Claude,
877 LlmBackendKind::Opencode,
878 LlmBackendKind::None,
879 ]
880 } else {
881 chain.to_vec()
882 };
883
884 let mut last_err: Option<AppError> = None;
885 for backend in &effective {
886 match embed_via_backend_strict(
897 models_dir,
898 text,
899 backend,
900 last_err.as_ref(),
901 skip_on_failure,
902 ) {
903 Ok((v, resolved_kind)) => return Ok((v, resolved_kind)),
904 Err(e) => {
905 if matches!(e, AppError::Validation(_)) {
910 return Err(e);
911 }
912 tracing::warn!(
913 target: "embedding",
914 backend = ?backend,
915 error = %e,
916 "embed_with_fallback: backend failed, trying next"
917 );
918 last_err = Some(e);
919 }
920 }
921 }
922 if skip_on_failure {
923 return Ok((Vec::new(), LlmBackendKind::None));
928 }
929 Err(last_err
930 .unwrap_or_else(|| AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string())))
931}
932
/// Backend that actually produced (or was asked to produce) an embedding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LlmBackendKind {
    /// Codex-flavoured local embedder (`"codex"`).
    Codex,
    /// Claude-flavoured local embedder (`"claude"`).
    Claude,
    /// Opencode-flavoured local embedder (`"opencode"`).
    Opencode,
    /// Remote OpenRouter embedding API (`"openrouter"`).
    OpenRouter,
    /// No backend / sentinel (`"none"`).
    None,
}

impl LlmBackendKind {
    /// Lowercase identifier of the backend.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::None => "none",
            Self::OpenRouter => "openrouter",
            Self::Opencode => "opencode",
            Self::Claude => "claude",
            Self::Codex => "codex",
        }
    }
}
963
964pub fn embed_via_backend(
979 models_dir: &Path,
980 text: &str,
981 backend: &LlmBackendKind,
982) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
983 match backend {
984 LlmBackendKind::None => Ok((Vec::new(), LlmBackendKind::None)),
985 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
986 LlmBackendKind::Claude => {
987 tracing::debug!(
991 target: "embedder",
992 backend = "claude",
993 "embed_via_backend: forcing claude (ADR-0042 / GAP-002 fix)"
994 );
995 embed_via_claude_local_resolved(models_dir, text, None, None)
996 }
997 LlmBackendKind::Opencode => {
998 tracing::debug!(
999 target: "embedder",
1000 backend = "opencode",
1001 "embed_via_backend: forcing opencode (GAP-OPENCODE-001)"
1002 );
1003 embed_via_opencode_local_resolved(models_dir, text, None, None)
1004 }
1005 LlmBackendKind::OpenRouter => {
1006 tracing::debug!(
1007 target: "embedder",
1008 backend = "openrouter",
1009 "embed_via_backend: using OpenRouter API (v1.0.93)"
1010 );
1011 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1012 AppError::Embedding(
1013 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1014 )
1015 })?;
1016 let rt = shared_runtime()?;
1017 let vec = rt.block_on(client.embed_single(text, client.default_input_type()))?;
1018 Ok((vec, LlmBackendKind::OpenRouter))
1019 }
1020 }
1021}
1022
1023pub fn embed_via_backend_strict(
1036 models_dir: &Path,
1037 text: &str,
1038 backend: &LlmBackendKind,
1039 last_err: Option<&AppError>,
1040 skip_on_failure: bool,
1041) -> Result<(Vec<f32>, LlmBackendKind), AppError> {
1042 use crate::llm::exit_code_hints::LlmBackendError;
1043 match backend {
1044 LlmBackendKind::None => {
1045 if skip_on_failure && last_err.is_none() {
1049 Ok((Vec::new(), LlmBackendKind::None))
1050 } else if last_err.is_some() {
1051 Err(match last_err {
1055 Some(e) => AppError::Embedding(format!("{e}")),
1056 None => AppError::Embedding(LlmBackendError::NoBackendsAvailable.to_string()),
1057 })
1058 } else {
1059 Err(AppError::Embedding(
1062 LlmBackendError::NoBackendsAvailable.to_string(),
1063 ))
1064 }
1065 }
1066 LlmBackendKind::Codex => embed_passage_local_resolved(models_dir, text),
1067 LlmBackendKind::Claude => {
1068 tracing::debug!(
1069 target: "embedder",
1070 backend = "claude",
1071 "embed_via_backend_strict: forcing claude (ADR-0042 / GAP-002 fix)"
1072 );
1073 embed_via_claude_local_resolved(models_dir, text, None, None)
1074 }
1075 LlmBackendKind::Opencode => {
1076 tracing::debug!(
1077 target: "embedder",
1078 backend = "opencode",
1079 "embed_via_backend_strict: forcing opencode (GAP-OPENCODE-001)"
1080 );
1081 embed_via_opencode_local_resolved(models_dir, text, None, None)
1082 }
1083 LlmBackendKind::OpenRouter => embed_via_backend(models_dir, text, backend),
1084 }
1085}
1086
1087pub fn embed_via_backend_legacy(
1092 models_dir: &Path,
1093 text: &str,
1094 backend: &LlmBackendKind,
1095) -> Result<Vec<f32>, AppError> {
1096 embed_via_backend(models_dir, text, backend).map(|(v, _)| v)
1097}
1098
1099pub fn embed_passages_controlled_local(
1100 models_dir: &Path,
1101 texts: &[&str],
1102 token_counts: &[usize],
1103) -> Result<Vec<Vec<f32>>, AppError> {
1104 let embedder = get_embedder(models_dir)?;
1105 embed_passages_controlled(embedder, texts, token_counts)
1106}
1107
1108pub fn embed_passages_parallel_local(
1111 models_dir: &Path,
1112 texts: &[String],
1113 parallelism: usize,
1114 batch_size: usize,
1115) -> Result<Vec<Vec<f32>>, AppError> {
1116 let embedder = get_embedder(models_dir)?;
1117 embed_texts_parallel(embedder, texts, parallelism, batch_size)
1118}
1119
1120type EmbedChunkResult = (usize, Result<Vec<Vec<f32>>, AppError>);
1124
/// Sorts `(chunk_index, vectors)` parts by index (stable) and flattens them
/// into a single vector list in input order.
fn reassemble_ordered(mut parts: Vec<(usize, Vec<Vec<f32>>)>) -> Vec<Vec<f32>> {
    parts.sort_by(|left, right| left.0.cmp(&right.0));
    let mut ordered = Vec::new();
    for (_, vectors) in parts {
        ordered.extend(vectors);
    }
    ordered
}
1133
1134pub fn embed_passages_parallel_with_embedding_choice(
1141 models_dir: &Path,
1142 texts: &[String],
1143 parallelism: usize,
1144 batch_size: usize,
1145 embedding_backend: crate::cli::EmbeddingBackendChoice,
1146 llm_backend: crate::cli::LlmBackendChoice,
1147) -> Result<Vec<Vec<f32>>, AppError> {
1148 let chain = embedding_backend.to_chain(llm_backend);
1149 if chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized() {
1150 let client = OPENROUTER_CLIENT.get().ok_or_else(|| {
1151 AppError::Embedding(
1152 "OpenRouter client not initialised; call get_openrouter_embedder first".into(),
1153 )
1154 })?;
1155 let rt = shared_runtime()?;
1156
1157 let k = parallelism.clamp(1, 16);
1162 if texts.len() <= 32 || k == 1 {
1163 let refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
1164 let vecs = rt.block_on(client.embed_batch(&refs, client.default_input_type()))?;
1165 return Ok(vecs);
1166 }
1167
1168 let vecs = rt.block_on(async move {
1173 let mut set: JoinSet<EmbedChunkResult> = JoinSet::new();
1174 let mut parts: Vec<(usize, Vec<Vec<f32>>)> = Vec::new();
1175
1176 for (idx, chunk) in texts.chunks(32).enumerate() {
1177 if set.len() >= k {
1178 if let Some(joined) = set.join_next().await {
1179 let (cidx, res) = joined.map_err(|e| {
1180 AppError::Embedding(format!("embedding task join error: {e}"))
1181 })?;
1182 parts.push((cidx, res?));
1183 }
1184 }
1185 let owned: Vec<String> = chunk.to_vec();
1186 set.spawn(async move {
1187 let refs: Vec<&str> = owned.iter().map(|s| s.as_str()).collect();
1188 let r = client.embed_batch(&refs, client.default_input_type()).await;
1189 (idx, r)
1190 });
1191 }
1192
1193 while let Some(joined) = set.join_next().await {
1194 let (cidx, res) = joined
1195 .map_err(|e| AppError::Embedding(format!("embedding task join error: {e}")))?;
1196 parts.push((cidx, res?));
1197 }
1198
1199 Ok::<Vec<Vec<f32>>, AppError>(reassemble_ordered(parts))
1200 })?;
1201 Ok(vecs)
1202 } else {
1203 embed_passages_parallel_local(models_dir, texts, parallelism, batch_size)
1204 }
1205}
1206
1207type EntityEmbedCacheMap = std::collections::HashMap<u64, Arc<Vec<f32>>>;
1219
1220static ENTITY_EMBED_CACHE: OnceLock<parking_lot::Mutex<EntityEmbedCacheMap>> = OnceLock::new();
1221
1222fn entity_embed_cache() -> &'static parking_lot::Mutex<EntityEmbedCacheMap> {
1223 ENTITY_EMBED_CACHE.get_or_init(|| parking_lot::Mutex::new(std::collections::HashMap::new()))
1224}
1225
1226fn entity_cache_key(model: &str, text: &str) -> u64 {
1227 let mut hasher = blake3::Hasher::new();
1228 hasher.update(model.as_bytes());
1229 hasher.update(b"\0");
1230 hasher.update(text.as_bytes());
1231 let h = hasher.finalize();
1232 let bytes = h.as_bytes();
1233 u64::from_le_bytes([
1234 bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7],
1235 ])
1236}
1237
1238pub fn embed_entity_texts_cached(
1248 models_dir: &Path,
1249 texts: &[String],
1250 parallelism: usize,
1251 embedding_backend: crate::cli::EmbeddingBackendChoice,
1252 llm_backend: crate::cli::LlmBackendChoice,
1253) -> Result<(Vec<Vec<f32>>, EmbedCacheStats), AppError> {
1254 if texts.is_empty() {
1255 return Ok((Vec::new(), EmbedCacheStats::default()));
1256 }
1257 let chain = embedding_backend.to_chain(llm_backend);
1261
1262 if chain.as_slice() == [LlmBackendKind::None] {
1268 let out: Vec<Vec<f32>> = texts.iter().map(|_| Vec::new()).collect();
1269 return Ok((
1270 out,
1271 EmbedCacheStats {
1272 requested: texts.len(),
1273 hits: 0,
1274 misses: texts.len(),
1275 },
1276 ));
1277 }
1278
1279 let routed_openrouter =
1285 chain.first() == Some(&LlmBackendKind::OpenRouter) && is_openrouter_initialized();
1286 let model = if routed_openrouter {
1287 format!("openrouter:{}", crate::constants::embedding_dim())
1288 } else {
1289 get_embedder(models_dir)?.lock().model_label()
1290 };
1291 let cache = entity_embed_cache();
1292 let mut hits: Vec<Option<Arc<Vec<f32>>>> = vec![None; texts.len()];
1293 let mut miss_indices: Vec<usize> = Vec::with_capacity(texts.len());
1294 {
1295 let guard = cache.lock();
1296 for (i, text) in texts.iter().enumerate() {
1297 let key = entity_cache_key(&model, text);
1298 if let Some(v) = guard.get(&key) {
1299 hits[i] = Some(Arc::clone(v));
1300 } else {
1301 miss_indices.push(i);
1302 }
1303 }
1304 }
1305 let miss_count = miss_indices.len();
1306 if miss_count > 0 {
1307 let miss_texts: Vec<String> = miss_indices.iter().map(|&i| texts[i].clone()).collect();
1308 let miss_vecs = embed_passages_parallel_with_embedding_choice(
1312 models_dir,
1313 &miss_texts,
1314 parallelism,
1315 entity_embed_batch_size(),
1316 embedding_backend,
1317 llm_backend,
1318 )?;
1319 let mut guard = cache.lock();
1320 for (slot, &orig_idx) in miss_indices.iter().enumerate() {
1321 let vec = Arc::new(miss_vecs[slot].clone());
1322 let key = entity_cache_key(&model, &texts[orig_idx]);
1323 guard.insert(key, Arc::clone(&vec));
1324 hits[orig_idx] = Some(vec);
1325 }
1326 }
1327 let mut out = Vec::with_capacity(texts.len());
1328 for hit in hits.into_iter() {
1329 let v = hit.ok_or_else(|| {
1330 AppError::Embedding("entity embed cache produced null result".to_string())
1331 })?;
1332 out.push((*v).clone());
1333 }
1334 Ok((
1335 out,
1336 EmbedCacheStats {
1337 requested: texts.len(),
1338 hits: texts.len() - miss_count,
1339 misses: miss_count,
1340 },
1341 ))
1342}
1343
1344#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, serde::Serialize)]
1346pub struct EmbedCacheStats {
1347 pub requested: usize,
1348 pub hits: usize,
1349 pub misses: usize,
1350}
1351
1352impl EmbedCacheStats {
1353 pub fn hit_rate(&self) -> f64 {
1355 if self.requested == 0 {
1356 0.0
1357 } else {
1358 self.hits as f64 / self.requested as f64
1359 }
1360 }
1361}
1362
1363pub fn embed_texts_parallel(
1376 embedder: &Mutex<LlmEmbedding>,
1377 texts: &[String],
1378 parallelism: usize,
1379 batch_size: usize,
1380) -> Result<Vec<Vec<f32>>, AppError> {
1381 let mut slots: Vec<Option<Vec<f32>>> = vec![None; texts.len()];
1382 embed_texts_parallel_with(embedder, texts, parallelism, batch_size, |idx, v| {
1383 slots[idx] = Some(v.to_vec());
1384 Ok(())
1385 })?;
1386 let mut out = Vec::with_capacity(slots.len());
1387 for (idx, slot) in slots.into_iter().enumerate() {
1388 out.push(slot.ok_or_else(|| {
1389 AppError::Embedding(format!("embedding fan-out lost item index {idx}"))
1390 })?);
1391 }
1392 Ok(out)
1393}
1394
1395pub fn embed_texts_parallel_with(
1399 embedder: &Mutex<LlmEmbedding>,
1400 texts: &[String],
1401 parallelism: usize,
1402 batch_size: usize,
1403 mut on_result: impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1404) -> Result<(), AppError> {
1405 if texts.is_empty() {
1406 return Ok(());
1407 }
1408 let dim = crate::constants::embedding_dim();
1409 if texts.len() == 1 {
1410 let v = embed_passage(embedder, &texts[0])?;
1411 return on_result(0, &v);
1412 }
1413
1414 let client = clone_client(embedder);
1415 let permits = effective_permits(parallelism);
1416 let batches = build_batches(texts, batch_size.max(1));
1417 let token = crate::cancel_token().clone();
1418
1419 let work = move |batch: Vec<(usize, String)>| {
1420 let client = client.clone();
1421 async move {
1422 client
1423 .embed_batch_async(crate::constants::PASSAGE_PREFIX, &batch)
1424 .await
1425 }
1426 };
1427
1428 let fan_out = run_bounded(batches, permits, dim, token, work, &mut on_result);
1429 match tokio::runtime::Handle::try_current() {
1430 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(fan_out)),
1431 Err(_) => shared_runtime()?.block_on(fan_out),
1432 }
1433}
1434
/// Splits `texts` into consecutive batches of at most `batch_size` items,
/// tagging each text with its original index.
///
/// Each text is cloned exactly once. The previous version cloned everything
/// into an intermediate `Vec` and then cloned again per chunk. A `batch_size`
/// of 0 is treated as 1 rather than panicking inside `slice::chunks`.
fn build_batches(texts: &[String], batch_size: usize) -> Vec<Vec<(usize, String)>> {
    let batch_size = batch_size.max(1);
    texts
        .chunks(batch_size)
        .enumerate()
        .map(|(chunk_idx, chunk)| {
            let base = chunk_idx * batch_size;
            chunk
                .iter()
                .enumerate()
                .map(|(offset, text)| (base + offset, text.clone()))
                .collect()
        })
        .collect()
}
1446
1447pub fn effective_permits(requested: usize) -> usize {
1452 let cpus = std::thread::available_parallelism()
1453 .map(|n| n.get())
1454 .unwrap_or(4);
1455 let by_ram = ((crate::memory_guard::available_memory_mb() / 2)
1456 / crate::constants::LLM_WORKER_RSS_MB)
1457 .max(1) as usize;
1458 requested.clamp(1, 32).min(cpus).min(by_ram).max(1)
1459}
1460
1461async fn run_bounded<F, Fut>(
1471 batches: Vec<Vec<(usize, String)>>,
1472 permits: usize,
1473 dim: usize,
1474 token: CancellationToken,
1475 work: F,
1476 on_result: &mut impl FnMut(usize, &[f32]) -> Result<(), AppError>,
1477) -> Result<(), AppError>
1478where
1479 F: Fn(Vec<(usize, String)>) -> Fut + Clone + Send + 'static,
1480 Fut: std::future::Future<Output = Result<Vec<(usize, Vec<f32>)>, AppError>> + Send,
1481{
1482 let total_batches = batches.len();
1483 let semaphore = Arc::new(Semaphore::new(permits));
1484 let (tx, mut rx) = mpsc::channel::<Result<Vec<(usize, Vec<f32>)>, AppError>>(permits * 2);
1487 let mut set: JoinSet<()> = JoinSet::new();
1488
1489 for (batch_idx, batch) in batches.into_iter().enumerate() {
1490 let sem = Arc::clone(&semaphore);
1491 let token = token.clone();
1492 let tx = tx.clone();
1493 let work = work.clone();
1494 set.spawn(async move {
1495 let wait_start = std::time::Instant::now();
1496 let Ok(_permit) = sem.acquire_owned().await else {
1499 let _ = tx
1500 .send(Err(AppError::Embedding("semaphore closed".to_string())))
1501 .await;
1502 return;
1503 };
1504 let permit_wait_ms = wait_start.elapsed().as_millis() as u64;
1505 let work_start = std::time::Instant::now();
1506 let outcome = if crate::should_obey_shutdown() {
1512 tokio::select! {
1513 res = work(batch) => res,
1514 _ = token.cancelled() => Err(AppError::Embedding(
1515 "embedding cancelled by shutdown signal".to_string(),
1516 )),
1517 }
1518 } else {
1519 work(batch).await
1520 };
1521 tracing::debug!(
1523 target: "embedding",
1524 batch_idx,
1525 permit_wait_ms,
1526 work_ms = work_start.elapsed().as_millis() as u64,
1527 ok = outcome.is_ok(),
1528 "embedding batch finished"
1529 );
1530 let _ = tx.send(outcome).await;
1531 });
1532 }
1533 drop(tx);
1534
1535 let mut completed = 0usize;
1536 let mut failed = 0usize;
1537 let mut cancelled = 0usize;
1538 let mut first_error: Option<AppError> = None;
1539
1540 while let Some(message) = rx.recv().await {
1541 match message {
1542 Ok(items) => {
1543 completed += 1;
1544 if first_error.is_none() {
1545 for (idx, v) in items {
1546 if v.len() != dim {
1547 first_error = Some(AppError::Embedding(format!(
1548 "LLM returned {} dims for item {idx}, expected {dim}; \
1549 refusing to truncate or pad silently (G42/C5)",
1550 v.len()
1551 )));
1552 break;
1553 }
1554 if let Err(e) = on_result(idx, &v) {
1555 first_error = Some(e);
1556 break;
1557 }
1558 }
1559 if first_error.is_some() {
1560 set.shutdown().await;
1563 }
1564 }
1565 }
1566 Err(e) => {
1567 if matches!(&e, AppError::Embedding(msg) if msg.contains("cancelled")) {
1568 cancelled += 1;
1569 } else {
1570 failed += 1;
1571 }
1572 if first_error.is_none() {
1573 first_error = Some(e);
1574 set.shutdown().await;
1575 }
1576 }
1577 }
1578 }
1579
1580 while let Some(join_result) = set.join_next().await {
1583 if let Err(join_err) = join_result {
1584 if join_err.is_panic() {
1585 failed += 1;
1586 if first_error.is_none() {
1587 first_error = Some(AppError::Embedding(format!(
1588 "embedding task panicked: {join_err}"
1589 )));
1590 }
1591 } else {
1592 cancelled += 1;
1593 }
1594 }
1595 }
1596
1597 tracing::debug!(
1607 target: "embedding",
1608 total_batches,
1609 completed,
1610 failed,
1611 cancelled,
1612 "embedding fan-out finished"
1613 );
1614
1615 match first_error {
1616 Some(e) => Err(e),
1617 None => Ok(()),
1618 }
1619}
1620
/// Serializes `v` as consecutive little-endian IEEE-754 words, 4 bytes per element.
///
/// The output length is always `v.len() * 4`. `bytes_to_f32` inverts this encoding.
pub fn f32_to_bytes(v: &[f32]) -> Vec<u8> {
    const WIDTH: usize = std::mem::size_of::<f32>();
    let mut bytes = Vec::with_capacity(v.len() * WIDTH);
    for value in v.iter().copied() {
        bytes.extend(value.to_le_bytes());
    }
    bytes
}
1628
/// Decodes consecutive little-endian IEEE-754 words back into floats.
///
/// This is the inverse of `f32_to_bytes`. Trailing bytes that do not form a complete
/// 4-byte word are ignored.
pub fn bytes_to_f32(bytes: &[u8]) -> Vec<f32> {
    bytes
        .chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect()
}
1636
1637pub fn embedding_dim() -> usize {
1640 crate::constants::embedding_dim()
1641}
1642
1643fn validate_dim(v: Vec<f32>) -> Result<Vec<f32>, AppError> {
1647 let dim = crate::constants::embedding_dim();
1648 if v.len() != dim {
1649 return Err(AppError::Embedding(format!(
1650 "embedding has {} dims, expected {dim}; \
1651 refusing to truncate or pad silently (G42/C5)",
1652 v.len()
1653 )));
1654 }
1655 Ok(v)
1656}
1657
// Unit tests for this module: batching/fan-out mechanics, byte encoding, dimension checks,
// adaptive batch sizing, error classification, fallback reasons, and the entity cache.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn reassemble_ordered_restores_input_order() {
        let parts = vec![
            (2, vec![vec![2.0_f32], vec![2.1]]),
            (0, vec![vec![0.0], vec![0.1]]),
            (1, vec![vec![1.0], vec![1.1]]),
        ];
        let out = reassemble_ordered(parts);
        assert_eq!(
            out,
            vec![
                vec![0.0_f32],
                vec![0.1],
                vec![1.0],
                vec![1.1],
                vec![2.0],
                vec![2.1],
            ]
        );
    }

    #[test]
    fn f32_to_bytes_roundtrip() {
        let input = vec![0.0_f32, 1.5, -2.25, f32::MIN, f32::MAX];
        let bytes = f32_to_bytes(&input);
        assert_eq!(bytes.len(), input.len() * 4);
        let out = bytes_to_f32(&bytes);
        assert_eq!(out, input);
    }

    #[test]
    fn validate_dim_rejects_divergent_vectors() {
        let dim = crate::constants::embedding_dim();
        let long = vec![0.0; dim + 10];
        assert!(validate_dim(long).is_err(), "longer vector must error");
        let short = vec![0.0; dim.saturating_sub(1).max(1)];
        assert!(validate_dim(short).is_err(), "shorter vector must error");
        let exact = vec![0.0; dim];
        assert_eq!(validate_dim(exact).expect("exact dim must pass").len(), dim);
    }

    #[test]
    fn embedding_dim_matches_constants_source() {
        assert_eq!(embedding_dim(), crate::constants::embedding_dim());
    }

    #[test]
    fn build_batches_preserves_global_indices() {
        let texts: Vec<String> = (0..10).map(|i| format!("t{i}")).collect();
        let batches = build_batches(&texts, 4);
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 4);
        assert_eq!(batches[2].len(), 2);
        assert_eq!(batches[2][1].0, 9);
        assert_eq!(batches[2][1].1, "t9");
    }

    #[test]
    fn effective_permits_clamps_to_bounds() {
        assert!(effective_permits(0) >= 1);
        assert!(effective_permits(1000) <= 32);
    }

    fn test_batches(n: usize) -> Vec<Vec<(usize, String)>> {
        (0..n).map(|i| vec![(i, format!("t{i}"))]).collect()
    }

    fn dummy_vec(dim: usize) -> Vec<f32> {
        vec![0.0; dim]
    }

    #[test]
    fn concurrency_peak_never_exceeds_permits() {
        let permits = 4usize;
        let batches = test_batches(permits * 10);
        let dim = crate::constants::embedding_dim();
        let current = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let current_c = Arc::clone(&current);
        let peak_c = Arc::clone(&peak);
        let work = move |batch: Vec<(usize, String)>| {
            let current = Arc::clone(&current_c);
            let peak = Arc::clone(&peak_c);
            async move {
                let now = current.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
                current.fetch_sub(1, Ordering::SeqCst);
                Ok(batch
                    .into_iter()
                    .map(|(i, _)| (i, dummy_vec(dim)))
                    .collect())
            }
        };

        let mut delivered = 0usize;
        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .expect("test runtime");
        rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| {
                delivered += 1;
                Ok(())
            },
        ))
        .expect("fan-out must succeed");

        assert_eq!(delivered, permits * 10, "every item must be delivered");
        assert!(
            peak.load(Ordering::SeqCst) <= permits,
            "peak concurrency {} exceeded permits {permits}",
            peak.load(Ordering::SeqCst)
        );
    }

    #[test]
    fn panicking_task_returns_permit_and_surfaces_error() {
        let permits = 2usize;
        let batches = test_batches(4);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            if batch[0].0 == 1 {
                panic!("intentional test panic");
            }
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(dim)))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("panic must surface as an error");
        assert!(
            err.to_string().contains("panicked"),
            "error must mention the panic: {err}"
        );
    }

    #[test]
    fn cancellation_terminates_fan_out_quickly() {
        let permits = 2usize;
        let batches = test_batches(8);
        let dim = crate::constants::embedding_dim();
        let token = CancellationToken::new();

        let work = move |batch: Vec<(usize, String)>| async move {
            tokio::time::sleep(std::time::Duration::from_secs(30)).await;
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, dummy_vec(dim)))
                .collect())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let cancel = token.clone();
        let start = std::time::Instant::now();
        let result = rt.block_on(async move {
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                cancel.cancel();
            });
            run_bounded(batches, permits, dim, token, work, &mut |_idx, _v| Ok(())).await
        });

        assert!(result.is_err(), "cancelled fan-out must report an error");
        assert!(
            start.elapsed() < std::time::Duration::from_secs(10),
            "graceful shutdown must finish well under the work duration"
        );
    }

    #[test]
    fn fan_out_rejects_divergent_dim() {
        let permits = 2usize;
        let batches = test_batches(2);
        let dim = crate::constants::embedding_dim();

        let work = move |batch: Vec<(usize, String)>| async move {
            Ok(batch
                .into_iter()
                .map(|(i, _)| (i, vec![0.0f32; 3]))
                .collect::<Vec<(usize, Vec<f32>)>>())
        };

        let rt = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(2)
            .enable_all()
            .build()
            .expect("test runtime");
        let result = rt.block_on(run_bounded(
            batches,
            permits,
            dim,
            CancellationToken::new(),
            work,
            &mut |_idx, _v| Ok(()),
        ));

        let err = result.expect_err("divergent dim must fail the fan-out");
        assert!(err.to_string().contains("G42/C5"), "error cites C5: {err}");
    }

    #[test]
    fn adaptive_batch_dim64_keeps_calibrated_sizes() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 64), 8);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 64), 25);
    }

    #[test]
    fn adaptive_batch_dim384_shrinks() {
        assert_eq!(adaptive_batch_for_dim(CHUNK_EMBED_BATCH_SIZE, 384), 1);
        assert_eq!(adaptive_batch_for_dim(ENTITY_EMBED_BATCH_SIZE, 384), 4);
    }

    #[test]
    fn adaptive_batch_intermediate_dims() {
        assert_eq!(adaptive_batch_for_dim(8, 128), 4);
        assert_eq!(adaptive_batch_for_dim(8, 256), 2);
    }

    #[test]
    fn adaptive_batch_small_dim_clamps_to_base() {
        assert_eq!(adaptive_batch_for_dim(8, 8), 8);
    }

    #[test]
    fn adaptive_batch_total_function() {
        assert_eq!(adaptive_batch_for_dim(8, 4096), 1);
        assert_eq!(adaptive_batch_for_dim(8, 0), 8);
        assert_eq!(adaptive_batch_for_dim(0, 64), 1);
    }

    #[test]
    #[serial_test::serial(env)]
    fn adaptive_wrappers_follow_env_dim() {
        std::env::set_var("SQLITE_GRAPHRAG_EMBEDDING_DIM", "384");
        let chunk = chunk_embed_batch_size();
        let entity = entity_embed_batch_size();
        std::env::remove_var("SQLITE_GRAPHRAG_EMBEDDING_DIM");
        crate::constants::set_active_embedding_dim(crate::constants::DEFAULT_EMBEDDING_DIM);
        assert_eq!(chunk, 1, "384-dim chunk batch must shrink to 1 (G44)");
        assert_eq!(entity, 4, "384-dim entity batch must shrink to 4 (G44)");
    }

    #[test]
    fn embedding_error_kind_classify_oauth_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("OAuth token expired for claude"),
            EmbeddingErrorKind::OAuth,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("oauth authentication failed"),
            EmbeddingErrorKind::OAuth,
        );
    }

    #[test]
    fn embedding_error_kind_classify_quota_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("quota exhausted on backend"),
            EmbeddingErrorKind::Quota,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("Usage quota limit reached"),
            EmbeddingErrorKind::Quota,
        );
    }

    #[test]
    fn embedding_error_kind_classify_slot_exhausted_message() {
        assert_eq!(
            EmbeddingErrorKind::classify(
                "slot exhausted: failed to acquire LLM slot after backoff"
            ),
            EmbeddingErrorKind::SlotExhausted,
        );
    }

    #[test]
    fn embedding_error_kind_classify_zero_dimension_message() {
        assert_eq!(
            EmbeddingErrorKind::classify("embedding returned dim=zero"),
            EmbeddingErrorKind::ZeroDimension,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("got zero-dim vector from LLM"),
            EmbeddingErrorKind::ZeroDimension,
        );
    }

    #[test]
    fn embedding_error_kind_classify_unknown_fallback() {
        assert_eq!(
            EmbeddingErrorKind::classify("unrelated subprocess error"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(
            EmbeddingErrorKind::classify("rate limit hit"),
            EmbeddingErrorKind::Unknown,
        );
        assert_eq!(EmbeddingErrorKind::OAuth.code(), "oauth");
        assert_eq!(EmbeddingErrorKind::Quota.code(), "quota");
        assert_eq!(EmbeddingErrorKind::SlotExhausted.code(), "slot-exhausted");
        assert_eq!(
            EmbeddingErrorKind::BackendMismatch.code(),
            "backend-mismatch"
        );
        assert_eq!(EmbeddingErrorKind::ZeroDimension.code(), "zero-dimension");
        assert_eq!(EmbeddingErrorKind::Unknown.code(), "unknown");
    }

    #[test]
    fn fallback_reason_display_does_not_panic() {
        let _ = FallbackReason::EmbeddingFailed("rate limit".into()).to_string();
        let _ = FallbackReason::Cancelled.to_string();
        let _ = FallbackReason::Timeout {
            operation: "embed_query".into(),
            duration_secs: 30,
        }
        .to_string();
    }

    #[test]
    fn fallback_reason_is_partial_eq() {
        assert_eq!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("a".into())
        );
        assert_eq!(FallbackReason::Cancelled, FallbackReason::Cancelled);
        assert_ne!(
            FallbackReason::EmbeddingFailed("a".into()),
            FallbackReason::EmbeddingFailed("b".into())
        );
        assert_ne!(
            FallbackReason::Cancelled,
            FallbackReason::Timeout {
                operation: "x".into(),
                duration_secs: 1
            }
        );
    }

    #[test]
    fn fallback_reason_timeout_preserves_fields() {
        let r = FallbackReason::Timeout {
            operation: "embed_query_local".into(),
            duration_secs: 300,
        };
        match r {
            FallbackReason::Timeout {
                operation,
                duration_secs,
            } => {
                assert_eq!(operation, "embed_query_local");
                assert_eq!(duration_secs, 300);
            }
            other => panic!("expected Timeout, got {other:?}"),
        }
    }

    #[test]
    #[ignore = "G58 S1 stub: requires env without codex/claude on PATH; tracked as T5 of Fase 2"]
    fn try_embed_query_with_fallback_surfaces_embedding_failed_for_missing_binary() {
        let bogus = std::path::Path::new("/nonexistent-models-dir-for-g58-fallback-test");
        let result = try_embed_query_with_fallback(bogus, "hello world");
        match result {
            Err(FallbackReason::EmbeddingFailed(msg)) => {
                assert!(!msg.is_empty(), "fallback message must not be empty");
            }
            Err(FallbackReason::Cancelled) => {
                panic!("expected EmbeddingFailed, got Cancelled");
            }
            Err(FallbackReason::Timeout { .. }) => {
                panic!("expected EmbeddingFailed, got Timeout");
            }
            Err(FallbackReason::SlotExhausted) => {
                panic!("expected EmbeddingFailed, got SlotExhausted");
            }
            Err(FallbackReason::OAuthQuota { .. }) => {
                panic!("expected EmbeddingFailed, got OAuthQuota");
            }
            Err(FallbackReason::BackendMismatch { .. }) => {
                panic!("expected EmbeddingFailed, got BackendMismatch");
            }
            Err(FallbackReason::DimZero) => {
                panic!("expected EmbeddingFailed, got DimZero");
            }
            Ok(_) => {
                panic!("expected an error, got Ok — embedder must fail for bogus path");
            }
        }
    }

    #[test]
    fn g56_entity_cache_key_is_stable_and_distinct() {
        let k1 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k2 = entity_cache_key("codex:default", "sqlite-graphrag");
        let k3 = entity_cache_key("codex:default", "claude-code");
        let k4 = entity_cache_key("claude:default", "sqlite-graphrag");
        assert_eq!(k1, k2, "same model+text must hash identically");
        assert_ne!(k1, k3, "different text must hash differently");
        assert_ne!(k1, k4, "different model must hash differently");
    }

    #[test]
    fn g56_entity_embed_cache_stats_hit_rate() {
        let zero = EmbedCacheStats::default();
        assert_eq!(zero.hit_rate(), 0.0);
        let half = EmbedCacheStats {
            requested: 4,
            hits: 2,
            misses: 2,
        };
        assert!((half.hit_rate() - 0.5).abs() < 1e-9);
        let all = EmbedCacheStats {
            requested: 7,
            hits: 7,
            misses: 0,
        };
        assert!((all.hit_rate() - 1.0).abs() < 1e-9);
    }

    #[test]
    fn g56_entity_embed_cache_populates_and_hits() {
        let cache = entity_embed_cache();
        let model = "test-model";
        let text = "sqlite-graphrag";
        let key = entity_cache_key(model, text);
        let stored = Arc::new(vec![0.42_f32; crate::constants::embedding_dim()]);
        cache.lock().insert(key, Arc::clone(&stored));
        let guard = cache.lock();
        let hit = guard.get(&key).expect("cache must return stored value");
        assert_eq!(hit.len(), crate::constants::embedding_dim());
        assert!((hit[0] - 0.42).abs() < 1e-6);
    }

    #[test]
    fn g56_empty_texts_short_circuits_with_zero_stats() {
        let stats = EmbedCacheStats::default();
        assert_eq!(stats.requested, 0);
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.hit_rate(), 0.0);
    }
}
2191
// Tests for the LLM backend fallback path: `embed_via_backend`, `embed_with_fallback`,
// and the formatting of `LlmBackendError`.
#[cfg(test)]
mod embed_with_fallback_tests {
    use super::*;
    use crate::llm::exit_code_hints::LlmBackendError;

    // `embed_via_backend` with `LlmBackendKind::None` must succeed with an empty vector
    // and report `None` as the backend used.
    #[test]
    fn none_backend_returns_empty_vector_without_calling_llm() {
        let (v, kind) = embed_via_backend(
            std::path::Path::new("/nonexistent"),
            "any text",
            &LlmBackendKind::None,
        )
        .expect("None backend never fails");
        assert!(v.is_empty());
        assert_eq!(kind, LlmBackendKind::None, "None backend must report None");
    }

    // NOTE(review): the only assertion that actually runs here is the array length at the
    // end. It checks a local literal, not any default chain produced by production code.
    #[test]
    fn empty_chain_defaults_to_codex_claude_none() {
        let defaults = [
            LlmBackendKind::Codex,
            LlmBackendKind::Claude,
            LlmBackendKind::None,
        ];

        // NOTE(review): this fn is nested inside the test body above and is not `#[test]`,
        // so the harness never runs it and its assertions are never executed. Consider
        // hoisting it to module level with `#[test]`. TODO confirm it passes first.
        #[allow(dead_code)]
        fn llm_backend_kind_as_str_is_stable() {
            assert_eq!(LlmBackendKind::Codex.as_str(), "codex");
            assert_eq!(LlmBackendKind::Claude.as_str(), "claude");
            assert_eq!(LlmBackendKind::None.as_str(), "none");
        }

        // NOTE(review): same issue as above. It is nested and never called, so these
        // `reason_code` assertions never run.
        #[allow(dead_code)]
        fn fallback_reason_reason_code_is_stable() {
            assert_eq!(
                FallbackReason::EmbeddingFailed("any".into()).reason_code(),
                "embedding_failed"
            );
            assert_eq!(FallbackReason::Cancelled.reason_code(), "cancelled");
            assert_eq!(
                FallbackReason::Timeout {
                    operation: "embed_query".into(),
                    duration_secs: 30
                }
                .reason_code(),
                "timeout"
            );
        }
        assert_eq!(defaults.len(), 3);
    }

    // A chain containing only `None` must fail when the fourth argument is `false`.
    // That argument is presumably `skip_on_failure`, per the test name; verify against the
    // signature. The error text must mention "no LLM backends available".
    #[test]
    fn embed_with_fallback_chain_of_only_none_aborts_without_skip_on_failure_v1088() {
        let chain = vec![LlmBackendKind::None];
        let err = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            false,
        )
        .expect_err("chain of only [None] without skip_on_failure MUST abort");
        let msg = format!("{err}");
        assert!(
            msg.contains("no LLM backends available"),
            "error must mention exhausted chain, got: {msg}"
        );
    }
    // With the fourth argument `true`, the same `[None]` chain must succeed. It should
    // return an empty vector and report `None` as the backend.
    #[test]
    fn embed_with_fallback_skip_on_failure_with_only_none_returns_empty() {
        let chain = vec![LlmBackendKind::None];
        let v = embed_with_fallback(
            std::path::Path::new("/nonexistent-models-dir-for-gap005-test"),
            "hello",
            &chain,
            true,
        )
        .expect("None chain is always Ok");
        assert!(v.0.is_empty(), "vector must be empty");
        assert_eq!(v.1, LlmBackendKind::None);
    }
    // NOTE(review): marked `#[allow(dead_code)]` rather than `#[test]`, so this check never
    // runs. Re-enable as `#[test]` once the `--llm-fallback` hint text is confirmed.
    #[allow(dead_code)]
    fn llm_backend_error_no_backends_default_message() {
        let e = LlmBackendError::NoBackendsAvailable;
        let h = e.hint();
        assert!(h.contains("--llm-fallback"));
    }

    // `NonZeroExit` must render the binary name and stderr tail, plus the signal or
    // exit code, in its `Display` output.
    #[test]
    fn llm_backend_error_nonzero_exit_carries_stderr_tail() {
        let e = LlmBackendError::NonZeroExit {
            exit_code: Some(137),
            signal: Some(9),
            stdout_tail: "out".into(),
            stderr_tail: "OOM killed".into(),
            binary: "codex".into(),
            hint: "OOM".into(),
        };
        let s = e.to_string();
        assert!(s.contains("codex"));
        assert!(s.contains("OOM killed"));
        assert!(s.contains("signal 9") || s.contains("exit 137"));
    }
}