1use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40 AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::storage::schema::Value;
46use crate::storage::unified::entity::{EntityData, UnifiedEntity};
47
/// Default cap on the number of rows Stage 4 (`filter_values`) may return.
pub const DEFAULT_ROW_CAP: usize = 20;
51
/// Output of Stage 1: tokens extracted from the natural-language question.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TokenSet {
    /// Lower-cased content words (stop words removed); drive schema matching.
    pub keywords: Vec<String>,
    /// ID-shaped values (e.g. `FDD-12313`) and long digit runs; drive the
    /// Stage 4 literal filter.
    pub literals: Vec<String>,
}

impl TokenSet {
    /// True when neither keywords nor literals were extracted.
    pub fn is_empty(&self) -> bool {
        self.keywords.is_empty() && self.literals.is_empty()
    }
}
69
/// Output of Stage 2: collections (and per-collection column hints) whose
/// schema vocabulary matched the extracted keywords.
#[derive(Debug, Clone, Default)]
pub struct CandidateCollections {
    /// Candidate collection names (sorted and deduplicated by `match_schema`).
    pub collections: Vec<String>,
    /// For each candidate collection, the columns whose names matched a keyword.
    pub columns_by_collection: HashMap<String, Vec<String>>,
}
81
/// Output of Stage 3: one similarity-search result.
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Collection the hit belongs to.
    pub collection: String,
    /// Raw entity id within `collection`.
    pub entity_id: u64,
    /// Similarity score from the vector index; hits are sorted highest-first.
    pub score: f32,
}
90
/// Output of Stage 4: a row whose rendered field values contained a literal token.
#[derive(Debug, Clone)]
pub struct FilteredRow {
    /// Collection the row was found in.
    pub collection: String,
    /// The matching entity, cloned out of the store.
    pub entity: UnifiedEntity,
    /// The literal token that matched.
    pub matched_literal: String,
    /// The column the literal was found in, when known.
    pub matched_column: Option<String>,
}
101
/// Per-stage wall-clock timings, in microseconds.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StageTimings {
    /// Stage 1: token extraction.
    pub extract_us: u64,
    /// Stage 2: schema matching.
    pub schema_us: u64,
    /// Stage 3: vector search.
    pub vector_us: u64,
    /// Stage 4: literal row filtering.
    pub filter_us: u64,
}
110
/// Accumulated result of [`AskPipeline::execute`]: the original question plus
/// the output of each of the four stages and their timings.
#[derive(Debug, Clone, Default)]
pub struct AskContext {
    /// The raw natural-language question as received.
    pub question: String,
    /// Stage 1 output: extracted keywords and literals.
    pub tokens: TokenSet,
    /// Stage 2 output: schema-matched candidate collections/columns.
    pub candidates: CandidateCollections,
    /// Stage 3 output: scoped vector-search results.
    pub vector_hits: Vec<VectorHit>,
    /// Stage 4 output: rows whose values contained a literal token.
    pub filtered_rows: Vec<FilteredRow>,
    /// Wall-clock duration of each stage.
    pub timings: StageTimings,
}
123
/// Namespace for the 4-stage ASK pipeline. Deliberately uninhabited (no values
/// can exist); only associated functions are exposed.
pub enum AskPipeline {}

impl AskPipeline {
    /// Runs the pipeline with the default row cap ([`DEFAULT_ROW_CAP`]).
    pub fn execute(
        runtime: &RedDBRuntime,
        scope: &EffectiveScope,
        question: &str,
    ) -> RedDBResult<AskContext> {
        Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
    }

    /// Executes the four stages in order, recording per-stage timings:
    /// 1. token extraction (heuristic or LLM-routed),
    /// 2. schema matching against scope-visible collections,
    /// 3. scoped vector search over the candidates,
    /// 4. literal filtering of candidate rows, capped at `row_cap`.
    ///
    /// Returns `Err(RedDBError::Query)` when Stage 1 yields no usable tokens;
    /// later stages degrade to empty results rather than failing.
    pub fn execute_with_limit(
        runtime: &RedDBRuntime,
        scope: &EffectiveScope,
        question: &str,
        row_cap: usize,
    ) -> RedDBResult<AskContext> {
        // Tracing span covering the whole pipeline run.
        let span = info_span!(
            "ask_pipeline.execute",
            tenant = ?scope.effective_scope(),
            question_len = question.len(),
            row_cap = row_cap,
        );
        let _enter = span.enter();

        // Stage 1: token extraction.
        let stage1 = Instant::now();
        let tokens = extract_tokens_routed(runtime, scope, question)?;
        let extract_us = stage1.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "extract_tokens",
            keywords = ?tokens.keywords,
            literals = ?tokens.literals,
            elapsed_us = extract_us,
            "stage 1 done"
        );
        // Short-circuit: with no tokens the later stages cannot do anything useful.
        if tokens.is_empty() {
            warn!(
                target: "ask_pipeline",
                question_len = question.len(),
                "refused: empty token set"
            );
            return Err(RedDBError::Query(
                "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
                    .to_string(),
            ));
        }

        // Stage 2: schema matching.
        let stage2 = Instant::now();
        let candidates = match_schema(runtime, scope, &tokens)?;
        let schema_us = stage2.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "match_schema",
            collections = ?candidates.collections,
            elapsed_us = schema_us,
            "stage 2 done"
        );

        // Stage 3: vector search (best effort; failures yield an empty hit list).
        let stage3 = Instant::now();
        let vector_hits = vector_search_scoped(runtime, scope, question, &candidates, row_cap);
        let vector_us = stage3.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "vector_search_scoped",
            hits = vector_hits.len(),
            elapsed_us = vector_us,
            "stage 3 done"
        );

        // Stage 4: literal filtering.
        let stage4 = Instant::now();
        let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
        let filter_us = stage4.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "filter_values",
            rows = filtered_rows.len(),
            elapsed_us = filter_us,
            "stage 4 done"
        );

        Ok(AskContext {
            question: question.to_string(),
            tokens,
            candidates,
            vector_hits,
            filtered_rows,
            timings: StageTimings {
                extract_us,
                schema_us,
                vector_us,
                filter_us,
            },
        })
    }
}
232
/// Stage-1 router: dispatches between the heuristic tokenizer and the LLM NER
/// backend based on the `ai.ner.*` configuration keys.
///
/// With `ai.ner.backend != "llm"` this is a thin wrapper over
/// [`extract_tokens`] and cannot fail. In LLM mode, failures are resolved via
/// the configured [`HeuristicFallback`] policy (`ai.ner.fallback`).
fn extract_tokens_routed(
    runtime: &RedDBRuntime,
    scope: &EffectiveScope,
    question: &str,
) -> RedDBResult<TokenSet> {
    let backend = runtime.config_string("ai.ner.backend", "heuristic");
    if backend != "llm" {
        return Ok(extract_tokens(question));
    }

    // LLM routing configuration (all optional, with conservative defaults).
    let endpoint = runtime.config_string("ai.ner.endpoint", "");
    let model = runtime.config_string("ai.ner.model", "");
    let timeout_ms = runtime
        .config_string("ai.ner.timeout_ms", "5000")
        .parse::<u32>()
        .unwrap_or(5000); // non-numeric config silently falls back to 5s
    let fallback = match runtime
        .config_string("ai.ner.fallback", "use_heuristic")
        .as_str()
    {
        "empty_on_fail" => HeuristicFallback::EmptyOnFail,
        "propagate" => HeuristicFallback::Propagate,
        // Unknown values behave like the default.
        _ => HeuristicFallback::UseHeuristic,
    };

    // With neither endpoint nor model there is nothing to call; use the stub
    // provider (which yields an empty result and triggers the fallback path).
    let provider = if endpoint.is_empty() && model.is_empty() {
        NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
    } else {
        NerProvider::OpenAiCompat { endpoint, model }
    };

    let mut ner = LlmNer::new(provider, fallback);
    ner.timeout_ms = timeout_ms;

    let auth = ScopeAuthAdapter(scope);
    // NOTE(review): `block_in_place` panics when called on a current-thread
    // Tokio runtime — confirm all LLM-mode callers run on a multi-thread
    // runtime (the tests below use `flavor = "multi_thread"`).
    let llm_result = match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
        }
        Err(_) => {
            // No async runtime at all: degrade to the heuristic tokenizer.
            warn!(
                target: "ask_pipeline",
                "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
            );
            return Ok(extract_tokens(question));
        }
    };

    match llm_result {
        Ok(tokens) => Ok(tokens),
        Err(NerError::AuthDenied) => {
            // Denial is expected until the capability is wired up; log it once
            // per process, then honour the fallback policy.
            log_auth_denial_once();
            apply_fallback(fallback, question)
        }
        Err(err) => {
            warn!(
                target: "ask_pipeline",
                error = %err,
                "LlmNer extract failed; honouring HeuristicFallback policy"
            );
            apply_fallback(fallback, question)
        }
    }
}
334
335fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
336 match fallback {
337 HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
338 HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
339 HeuristicFallback::Propagate => Err(RedDBError::Query(
340 "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
341 )),
342 }
343}
344
/// Emits the "NER capability not wired into auth" notice at most once per
/// process, so repeated denials do not spam the log.
fn log_auth_denial_once() {
    static EMITTED: AtomicBool = AtomicBool::new(false);
    // `swap` returns the previous value: only the first caller observes `false`.
    if !EMITTED.swap(true, Ordering::Relaxed) {
        info!(
            target: "ask_pipeline",
            capability = NER_CAPABILITY,
            "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
            NER_CAPABILITY
        );
    }
}
359
/// Bridges an [`EffectiveScope`] to the NER module's [`NerAuthContext`] trait.
struct ScopeAuthAdapter<'a>(&'a EffectiveScope);

// Manual Debug that deliberately omits the wrapped scope's contents
// (finish_non_exhaustive prints `ScopeAuthAdapter { .. }`).
impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
    }
}

impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
    /// Delegates capability checks straight to the underlying scope.
    fn has_capability(&self, capability: &str) -> bool {
        self.0.has_capability(capability)
    }
}
376
377pub fn extract_tokens(question: &str) -> TokenSet {
388 let mut keywords: Vec<String> = Vec::new();
389 let mut literals: Vec<String> = Vec::new();
390
391 let mut chars = question.chars().peekable();
392 let mut buf = String::new();
393
394 let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
395 if buf.is_empty() {
396 return;
397 }
398 let word = std::mem::take(buf);
399 classify_token(&word, keywords, literals);
400 };
401
402 while let Some(ch) = chars.next() {
403 if ch.is_alphanumeric() || ch == '_' || ch == '-' {
404 buf.push(ch);
405 } else {
406 flush(&mut buf, &mut keywords, &mut literals);
407 let _ = ch;
409 }
410 if chars.peek().is_none() {
412 flush(&mut buf, &mut keywords, &mut literals);
413 }
414 }
415 if !buf.is_empty() {
418 classify_token(&buf, &mut keywords, &mut literals);
419 }
420
421 let mut seen = HashSet::new();
423 keywords.retain(|tok| seen.insert(tok.clone()));
424 let mut seen_lit = HashSet::new();
425 literals.retain(|tok| seen_lit.insert(tok.clone()));
426
427 TokenSet { keywords, literals }
428}
429
430fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
431 let is_upper_id_shape = word.len() >= 3
434 && word
435 .chars()
436 .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
437 && word.chars().any(|c| c.is_ascii_digit())
438 && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
439 let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
440 if is_upper_id_shape || is_long_digit_run {
441 literals.push(word.to_string());
442 return;
443 }
444 let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
447 if trimmed.len() < 2 {
448 return;
449 }
450 if !trimmed
451 .chars()
452 .next()
453 .map(|c| c.is_ascii_alphabetic())
454 .unwrap_or(false)
455 {
456 return;
457 }
458 if !trimmed
459 .chars()
460 .all(|c| c.is_ascii_alphanumeric() || c == '_')
461 {
462 return;
465 }
466 let lower = trimmed.to_ascii_lowercase();
467 if STOP_WORDS.binary_search(&lower.as_str()).is_ok() {
468 return;
469 }
470 keywords.push(lower);
471}
472
/// Words ignored by the keyword classifier (English plus a few Portuguese
/// question words). MUST remain lexicographically sorted: `classify_token`
/// searches it with `binary_search`.
const STOP_WORDS: &[&str] = &[
    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
    "is", "it", "of", "on", "or", "que", "qual", "quais", "sobre", "te", "the", "to", "what",
    "where", "which", "with",
];
480
481pub fn match_schema(
490 runtime: &RedDBRuntime,
491 scope: &EffectiveScope,
492 tokens: &TokenSet,
493) -> RedDBResult<CandidateCollections> {
494 let visible = match scope.visible_collections() {
495 Some(set) => set.clone(),
496 None => {
497 runtime
502 .inner
503 .db
504 .store()
505 .list_collections()
506 .into_iter()
507 .collect()
508 }
509 };
510
511 let mut collections: BTreeSet<String> = BTreeSet::new();
512 let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
513 for keyword in &tokens.keywords {
514 let hits = runtime.schema_vocabulary_lookup(keyword);
515 for hit in hits {
516 if !visible.contains(&hit.collection) {
517 continue;
518 }
519 collections.insert(hit.collection.clone());
520 if let Some(column) = hit.column {
521 columns_by_collection
522 .entry(hit.collection)
523 .or_default()
524 .insert(column);
525 }
526 }
527 }
528
529 Ok(CandidateCollections {
530 collections: collections.into_iter().collect(),
531 columns_by_collection: columns_by_collection
532 .into_iter()
533 .map(|(k, v)| (k, v.into_iter().collect()))
534 .collect(),
535 })
536}
537
538pub fn vector_search_scoped(
547 runtime: &RedDBRuntime,
548 scope: &EffectiveScope,
549 question: &str,
550 candidates: &CandidateCollections,
551 top_k: usize,
552) -> Vec<VectorHit> {
553 if candidates.collections.is_empty() {
554 return Vec::new();
555 }
556 let Some(embedding) = embed_question(runtime, question) else {
557 return Vec::new();
558 };
559 let per_collection = top_k.max(1);
560 let mut hits: Vec<VectorHit> = Vec::new();
561 for collection in &candidates.collections {
562 match super::authorized_search::AuthorizedSearch::execute_similar(
563 runtime,
564 scope,
565 collection,
566 &embedding,
567 per_collection,
568 0.0,
569 ) {
570 Ok(results) => {
571 for result in results {
572 hits.push(VectorHit {
573 collection: collection.clone(),
574 entity_id: result.entity_id.raw(),
575 score: result.score,
576 });
577 }
578 }
579 Err(err) => {
580 debug!(
581 target: "ask_pipeline",
582 collection = collection,
583 err = %err,
584 "vector_search_scoped: collection skipped"
585 );
586 }
587 }
588 }
589 hits.sort_by(|a, b| {
590 b.score
591 .partial_cmp(&a.score)
592 .unwrap_or(std::cmp::Ordering::Equal)
593 .then_with(|| a.entity_id.cmp(&b.entity_id))
594 });
595 hits.truncate(top_k);
596 hits
597}
598
/// Embeds the question text via the configured OpenAI-compatible provider.
///
/// Returns `None` when no compatible provider or API key is configured, or
/// when the embedding call fails — Stage 3 then degrades to "no vector hits"
/// rather than failing the pipeline.
fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
    // Provider configuration is read from text entries in the `red_config` KV tree.
    let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
        match runtime.inner.db.get_kv("red_config", key) {
            Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
            Some(_) => Ok(None), // non-text config values are treated as absent
            None => Ok(None),
        }
    };
    let provider = crate::ai::resolve_default_provider(&kv_getter);
    if !provider.is_openai_compatible() {
        return None;
    }
    let model = crate::ai::resolve_default_model(&provider, &kv_getter);
    // Missing/invalid API key aborts quietly.
    let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
    let request = crate::ai::OpenAiEmbeddingRequest {
        api_key,
        model,
        inputs: vec![question.to_string()],
        dimensions: None,
        api_base: provider.resolve_api_base(),
    };
    // `block_on_ai` yields a nested Result; flatten, then map any error to None.
    let response = crate::runtime::ai::block_on_ai(async move {
        crate::ai::openai_embeddings_async(&transport, request).await
    })
    .and_then(|result| result)
    .ok()?;
    // One input was sent, so at most one embedding comes back.
    response.embeddings.into_iter().next()
}
631
632pub fn filter_values(
641 runtime: &RedDBRuntime,
642 scope: &EffectiveScope,
643 candidates: &CandidateCollections,
644 tokens: &TokenSet,
645 row_cap: usize,
646) -> Vec<FilteredRow> {
647 if tokens.literals.is_empty() || candidates.collections.is_empty() {
648 return Vec::new();
649 }
650 let visible = scope.visible_collections();
651 let store = runtime.inner.db.store();
652 let mut out: Vec<FilteredRow> = Vec::new();
653
654 'collection: for collection in &candidates.collections {
655 if let Some(set) = visible {
659 if !set.contains(collection) {
660 continue;
661 }
662 }
663 let Some(manager) = store.get_collection(collection) else {
664 continue;
665 };
666 let hint_columns: &[String] = candidates
667 .columns_by_collection
668 .get(collection)
669 .map(|v| v.as_slice())
670 .unwrap_or(&[]);
671
672 for entity in manager.query_all(|_| true) {
673 if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
674 out.push(FilteredRow {
675 collection: collection.clone(),
676 entity,
677 matched_literal: hit.0,
678 matched_column: hit.1,
679 });
680 if out.len() >= row_cap {
681 break 'collection;
682 }
683 }
684 }
685 }
686 out
687}
688
689fn literal_match_in_entity(
692 entity: &UnifiedEntity,
693 literals: &[String],
694 hint_columns: &[String],
695) -> Option<(String, Option<String>)> {
696 let row = match &entity.data {
697 EntityData::Row(row) => row,
698 _ => return None,
699 };
700
701 for column in hint_columns {
703 if let Some(value) = row.get_field(column) {
704 if let Some(lit) = first_literal_in_value(value, literals) {
705 return Some((lit, Some(column.clone())));
706 }
707 }
708 }
709 for (name, value) in row.iter_fields() {
711 if hint_columns.iter().any(|c| c == name) {
712 continue;
713 }
714 if let Some(lit) = first_literal_in_value(value, literals) {
715 return Some((lit, Some(name.to_string())));
716 }
717 }
718 None
719}
720
721fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
722 let rendered = match value {
723 Value::Text(s) => s.to_string(),
724 Value::Integer(i) => i.to_string(),
725 Value::Float(f) => f.to_string(),
726 Value::Boolean(b) => b.to_string(),
727 Value::Json(j) => String::from_utf8_lossy(j).to_string(),
728 _ => return None,
729 };
730 for lit in literals {
731 if rendered.contains(lit) {
735 return Some(lit.clone());
736 }
737 }
738 None
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Stage 1 unit tests (pure, no runtime needed) ----------------------

    #[test]
    fn extract_tokens_splits_keywords_and_literals() {
        let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
        assert!(tokens.keywords.contains(&"novidades".to_string()));
        assert!(tokens.keywords.contains(&"passport".to_string()));
        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
        assert!(!tokens.is_empty());
    }

    #[test]
    fn extract_tokens_returns_empty_for_punctuation_only() {
        let tokens = extract_tokens("??? ...");
        assert!(tokens.is_empty());
    }

    #[test]
    fn extract_tokens_long_digit_run_is_a_literal() {
        let tokens = extract_tokens("show order 987654321 details");
        assert!(tokens.literals.contains(&"987654321".to_string()));
        assert!(tokens.keywords.contains(&"order".to_string()));
        assert!(tokens.keywords.contains(&"details".to_string()));
        assert!(tokens.keywords.contains(&"show".to_string()));
    }

    #[test]
    fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
        // The ID shape requires a digit, so all-letter acronyms stay keywords.
        let tokens = extract_tokens("USA exports report");
        assert!(tokens.keywords.contains(&"usa".to_string()));
        assert!(tokens.literals.is_empty());
    }

    #[test]
    fn extract_tokens_dedups() {
        let tokens = extract_tokens("passport passport FDD-1 FDD-1");
        assert_eq!(
            tokens.keywords.iter().filter(|k| *k == "passport").count(),
            1
        );
        assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
    }

    #[test]
    fn first_literal_in_value_substring_match() {
        let lit = first_literal_in_value(
            &Value::text("issue FDD-12313 reported by user"),
            &["FDD-12313".to_string()],
        );
        assert_eq!(lit.as_deref(), Some("FDD-12313"));
    }

    #[test]
    fn first_literal_in_value_no_match_returns_none() {
        assert!(
            first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
                .is_none()
        );
    }

    // ---- Runtime-backed tests ----------------------------------------------

    use crate::api::RedDBOptions;
    use crate::auth::Role;
    use crate::runtime::statement_frame::EffectiveScope;
    use crate::runtime::RedDBRuntime;
    use crate::storage::transaction::snapshot::Snapshot;

    // Builds a read-role scope for tenant `acme` with an explicit visibility set.
    fn make_scope(visible: HashSet<String>) -> EffectiveScope {
        EffectiveScope {
            tenant: Some("acme".to_string()),
            identity: Some(("alice".to_string(), Role::Read)),
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: Some(visible),
        }
    }

    // Fresh in-memory runtime per test (no shared state between tests).
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
    }

    #[test]
    fn execute_refuses_empty_token_set() {
        let rt = fresh_runtime();
        let scope = make_scope(HashSet::new());
        let err = AskPipeline::execute(&rt, &scope, "??? ...")
            .expect_err("empty token set must short-circuit");
        let msg = format!("{err}");
        assert!(
            msg.contains("yielded no usable tokens"),
            "expected structured empty-token error, got: {msg}"
        );
    }

    #[test]
    fn match_schema_intersects_with_visible_set() {
        let rt = fresh_runtime();
        // Register two collections in the vocabulary; only `travel` is visible.
        rt.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: "travel".to_string(),
                columns: vec!["id".into(), "passport".into()],
                type_tags: Vec::new(),
                description: None,
            },
        );
        rt.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: "secrets".to_string(),
                columns: vec!["passport".into()],
                type_tags: Vec::new(),
                description: None,
            },
        );
        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible.clone());
        let tokens = TokenSet {
            keywords: vec!["passport".to_string()],
            literals: Vec::new(),
        };
        let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
        assert_eq!(candidates.collections, vec!["travel".to_string()]);
        assert!(!candidates.collections.contains(&"secrets".to_string()));
        let cols = candidates
            .columns_by_collection
            .get("travel")
            .expect("hint columns");
        assert!(cols.contains(&"passport".to_string()));
    }

    // ---- Property test: Stage 4 must never leak invisible collections ------

    use proptest::prelude::*;

    fn arb_collection() -> impl Strategy<Value = String> {
        "[a-z]{1,4}"
    }

    fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
        prop::collection::hash_set(arb_collection(), 0..6)
    }

    fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
        prop::collection::vec(arb_collection(), 0..8)
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(256))]
        #[test]
        fn stage4_rows_subset_of_visible_collections(
            visible in arb_visible(),
            candidate_names in arb_candidates(),
            literal_count in 0usize..3,
        ) {
            // One shared runtime across all proptest cases (cheaper than 256 boots).
            let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
            let candidates = CandidateCollections {
                collections: candidate_names,
                columns_by_collection: HashMap::new(),
            };
            let literals: Vec<String> = (0..literal_count)
                .map(|i| format!("ID-{i}"))
                .collect();
            let tokens = TokenSet {
                keywords: vec!["passport".to_string()],
                literals,
            };
            let scope = make_scope(visible.clone());
            let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
            for row in &rows {
                prop_assert!(
                    visible.contains(&row.collection),
                    "Stage 4 leaked row collection={} not in visible={:?}",
                    row.collection, visible
                );
            }
        }
    }

    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();

    // End-to-end: seeds two tables, runs the full pipeline, and checks that the
    // invisible `secrets` table never reaches Stage 4 output.
    #[test]
    fn integration_passport_fdd_12313_funnels_through_four_stages() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
            .expect("CREATE TABLE travel");
        rt.execute_query(
            "INSERT INTO travel (id, passport, notes) VALUES \
             (1, 'BR-001', 'unrelated note'), \
             (2, 'PT-002', 'incident FDD-12313 escalated'), \
             (3, 'US-003', 'standard renewal')",
        )
        .expect("seed rows");
        rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
            .expect("CREATE TABLE secrets");
        rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
            .expect("seed secrets");

        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible);

        let ctx = AskPipeline::execute(
            &rt,
            &scope,
            "quais as novidades sobre o passport FDD-12313?",
        )
        .expect("pipeline runs");

        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));

        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);

        // Stage 3 is best-effort (no embedding provider in tests); just touch it.
        let _ = &ctx.vector_hits;

        assert!(
            ctx.filtered_rows
                .iter()
                .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
            "expected travel row with FDD-12313 match, got: {:?}",
            ctx.filtered_rows
        );
        for row in &ctx.filtered_rows {
            assert_ne!(
                row.collection, "secrets",
                "secrets row leaked into Stage 4 output"
            );
        }

        // Timings exist and sum without panicking; exact values are nondeterministic.
        let _ = ctx.timings.extract_us
            + ctx.timings.schema_us
            + ctx.timings.vector_us
            + ctx.timings.filter_us;
    }

    // ---- LLM routing tests --------------------------------------------------

    // Writes a config key into the runtime's config tree.
    fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
        let store = rt.inner.db.store();
        store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
    }

    #[test]
    fn routed_default_backend_runs_heuristic() {
        let rt = fresh_runtime();
        let scope = make_scope(HashSet::new());
        let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
            .expect("heuristic path is infallible");
        assert!(tokens.keywords.contains(&"passport".to_string()));
        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
    }

    // multi_thread flavor is required: the routed path uses block_in_place.
    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_uses_heuristic_fallback() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "use_heuristic");
        let scope = make_scope(HashSet::new());
        let tokens = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("fallback policy keeps the call OK");
        assert!(tokens.keywords.contains(&"passport".to_string()));
        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_empty_on_fail() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "empty_on_fail");
        let scope = make_scope(HashSet::new());
        let tokens = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("empty_on_fail returns Ok with empty TokenSet");
        assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_propagate_returns_error() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "propagate");
        let scope = make_scope(HashSet::new());
        let err = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect_err("propagate must surface the error");
        let msg = format!("{err}");
        assert!(
            msg.contains("propagate") || msg.contains("ai.ner.backend"),
            "expected propagate error message, got: {msg}"
        );
    }

    // Full pipeline with LLM routing configured: Stage 1 falls back to the
    // heuristic and the remaining stages still complete.
    #[tokio::test(flavor = "multi_thread")]
    async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
            .expect("CREATE TABLE travel");
        rt.execute_query(
            "INSERT INTO travel (id, passport, notes) VALUES \
             (2, 'PT-002', 'incident FDD-12313 escalated')",
        )
        .expect("seed rows");
        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible);
        let ctx = tokio::task::spawn_blocking(move || {
            AskPipeline::execute(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("pipeline runs");
        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
        assert!(
            ctx.filtered_rows
                .iter()
                .any(|r| r.matched_literal == "FDD-12313"),
            "Stage 4 still runs after Stage 1 fallback"
        );
    }
}