1use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40 AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::application::SearchContextInput;
46use crate::storage::schema::Value;
47use crate::storage::unified::entity::{EntityData, EntityKind, UnifiedEntity};
48
/// Default cap on the number of rows/hits each retrieval stage may return
/// when the caller does not specify an explicit limit.
pub const DEFAULT_ROW_CAP: usize = 20;
52
/// Tokens extracted from a natural-language question, split into schema-ish
/// keywords and exact-match literals (IDs, long digit runs).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TokenSet {
    /// Lowercased alphabetic words (stop words removed); matched against the
    /// schema vocabulary in `match_schema`.
    pub keywords: Vec<String>,
    /// Verbatim identifier-like tokens (e.g. `FDD-12313`, `987654321`) used
    /// for exact value filtering in `filter_values`.
    pub literals: Vec<String>,
}

impl TokenSet {
    /// True when neither keywords nor literals were extracted.
    pub fn is_empty(&self) -> bool {
        self.keywords.is_empty() && self.literals.is_empty()
    }
}
70
/// Collections (and optionally specific columns) that the extracted keywords
/// mapped to via the schema vocabulary; drives the later search stages.
#[derive(Debug, Clone, Default)]
pub struct CandidateCollections {
    /// Candidate collection names, sorted (built from a `BTreeSet` in `match_schema`).
    pub collections: Vec<String>,
    /// Per-collection column hints whose names matched a keyword; used to
    /// prioritize literal matching inside `filter_values`.
    pub columns_by_collection: HashMap<String, Vec<String>>,
}
82
/// One BM25 full-text hit from the context index.
#[derive(Debug, Clone)]
pub struct TextHit {
    pub collection: String,
    pub entity_id: u64,
    /// BM25 relevance score (higher is better).
    pub score: f32,
}

/// One similarity hit from the vector search stage.
#[derive(Debug, Clone)]
pub struct VectorHit {
    pub collection: String,
    pub entity_id: u64,
    /// Similarity score (higher is better).
    pub score: f32,
}

/// One entity discovered by graph traversal during context search.
#[derive(Debug, Clone)]
pub struct GraphHit {
    pub collection: String,
    pub entity_id: u64,
    pub score: f32,
    /// Traversal depth at which the entity was discovered.
    pub depth: usize,
    /// Whether the hit is a graph node or a graph edge.
    pub kind: GraphHitKind,
}

/// Discriminates node vs edge hits from the graph stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphHitKind {
    Node,
    Edge,
}
115
/// A row that contained one of the question's literal tokens during
/// value filtering (stage 4).
#[derive(Debug, Clone)]
pub struct FilteredRow {
    pub collection: String,
    pub entity: UnifiedEntity,
    /// The literal token that was found inside the row.
    pub matched_literal: String,
    /// Column in which the literal was found, when attributable.
    pub matched_column: Option<String>,
}

/// Wall-clock duration of each pipeline stage, in microseconds.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StageTimings {
    pub extract_us: u64,
    pub schema_us: u64,
    pub text_us: u64,
    pub vector_us: u64,
    pub graph_us: u64,
    pub filter_us: u64,
}
137
/// Aggregated output of every ASK pipeline stage for a single question.
#[derive(Debug, Clone)]
pub struct AskContext {
    /// The original question text.
    pub question: String,
    /// Stage 1: extracted keywords and literals.
    pub tokens: TokenSet,
    /// Stage 2: schema-matched candidate collections/columns.
    pub candidates: CandidateCollections,
    /// Stage 3: BM25 full-text hits.
    pub text_hits: Vec<TextHit>,
    /// Stage 3b: vector similarity hits.
    pub vector_hits: Vec<VectorHit>,
    /// Stage 3c: graph traversal hits.
    pub graph_hits: Vec<GraphHit>,
    /// Stage 4: rows matching a literal token.
    pub filtered_rows: Vec<FilteredRow>,
    /// Total number of fused sources to surface (the row cap).
    pub source_limit: usize,
    /// Per-stage wall-clock timings.
    pub timings: StageTimings,
}
153
154impl Default for AskContext {
155 fn default() -> Self {
156 Self {
157 question: String::new(),
158 tokens: TokenSet::default(),
159 candidates: CandidateCollections::default(),
160 text_hits: Vec::new(),
161 vector_hits: Vec::new(),
162 graph_hits: Vec::new(),
163 filtered_rows: Vec::new(),
164 source_limit: DEFAULT_ROW_CAP,
165 timings: StageTimings::default(),
166 }
167 }
168}
169
/// Index into one of the four `AskContext` hit lists, identifying where a
/// fused result came from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FusedSourceRef {
    FilteredRow(usize),
    TextHit(usize),
    VectorHit(usize),
    GraphHit(usize),
}

/// A fused result: the originating hit plus its reciprocal-rank-fusion score.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FusedSource {
    pub source: FusedSourceRef,
    pub rrf_score: f64,
}
185
/// Namespace for the staged ASK retrieval pipeline; uninhabited on purpose
/// (never instantiated, only used for its associated functions).
pub enum AskPipeline {}
190
impl AskPipeline {
    /// Runs the full pipeline with the default row cap and no score/depth overrides.
    pub fn execute(
        runtime: &RedDBRuntime,
        scope: &EffectiveScope,
        question: &str,
    ) -> RedDBResult<AskContext> {
        Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
    }

    /// Runs the pipeline with an explicit per-stage row cap.
    pub fn execute_with_limit(
        runtime: &RedDBRuntime,
        scope: &EffectiveScope,
        question: &str,
        row_cap: usize,
    ) -> RedDBResult<AskContext> {
        Self::execute_with_limit_and_min_score(runtime, scope, question, row_cap, None, None)
    }

    /// Full entry point: token extraction → schema matching → BM25 text
    /// search → vector search → graph search → literal filtering, with each
    /// stage timed (microseconds) and traced under the `ask_pipeline` target.
    ///
    /// # Errors
    /// Fails when the question yields no usable tokens, or when token
    /// extraction / schema matching themselves return an error.
    pub fn execute_with_limit_and_min_score(
        runtime: &RedDBRuntime,
        scope: &EffectiveScope,
        question: &str,
        row_cap: usize,
        min_score: Option<f32>,
        graph_depth: Option<usize>,
    ) -> RedDBResult<AskContext> {
        let span = info_span!(
            "ask_pipeline.execute",
            tenant = ?scope.effective_scope(),
            question_len = question.len(),
            row_cap = row_cap,
            min_score = ?min_score,
            graph_depth = ?graph_depth,
        );
        let _enter = span.enter();

        // Stage 1: tokenize the question (heuristic or LLM NER per config).
        let stage1 = Instant::now();
        let tokens = extract_tokens_routed(runtime, scope, question)?;
        let extract_us = stage1.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "extract_tokens",
            keywords = ?tokens.keywords,
            literals = ?tokens.literals,
            elapsed_us = extract_us,
            "stage 1 done"
        );
        // Refuse unanswerable questions early instead of scanning everything.
        if tokens.is_empty() {
            warn!(
                target: "ask_pipeline",
                question_len = question.len(),
                "refused: empty token set"
            );
            return Err(RedDBError::Query(
                "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
                    .to_string(),
            ));
        }

        // Stage 2: map keywords onto candidate collections/columns.
        let stage2 = Instant::now();
        let candidates = match_schema(runtime, scope, &tokens)?;
        let schema_us = stage2.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "match_schema",
            collections = ?candidates.collections,
            elapsed_us = schema_us,
            "stage 2 done"
        );

        // Stage 3: BM25 full-text search over the candidate collections.
        let stage3 = Instant::now();
        let text_hits = text_search_bm25_scoped(runtime, scope, question, &candidates, row_cap);
        let text_us = stage3.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "text_search_bm25_scoped",
            hits = text_hits.len(),
            elapsed_us = text_us,
            "stage 3 done"
        );

        // Stage 3b: embedding-based vector search (no-op without a provider).
        let stage3b = Instant::now();
        let vector_hits =
            vector_search_scoped(runtime, scope, question, &candidates, row_cap, min_score);
        let vector_us = stage3b.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "vector_search_scoped",
            hits = vector_hits.len(),
            elapsed_us = vector_us,
            "stage 3b done"
        );

        // Stage 3c: graph traversal search over the same candidates.
        let stage3c = Instant::now();
        let graph_hits = graph_search_scoped(
            runtime,
            scope,
            question,
            &candidates,
            row_cap,
            min_score,
            graph_depth,
        );
        let graph_us = stage3c.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "graph_search_scoped",
            hits = graph_hits.len(),
            elapsed_us = graph_us,
            "stage 3c done"
        );

        // Stage 4: exact literal filtering over candidate rows.
        let stage4 = Instant::now();
        let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
        let filter_us = stage4.elapsed().as_micros() as u64;
        debug!(
            target: "ask_pipeline",
            stage = "filter_values",
            rows = filtered_rows.len(),
            elapsed_us = filter_us,
            "stage 4 done"
        );

        Ok(AskContext {
            question: question.to_string(),
            tokens,
            candidates,
            text_hits,
            vector_hits,
            graph_hits,
            filtered_rows,
            source_limit: row_cap,
            timings: StageTimings {
                extract_us,
                schema_us,
                text_us,
                vector_us,
                graph_us,
                filter_us,
            },
        })
    }
}
347
348pub fn fused_source_order(ctx: &AskContext) -> Vec<FusedSourceRef> {
351 fused_sources(ctx)
352 .into_iter()
353 .map(|fused| fused.source)
354 .collect()
355}
356
/// Fuses the four retrieval buckets (filtered rows, text, vector, graph) via
/// reciprocal-rank fusion and returns at most `ctx.source_limit` sources,
/// best first, each tagged with its RRF score.
pub fn fused_sources(ctx: &AskContext) -> Vec<FusedSource> {
    use super::ai::rrf_fuser::{fuse, Bucket, Candidate, RRF_K_DEFAULT};

    // A zero limit or four empty buckets short-circuits to nothing.
    if ctx.source_limit == 0
        || (ctx.filtered_rows.is_empty()
            && ctx.text_hits.is_empty()
            && ctx.vector_hits.is_empty()
            && ctx.graph_hits.is_empty())
    {
        return Vec::new();
    }

    // Maps each fused identity ("collection/entity_id") back to the FIRST
    // bucket entry that produced it. Build order below (rows → text → vector
    // → graph) therefore decides which list a duplicated entity is attributed
    // to — do not reorder these bucket constructions.
    let mut refs: HashMap<String, FusedSourceRef> = HashMap::new();
    // Filtered rows carry no native score; a flat 1.0 makes only rank matter.
    let row_bucket = Bucket {
        candidates: ctx
            .filtered_rows
            .iter()
            .enumerate()
            .map(|(idx, row)| {
                let id = source_identity(&row.collection, row.entity.id.raw());
                refs.entry(id.clone())
                    .or_insert(FusedSourceRef::FilteredRow(idx));
                Candidate { id, score: 1.0 }
            })
            .collect(),
        min_score: None,
    };
    let text_bucket = Bucket {
        candidates: ctx
            .text_hits
            .iter()
            .enumerate()
            .map(|(idx, hit)| {
                let id = source_identity(&hit.collection, hit.entity_id);
                refs.entry(id.clone())
                    .or_insert(FusedSourceRef::TextHit(idx));
                Candidate {
                    id,
                    score: hit.score as f64,
                }
            })
            .collect(),
        min_score: None,
    };
    let vector_bucket = Bucket {
        candidates: ctx
            .vector_hits
            .iter()
            .enumerate()
            .map(|(idx, hit)| {
                let id = source_identity(&hit.collection, hit.entity_id);
                refs.entry(id.clone())
                    .or_insert(FusedSourceRef::VectorHit(idx));
                Candidate {
                    id,
                    score: hit.score as f64,
                }
            })
            .collect(),
        min_score: None,
    };
    let graph_bucket = Bucket {
        candidates: ctx
            .graph_hits
            .iter()
            .enumerate()
            .map(|(idx, hit)| {
                let id = source_identity(&hit.collection, hit.entity_id);
                refs.entry(id.clone())
                    .or_insert(FusedSourceRef::GraphHit(idx));
                Candidate {
                    id,
                    score: hit.score as f64,
                }
            })
            .collect(),
        min_score: None,
    };

    // fuse() returns ranked identities; map each back to its source ref.
    fuse(
        &[row_bucket, text_bucket, vector_bucket, graph_bucket],
        RRF_K_DEFAULT,
        ctx.source_limit,
    )
    .into_iter()
    .filter_map(|item| {
        refs.get(&item.id).copied().map(|source| FusedSource {
            source,
            rrf_score: item.rrf_score,
        })
    })
    .collect()
}
452
/// Canonical `collection/entity_id` key used to dedupe the same entity
/// across fusion buckets.
fn source_identity(collection: &str, entity_id: u64) -> String {
    // Preallocate: collection + '/' + up to 20 digits of a u64.
    let mut identity = String::with_capacity(collection.len() + 21);
    identity.push_str(collection);
    identity.push('/');
    identity.push_str(&entity_id.to_string());
    identity
}
456
/// Stage 3: BM25 full-text search over the candidate collections, restricted
/// to collections visible in `scope`, with the per-entity visibility/RLS gate
/// applied to every hit.
pub fn text_search_bm25_scoped(
    runtime: &RedDBRuntime,
    scope: &EffectiveScope,
    question: &str,
    candidates: &CandidateCollections,
    top_k: usize,
) -> Vec<TextHit> {
    if candidates.collections.is_empty() || top_k == 0 {
        return Vec::new();
    }

    // Intersect candidates with the scope's visible set (None = no restriction).
    let visible = scope.visible_collections();
    let allowed: BTreeSet<String> = candidates
        .collections
        .iter()
        .filter(|collection| visible.is_none_or(|set| set.contains(*collection)))
        .cloned()
        .collect();
    if allowed.is_empty() {
        return Vec::new();
    }

    // Install the ASK scope BEFORE capturing the snapshot so the snapshot and
    // the RLS checks below run under this scope's tenant/identity.
    let _scope_guard = AskScopeGuard::install(scope);
    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
    // RLS filters are resolved once per collection and cached across hits.
    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
    let store = runtime.inner.db.store();

    runtime
        .inner
        .db
        .store()
        .context_index()
        .search_bm25(question, top_k, Some(&allowed))
        .into_iter()
        .filter_map(|hit| {
            // Drop hits whose backing entity no longer exists.
            let entity = store.get(&hit.collection, hit.entity_id)?;
            if !ask_entity_allowed(
                runtime,
                scope,
                &hit.collection,
                &entity,
                snap_ctx.as_ref(),
                &mut rls_cache,
            ) {
                return None;
            }
            Some(TextHit {
                collection: hit.collection,
                entity_id: hit.entity_id.raw(),
                score: hit.score,
            })
        })
        .collect()
}
518
/// Stage 1 router: picks heuristic or LLM-based token extraction based on the
/// `ai.ner.backend` config key (anything other than "llm" uses the heuristic).
///
/// The LLM path reads `ai.ner.endpoint`, `ai.ner.model`, `ai.ner.timeout_ms`
/// and `ai.ner.fallback`; on failure it honours the configured
/// [`HeuristicFallback`] policy. Without a reachable Tokio runtime the LLM
/// path cannot run and the heuristic is used directly.
fn extract_tokens_routed(
    runtime: &RedDBRuntime,
    scope: &EffectiveScope,
    question: &str,
) -> RedDBResult<TokenSet> {
    let backend = runtime.config_string("ai.ner.backend", "heuristic");
    if backend != "llm" {
        return Ok(extract_tokens(question));
    }

    let endpoint = runtime.config_string("ai.ner.endpoint", "");
    let model = runtime.config_string("ai.ner.model", "");
    // Unparsable timeout values silently fall back to the 5s default.
    let timeout_ms = runtime
        .config_string("ai.ner.timeout_ms", "5000")
        .parse::<u32>()
        .unwrap_or(5000);
    let fallback = match runtime
        .config_string("ai.ner.fallback", "use_heuristic")
        .as_str()
    {
        "empty_on_fail" => HeuristicFallback::EmptyOnFail,
        "propagate" => HeuristicFallback::Propagate,
        // Unknown values default to the safest behaviour.
        _ => HeuristicFallback::UseHeuristic,
    };

    // With neither endpoint nor model configured there is nothing to call;
    // use the stub provider (empty result, which engages the fallback policy).
    let provider = if endpoint.is_empty() && model.is_empty() {
        NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
    } else {
        NerProvider::OpenAiCompat { endpoint, model }
    };

    let mut ner = LlmNer::new(provider, fallback);
    ner.timeout_ms = timeout_ms;

    let auth = ScopeAuthAdapter(scope);
    // The NER client is async; bridge from this sync call via the ambient
    // Tokio runtime when one exists.
    let llm_result = match tokio::runtime::Handle::try_current() {
        Ok(handle) => {
            tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
        }
        Err(_) => {
            warn!(
                target: "ask_pipeline",
                "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
            );
            return Ok(extract_tokens(question));
        }
    };

    match llm_result {
        Ok(tokens) => Ok(tokens),
        // Capability not granted: log once per process, then honour the policy.
        Err(NerError::AuthDenied) => {
            log_auth_denial_once();
            apply_fallback(fallback, question)
        }
        Err(err) => {
            warn!(
                target: "ask_pipeline",
                error = %err,
                "LlmNer extract failed; honouring HeuristicFallback policy"
            );
            apply_fallback(fallback, question)
        }
    }
}
620
621fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
622 match fallback {
623 HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
624 HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
625 HeuristicFallback::Propagate => Err(RedDBError::Query(
626 "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
627 )),
628 }
629}
630
631fn log_auth_denial_once() {
635 static EMITTED: AtomicBool = AtomicBool::new(false);
636 if !EMITTED.swap(true, Ordering::Relaxed) {
637 info!(
638 target: "ask_pipeline",
639 capability = NER_CAPABILITY,
640 "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
641 NER_CAPABILITY
642 );
643 }
644}
645
/// Adapts an [`EffectiveScope`] to the NER module's `AuthContext` trait so
/// the LLM NER path can check capabilities without depending on scope types.
struct ScopeAuthAdapter<'a>(&'a EffectiveScope);

impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
    // Opaque Debug: the wrapped scope's contents are intentionally not exposed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
    }
}

impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
    /// Delegates straight to the wrapped scope's capability check.
    fn has_capability(&self, capability: &str) -> bool {
        self.0.has_capability(capability)
    }
}
662
663pub fn extract_tokens(question: &str) -> TokenSet {
674 let mut keywords: Vec<String> = Vec::new();
675 let mut literals: Vec<String> = Vec::new();
676
677 let mut chars = question.chars().peekable();
678 let mut buf = String::new();
679
680 let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
681 if buf.is_empty() {
682 return;
683 }
684 let word = std::mem::take(buf);
685 classify_token(&word, keywords, literals);
686 };
687
688 while let Some(ch) = chars.next() {
689 if ch.is_alphanumeric() || ch == '_' || ch == '-' {
690 buf.push(ch);
691 } else {
692 flush(&mut buf, &mut keywords, &mut literals);
693 let _ = ch;
695 }
696 if chars.peek().is_none() {
698 flush(&mut buf, &mut keywords, &mut literals);
699 }
700 }
701 if !buf.is_empty() {
704 classify_token(&buf, &mut keywords, &mut literals);
705 }
706
707 let mut seen = HashSet::new();
709 keywords.retain(|tok| seen.insert(tok.clone()));
710 let mut seen_lit = HashSet::new();
711 literals.retain(|tok| seen_lit.insert(tok.clone()));
712
713 TokenSet { keywords, literals }
714}
715
716fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
717 let is_upper_id_shape = word.len() >= 3
720 && word
721 .chars()
722 .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
723 && word.chars().any(|c| c.is_ascii_digit())
724 && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
725 let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
726 if is_upper_id_shape || is_long_digit_run {
727 literals.push(word.to_string());
728 return;
729 }
730 let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
733 if trimmed.len() < 2 {
734 return;
735 }
736 if !trimmed
737 .chars()
738 .next()
739 .map(|c| c.is_ascii_alphabetic())
740 .unwrap_or(false)
741 {
742 return;
743 }
744 if !trimmed
745 .chars()
746 .all(|c| c.is_ascii_alphanumeric() || c == '_')
747 {
748 return;
751 }
752 let lower = trimmed.to_ascii_lowercase();
753 if STOP_WORDS.binary_search(&lower.as_str()).is_ok() {
754 return;
755 }
756 keywords.push(lower);
757}
758
/// Stop words dropped during keyword extraction (English plus a few
/// Portuguese question words).
///
/// INVARIANT: must remain sorted ascending — `classify_token` uses
/// `binary_search` for membership tests.
const STOP_WORDS: &[&str] = &[
    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
    "is", "it", "of", "on", "or", "que", "qual", "quais", "sobre", "te", "the", "to", "what",
    "where", "which", "with",
];
766
767pub fn match_schema(
776 runtime: &RedDBRuntime,
777 scope: &EffectiveScope,
778 tokens: &TokenSet,
779) -> RedDBResult<CandidateCollections> {
780 let visible = match scope.visible_collections() {
781 Some(set) => set.clone(),
782 None => {
783 runtime
788 .inner
789 .db
790 .store()
791 .list_collections()
792 .into_iter()
793 .collect()
794 }
795 };
796
797 let mut collections: BTreeSet<String> = BTreeSet::new();
798 let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
799 for keyword in &tokens.keywords {
800 let hits = runtime.schema_vocabulary_lookup(keyword);
801 for hit in hits {
802 if !visible.contains(&hit.collection) {
803 continue;
804 }
805 collections.insert(hit.collection.clone());
806 if let Some(column) = hit.column {
807 columns_by_collection
808 .entry(hit.collection)
809 .or_default()
810 .insert(column);
811 }
812 }
813 }
814
815 Ok(CandidateCollections {
816 collections: collections.into_iter().collect(),
817 columns_by_collection: columns_by_collection
818 .into_iter()
819 .map(|(k, v)| (k, v.into_iter().collect()))
820 .collect(),
821 })
822}
823
824pub fn vector_search_scoped(
833 runtime: &RedDBRuntime,
834 scope: &EffectiveScope,
835 question: &str,
836 candidates: &CandidateCollections,
837 top_k: usize,
838 min_score: Option<f32>,
839) -> Vec<VectorHit> {
840 if candidates.collections.is_empty() {
841 return Vec::new();
842 }
843 let Some(embedding) = embed_question(runtime, question) else {
844 return Vec::new();
845 };
846 let per_collection = top_k.max(1);
847 let mut hits: Vec<VectorHit> = Vec::new();
848 let _scope_guard = AskScopeGuard::install(scope);
849 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
850 let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
851 let store = runtime.inner.db.store();
852 for collection in &candidates.collections {
853 match super::authorized_search::AuthorizedSearch::execute_similar(
854 runtime,
855 scope,
856 collection,
857 &embedding,
858 per_collection,
859 min_score.unwrap_or(0.0),
860 ) {
861 Ok(results) => {
862 for result in results {
863 let Some(entity) = store.get(collection, result.entity_id) else {
864 continue;
865 };
866 if !ask_entity_allowed(
867 runtime,
868 scope,
869 collection,
870 &entity,
871 snap_ctx.as_ref(),
872 &mut rls_cache,
873 ) {
874 continue;
875 }
876 hits.push(VectorHit {
877 collection: collection.clone(),
878 entity_id: result.entity_id.raw(),
879 score: result.score,
880 });
881 }
882 }
883 Err(err) => {
884 debug!(
885 target: "ask_pipeline",
886 collection = collection,
887 err = %err,
888 "vector_search_scoped: collection skipped"
889 );
890 }
891 }
892 }
893 hits.sort_by(|a, b| {
894 b.score
895 .partial_cmp(&a.score)
896 .unwrap_or(std::cmp::Ordering::Equal)
897 .then_with(|| a.entity_id.cmp(&b.entity_id))
898 });
899 hits.truncate(top_k);
900 hits
901}
902
/// Stage 3c: graph context search across the candidate collections.
///
/// Delegates to the runtime's context search (the authorized variant when the
/// scope restricts visibility), keeps only entities discovered via graph
/// traversal, and returns the best `top_k` hits ordered by descending score
/// with deterministic tie-breaks (depth, collection, entity id).
pub fn graph_search_scoped(
    runtime: &RedDBRuntime,
    scope: &EffectiveScope,
    question: &str,
    candidates: &CandidateCollections,
    top_k: usize,
    min_score: Option<f32>,
    graph_depth: Option<usize>,
) -> Vec<GraphHit> {
    if candidates.collections.is_empty() || top_k == 0 {
        return Vec::new();
    }
    // Traversal depth: caller override, else the MCP ASK tool default; never 0.
    let depth = graph_depth
        .unwrap_or(crate::runtime::ai::mcp_ask_tool::DEPTH_DEFAULT as usize)
        .max(1);
    let _scope_guard = AskScopeGuard::install(scope);
    // Graph-only configuration: explicit graph expansion, no cross-refs.
    let input = SearchContextInput {
        query: question.to_string(),
        field: None,
        vector: None,
        collections: Some(candidates.collections.clone()),
        graph_depth: Some(depth),
        graph_max_edges: None,
        max_cross_refs: Some(0),
        follow_cross_refs: Some(false),
        expand_graph: Some(true),
        global_scan: Some(true),
        reindex: Some(false),
        limit: Some(top_k),
        min_score,
    };
    // Restricted scopes must go through the authorized search path.
    let result = match if scope.visible_collections().is_some() {
        super::authorized_search::AuthorizedSearch::execute_context(runtime, scope, input)
    } else {
        runtime.search_context(input)
    } {
        Ok(result) => result,
        Err(err) => {
            // Best-effort stage: failures degrade to an empty result.
            debug!(
                target: "ask_pipeline",
                err = %err,
                "graph_search_scoped: context search skipped"
            );
            return Vec::new();
        }
    };

    let mut hits = Vec::new();
    for entity in result
        .graph
        .nodes
        .into_iter()
        .chain(result.graph.edges.into_iter())
    {
        // Only entities actually reached by traversal count as graph hits.
        let crate::runtime::DiscoveryMethod::GraphTraversal { depth, .. } = entity.discovery else {
            continue;
        };
        let kind = match entity.entity.kind {
            EntityKind::GraphNode(_) => GraphHitKind::Node,
            EntityKind::GraphEdge(_) => GraphHitKind::Edge,
            // Non-graph entities can appear in the result; ignore them.
            _ => continue,
        };
        hits.push(GraphHit {
            collection: entity.collection,
            entity_id: entity.entity.id.raw(),
            score: entity.score,
            depth,
            kind,
        });
    }
    // Descending score, then shallower depth, then stable name/id tie-breaks.
    hits.sort_by(|a, b| {
        b.score
            .partial_cmp(&a.score)
            .unwrap_or(std::cmp::Ordering::Equal)
            .then_with(|| a.depth.cmp(&b.depth))
            .then_with(|| a.collection.cmp(&b.collection))
            .then_with(|| a.entity_id.cmp(&b.entity_id))
    });
    hits.truncate(top_k);
    hits
}
991
/// Embeds `question` using the default AI provider configured in the
/// `red_config` KV namespace.
///
/// Returns `None` when no OpenAI-compatible provider is configured, the API
/// key cannot be resolved, or the embedding request fails — the vector stage
/// then degrades to a no-op.
fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
    // Config values are read as text entries from the "red_config" KV store;
    // non-text values are treated as absent.
    let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
        match runtime.inner.db.get_kv("red_config", key) {
            Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
            Some(_) => Ok(None),
            None => Ok(None),
        }
    };
    let provider = crate::ai::resolve_default_provider(&kv_getter);
    if !provider.is_openai_compatible() {
        return None;
    }
    let model = crate::ai::resolve_default_model(&provider, &kv_getter);
    let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
    let request = crate::ai::OpenAiEmbeddingRequest {
        api_key,
        model,
        inputs: vec![question.to_string()],
        dimensions: None,
        api_base: provider.resolve_api_base(),
    };
    let response = crate::runtime::ai::block_on_ai(async move {
        crate::ai::openai_embeddings_async(&transport, request).await
    })
    // block_on_ai wraps the call's own Result; flatten the two error layers.
    .and_then(|result| result)
    .ok()?;
    // Single input → the first (only) embedding.
    response.embeddings.into_iter().next()
}
1024
/// Stage 4: scans the candidate collections for rows containing one of the
/// question's literal tokens, honouring scope visibility and RLS, stopping as
/// soon as `row_cap` matches have been collected.
pub fn filter_values(
    runtime: &RedDBRuntime,
    scope: &EffectiveScope,
    candidates: &CandidateCollections,
    tokens: &TokenSet,
    row_cap: usize,
) -> Vec<FilteredRow> {
    // Literal filtering only makes sense when there are literals to find.
    if tokens.literals.is_empty() || candidates.collections.is_empty() {
        return Vec::new();
    }
    let visible = scope.visible_collections();
    let store = runtime.inner.db.store();
    let mut out: Vec<FilteredRow> = Vec::new();
    // Install the ASK scope BEFORE capturing the snapshot so RLS evaluation
    // runs under this scope's tenant/identity.
    let _scope_guard = AskScopeGuard::install(scope);
    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();

    // Labelled so reaching the row cap aborts the whole scan, not just the
    // current collection.
    'collection: for collection in &candidates.collections {
        if let Some(set) = visible {
            if !set.contains(collection) {
                continue;
            }
        }
        let Some(manager) = store.get_collection(collection) else {
            continue;
        };
        // Columns that matched a keyword are checked first inside each row.
        let hint_columns: &[String] = candidates
            .columns_by_collection
            .get(collection)
            .map(|v| v.as_slice())
            .unwrap_or(&[]);

        // NOTE(review): full collection scan (`query_all(|_| true)`) — assumes
        // candidate collections are small enough for this to be acceptable.
        for entity in manager.query_all(|_| true) {
            if !ask_entity_allowed(
                runtime,
                scope,
                collection,
                &entity,
                snap_ctx.as_ref(),
                &mut rls_cache,
            ) {
                continue;
            }
            if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
                out.push(FilteredRow {
                    collection: collection.clone(),
                    entity,
                    matched_literal: hit.0,
                    matched_column: hit.1,
                });
                if out.len() >= row_cap {
                    break 'collection;
                }
            }
        }
    }
    out
}
1094
1095fn ask_entity_allowed(
1096 runtime: &RedDBRuntime,
1097 scope: &EffectiveScope,
1098 collection: &str,
1099 entity: &UnifiedEntity,
1100 snap_ctx: Option<&crate::runtime::impl_core::SnapshotContext>,
1101 rls_cache: &mut HashMap<String, Option<crate::storage::query::ast::Filter>>,
1102) -> bool {
1103 if scope
1104 .visible_collections()
1105 .is_some_and(|visible| !visible.contains(collection))
1106 {
1107 return false;
1108 }
1109 runtime.search_entity_allowed(collection, entity, snap_ctx, rls_cache)
1110}
1111
/// RAII guard that installs an `EffectiveScope`'s tenant and auth identity
/// into the runtime's current-context state, restoring the previous values
/// on drop.
struct AskScopeGuard {
    // Tenant that was active before `install`; restored on drop.
    prev_tenant: Option<String>,
    // (user, role) identity that was active before `install`; restored on drop.
    prev_auth: Option<(String, crate::auth::Role)>,
}
1116
impl AskScopeGuard {
    /// Saves the current tenant/identity, then overwrites both with the
    /// values carried by `scope` (clearing them when the scope has none).
    fn install(scope: &EffectiveScope) -> Self {
        // Capture previous state BEFORE mutating it, so drop can restore.
        let prev_tenant = crate::runtime::impl_core::current_tenant();
        let prev_auth = crate::runtime::impl_core::current_auth_identity();

        match scope.effective_scope() {
            Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant.to_string()),
            None => crate::runtime::impl_core::clear_current_tenant(),
        }
        match scope.identity() {
            Some((user, role)) => {
                crate::runtime::impl_core::set_current_auth_identity(user.to_string(), role)
            }
            None => crate::runtime::impl_core::clear_current_auth_identity(),
        }

        Self {
            prev_tenant,
            prev_auth,
        }
    }
}
1139
1140impl Drop for AskScopeGuard {
1141 fn drop(&mut self) {
1142 match self.prev_tenant.take() {
1143 Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant),
1144 None => crate::runtime::impl_core::clear_current_tenant(),
1145 }
1146 match self.prev_auth.take() {
1147 Some((user, role)) => crate::runtime::impl_core::set_current_auth_identity(user, role),
1148 None => crate::runtime::impl_core::clear_current_auth_identity(),
1149 }
1150 }
1151}
1152
1153fn literal_match_in_entity(
1156 entity: &UnifiedEntity,
1157 literals: &[String],
1158 hint_columns: &[String],
1159) -> Option<(String, Option<String>)> {
1160 let row = match &entity.data {
1161 EntityData::Row(row) => row,
1162 _ => return None,
1163 };
1164
1165 for column in hint_columns {
1167 if let Some(value) = row.get_field(column) {
1168 if let Some(lit) = first_literal_in_value(value, literals) {
1169 return Some((lit, Some(column.clone())));
1170 }
1171 }
1172 }
1173 for (name, value) in row.iter_fields() {
1175 if hint_columns.iter().any(|c| c == name) {
1176 continue;
1177 }
1178 if let Some(lit) = first_literal_in_value(value, literals) {
1179 return Some((lit, Some(name.to_string())));
1180 }
1181 }
1182 None
1183}
1184
1185fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
1186 let rendered = match value {
1187 Value::Text(s) => s.to_string(),
1188 Value::Integer(i) => i.to_string(),
1189 Value::Float(f) => f.to_string(),
1190 Value::Boolean(b) => b.to_string(),
1191 Value::Json(j) => String::from_utf8_lossy(j).to_string(),
1192 _ => return None,
1193 };
1194 for lit in literals {
1195 if rendered.contains(lit) {
1199 return Some(lit.clone());
1200 }
1201 }
1202 None
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207 use super::*;
1208
1209 #[test]
1212 fn extract_tokens_splits_keywords_and_literals() {
1213 let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
1214 assert!(tokens.keywords.contains(&"novidades".to_string()));
1217 assert!(tokens.keywords.contains(&"passport".to_string()));
1218 assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1219 assert!(!tokens.is_empty());
1220 }
1221
1222 #[test]
1223 fn extract_tokens_returns_empty_for_punctuation_only() {
1224 let tokens = extract_tokens("??? ...");
1225 assert!(tokens.is_empty());
1226 }
1227
1228 #[test]
1229 fn extract_tokens_long_digit_run_is_a_literal() {
1230 let tokens = extract_tokens("show order 987654321 details");
1231 assert!(tokens.literals.contains(&"987654321".to_string()));
1232 assert!(tokens.keywords.contains(&"order".to_string()));
1233 assert!(tokens.keywords.contains(&"details".to_string()));
1234 assert!(tokens.keywords.contains(&"show".to_string()));
1235 }
1236
1237 #[test]
1238 fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
1239 let tokens = extract_tokens("USA exports report");
1242 assert!(tokens.keywords.contains(&"usa".to_string()));
1243 assert!(tokens.literals.is_empty());
1244 }
1245
1246 #[test]
1247 fn extract_tokens_dedups() {
1248 let tokens = extract_tokens("passport passport FDD-1 FDD-1");
1249 assert_eq!(
1250 tokens.keywords.iter().filter(|k| *k == "passport").count(),
1251 1
1252 );
1253 assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
1254 }
1255
1256 #[test]
1259 fn first_literal_in_value_substring_match() {
1260 let lit = first_literal_in_value(
1261 &Value::text("issue FDD-12313 reported by user"),
1262 &["FDD-12313".to_string()],
1263 );
1264 assert_eq!(lit.as_deref(), Some("FDD-12313"));
1265 }
1266
1267 #[test]
1268 fn first_literal_in_value_no_match_returns_none() {
1269 assert!(
1270 first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
1271 .is_none()
1272 );
1273 }
1274
1275 use crate::api::RedDBOptions;
1278 use crate::auth::Role;
1279 use crate::runtime::statement_frame::EffectiveScope;
1280 use crate::runtime::RedDBRuntime;
1281 use crate::storage::schema::Value;
1282 use crate::storage::transaction::snapshot::Snapshot;
1283 use crate::storage::unified::entity::{
1284 EntityData, EntityId, EntityKind, RowData, UnifiedEntity,
1285 };
1286 use std::sync::Arc;
1287
1288 fn make_scope(visible: HashSet<String>) -> EffectiveScope {
1289 EffectiveScope {
1290 tenant: Some("acme".to_string()),
1291 identity: Some(("alice".to_string(), Role::Read)),
1292 snapshot: Snapshot {
1293 xid: 0,
1294 in_progress: HashSet::new(),
1295 },
1296 visible_collections: Some(visible),
1297 }
1298 }
1299
1300 fn fresh_runtime() -> RedDBRuntime {
1301 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
1302 }
1303
1304 fn test_row(collection: &str, id: u64) -> FilteredRow {
1305 FilteredRow {
1306 collection: collection.to_string(),
1307 entity: UnifiedEntity::new(
1308 EntityId::new(id),
1309 EntityKind::TableRow {
1310 table: Arc::from(collection),
1311 row_id: id,
1312 },
1313 EntityData::Row(RowData {
1314 columns: Vec::new(),
1315 named: Some(
1316 [("body".to_string(), Value::text("ticket FDD-1".to_string()))]
1317 .into_iter()
1318 .collect(),
1319 ),
1320 schema: None,
1321 }),
1322 ),
1323 matched_literal: "FDD-1".to_string(),
1324 matched_column: Some("body".to_string()),
1325 }
1326 }
1327
1328 fn test_graph_hit(collection: &str, id: u64, score: f32, depth: usize) -> GraphHit {
1329 GraphHit {
1330 collection: collection.to_string(),
1331 entity_id: id,
1332 score,
1333 depth,
1334 kind: GraphHitKind::Node,
1335 }
1336 }
1337
1338 fn test_text_hit(collection: &str, id: u64, score: f32) -> TextHit {
1339 TextHit {
1340 collection: collection.to_string(),
1341 entity_id: id,
1342 score,
1343 }
1344 }
1345
1346 struct TenantGuard;
1347
1348 impl TenantGuard {
1349 fn set(tenant: &str) -> Self {
1350 crate::runtime::impl_core::set_current_tenant(tenant.to_string());
1351 Self
1352 }
1353 }
1354
1355 impl Drop for TenantGuard {
1356 fn drop(&mut self) {
1357 crate::runtime::impl_core::clear_current_tenant();
1358 }
1359 }
1360
1361 fn row_text<'a>(entity: &'a UnifiedEntity, field: &str) -> Option<&'a str> {
1362 let row = entity.data.as_row()?;
1363 match row.get_field(field)? {
1364 Value::Text(value) => Some(value.as_ref()),
1365 _ => None,
1366 }
1367 }
1368
    /// Fused source ordering honors `source_limit` as a total cap across
    /// buckets while fusing ranks across them.
    #[test]
    fn fused_source_order_uses_rrf_and_total_limit() {
        let ctx = AskContext {
            // Cap the fused list at two source refs in total.
            source_limit: 2,
            // Rows pushed as (id=2, id=1); index 1 is the id-1 row.
            filtered_rows: vec![test_row("incidents", 2), test_row("incidents", 1)],
            vector_hits: vec![
                VectorHit {
                    collection: "incidents".to_string(),
                    entity_id: 1,
                    score: 0.91,
                },
                VectorHit {
                    collection: "docs".to_string(),
                    entity_id: 9,
                    score: 0.88,
                },
            ],
            ..AskContext::default()
        };

        let order = fused_source_order(&ctx);

        // Only two refs survive the limit, and the second filtered row (index 1,
        // entity 1) takes the top slot — presumably because the same entity also
        // appears as the strongest vector hit and RRF boosts entities present in
        // multiple buckets (TODO confirm against the fusion implementation).
        assert_eq!(
            order,
            vec![
                FusedSourceRef::FilteredRow(1),
                FusedSourceRef::FilteredRow(0)
            ]
        );
    }
1399
    /// With a larger limit, the fused order interleaves all four buckets
    /// (filtered rows, text, graph, vector).
    #[test]
    fn fused_source_order_includes_graph_bucket() {
        let ctx = AskContext {
            source_limit: 4,
            filtered_rows: vec![test_row("incidents", 1)],
            text_hits: vec![test_text_hit("articles", 5, 1.2)],
            vector_hits: vec![
                VectorHit {
                    collection: "incidents".to_string(),
                    entity_id: 1,
                    score: 0.91,
                },
                VectorHit {
                    collection: "docs".to_string(),
                    entity_id: 9,
                    score: 0.88,
                },
            ],
            graph_hits: vec![test_graph_hit("topology", 7, 0.80, 1)],
            ..AskContext::default()
        };

        let order = fused_source_order(&ctx);

        // One ref per bucket; VectorHit(0) is absent — presumably deduplicated
        // against FilteredRow(0), which refers to the same ("incidents", 1)
        // entity (TODO confirm the dedup rule in fused_source_order).
        assert_eq!(
            order,
            vec![
                FusedSourceRef::FilteredRow(0),
                FusedSourceRef::TextHit(0),
                FusedSourceRef::GraphHit(0),
                FusedSourceRef::VectorHit(1),
            ]
        );
    }
1434
    /// BM25 length normalization: of two documents containing the query terms,
    /// the shorter (more specific) one must rank first.
    #[test]
    fn text_search_bm25_scoped_ranks_specific_document_first() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
            .expect("create docs");
        // Exact two-word match.
        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
            .expect("insert specific doc");
        // Same terms diluted by six extra words.
        rt.execute_query(
            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
        )
        .expect("insert broad doc");

        let scope = make_scope(["docs".to_string()].into_iter().collect());
        let candidates = CandidateCollections {
            collections: vec!["docs".to_string()],
            columns_by_collection: HashMap::new(),
        };
        let hits = text_search_bm25_scoped(&rt, &scope, "passport renewal", &candidates, 10);

        assert_eq!(hits.len(), 2);
        assert!(
            hits[0].score > hits[1].score,
            "BM25 text bucket should prefer the shorter exact match: {hits:?}"
        );
    }
1460
    /// Row-level security must filter BM25 hits: identical bodies under two
    /// tenants, but only the current tenant's row may surface.
    #[test]
    fn text_search_bm25_scoped_filters_rls_denied_hits() {
        let rt = fresh_runtime();
        rt.execute_query(
            "CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT) WITH CONTEXT INDEX ON (body)",
        )
        .expect("create docs");
        // Both rows match the query text; they differ only by tenant.
        rt.execute_query(
            "INSERT INTO docs (id, tenant_id, body) VALUES \
             (1, 'acme', 'shared launch plan'), \
             (2, 'globex', 'shared launch plan')",
        )
        .expect("seed docs");
        rt.execute_query(
            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
        )
        .expect("create policy");
        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
            .expect("enable rls");

        // Guard scopes the tenant override to this test body (cleared on drop).
        let _tenant = TenantGuard::set("acme");
        let scope = make_scope(["docs".to_string()].into_iter().collect());
        let candidates = CandidateCollections {
            collections: vec!["docs".to_string()],
            columns_by_collection: HashMap::new(),
        };

        let hits = text_search_bm25_scoped(&rt, &scope, "shared launch", &candidates, 10);

        assert_eq!(hits.len(), 1, "RLS should hide the globex hit: {hits:?}");
        // Fetch the surviving entity directly and confirm it belongs to 'acme'.
        let entity = rt
            .inner
            .db
            .store()
            .get(
                "docs",
                crate::storage::unified::entity::EntityId::new(hits[0].entity_id),
            )
            .expect("hit entity exists");
        assert_eq!(row_text(&entity, "tenant_id"), Some("acme"));
    }
1502
    /// End-to-end pipeline run: BM25 ranking must carry through to the fused
    /// source order, with the text bucket's best hit first.
    #[test]
    fn execute_pipeline_retrieves_known_good_bm25_source_order() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
            .expect("create docs");
        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
            .expect("insert specific doc");
        rt.execute_query(
            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
        )
        .expect("insert broad doc");
        // Register the collection/column in the schema vocabulary so Stage 2
        // (schema matching) can resolve the "body" keyword.
        rt.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: "docs".to_string(),
                columns: vec!["body".into()],
                type_tags: Vec::new(),
                description: None,
            },
        );

        let scope = make_scope(["docs".to_string()].into_iter().collect());
        // Positional args after the question: limit 2, then None / Some(1) —
        // presumably min-score and graph depth; confirm against the signature.
        let ctx = AskPipeline::execute_with_limit_and_min_score(
            &rt,
            &scope,
            "body passport renewal",
            2,
            None,
            Some(1),
        )
        .expect("pipeline executes");

        assert_eq!(ctx.text_hits.len(), 2);
        assert!(
            ctx.text_hits[0].score > ctx.text_hits[1].score,
            "BM25 source order should prefer the shorter exact match: {:?}",
            ctx.text_hits
        );
        // The top fused ref must be the best text hit.
        assert!(matches!(
            fused_source_order(&ctx).first(),
            Some(FusedSourceRef::TextHit(0))
        ));
    }
1545
    /// Graph traversal depth cap: a 3-node chain alice→bob→carol should yield
    /// no hits past one hop at DEPTH 1, but include the two-hop node at DEPTH 2.
    #[test]
    fn graph_search_scoped_honors_depth() {
        let rt = fresh_runtime();
        // Build the chain: alice -knows-> bob -knows-> carol.
        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('alice', 'Alice')")
            .expect("insert alice");
        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('bob', 'Bob')")
            .expect("insert bob");
        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('carol', 'Carol')")
            .expect("insert carol");
        rt.execute_query(
            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'alice', 'bob')",
        )
        .expect("insert alice-bob edge");
        rt.execute_query(
            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'bob', 'carol')",
        )
        .expect("insert bob-carol edge");

        let scope = make_scope(["tales".to_string()].into_iter().collect());
        let candidates = CandidateCollections {
            collections: vec!["tales".to_string()],
            columns_by_collection: HashMap::new(),
        };
        // Same query at two depth caps; the trailing Some(n) is the depth limit.
        let depth1 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(1));
        let depth2 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(2));

        assert!(
            depth1.iter().all(|hit| hit.depth <= 1),
            "DEPTH 1 returned hits beyond one hop: {depth1:?}"
        );
        assert!(
            depth2.iter().any(|hit| hit.depth == 2),
            "DEPTH 2 should include the second-hop graph hit: {depth2:?}"
        );
    }
1581
    /// Stage 4 (`filter_values`) must apply row-level security: the same
    /// literal appears under two tenants, but only the current tenant's row
    /// may be returned.
    #[test]
    fn filter_values_filters_rls_denied_rows() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT)")
            .expect("create docs");
        // Both rows carry the same literal; they differ only by tenant.
        rt.execute_query(
            "INSERT INTO docs (id, tenant_id, body) VALUES \
             (1, 'acme', 'incident FDD-12313'), \
             (2, 'globex', 'incident FDD-12313')",
        )
        .expect("seed docs");
        rt.execute_query(
            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
        )
        .expect("create policy");
        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
            .expect("enable rls");

        // Guard scopes the tenant override to this test body (cleared on drop).
        let _tenant = TenantGuard::set("acme");
        let scope = make_scope(["docs".to_string()].into_iter().collect());
        let candidates = CandidateCollections {
            collections: vec!["docs".to_string()],
            columns_by_collection: HashMap::from([("docs".to_string(), vec!["body".to_string()])]),
        };
        let tokens = TokenSet {
            keywords: vec!["incident".to_string()],
            literals: vec!["FDD-12313".to_string()],
        };

        let rows = filter_values(&rt, &scope, &candidates, &tokens, 10);

        assert_eq!(rows.len(), 1, "RLS should hide the globex row: {rows:?}");
        assert_eq!(row_text(&rows[0].entity, "tenant_id"), Some("acme"));
    }
1616
1617 #[test]
1620 fn execute_refuses_empty_token_set() {
1621 let rt = fresh_runtime();
1622 let scope = make_scope(HashSet::new());
1623 let err = AskPipeline::execute(&rt, &scope, "??? ...")
1624 .expect_err("empty token set must short-circuit");
1625 let msg = format!("{err}");
1626 assert!(
1627 msg.contains("yielded no usable tokens"),
1628 "expected structured empty-token error, got: {msg}"
1629 );
1630 }
1631
    /// Stage 2 (`match_schema`) must intersect vocabulary matches with the
    /// caller's visible collections: "passport" matches both "travel" and
    /// "secrets", but only the visible "travel" may be returned.
    #[test]
    fn match_schema_intersects_with_visible_set() {
        let rt = fresh_runtime();
        rt.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: "travel".to_string(),
                columns: vec!["id".into(), "passport".into()],
                type_tags: Vec::new(),
                description: None,
            },
        );
        // Second collection with the same matching column, but NOT visible.
        rt.schema_vocabulary_apply(
            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
                collection: "secrets".to_string(),
                columns: vec!["passport".into()],
                type_tags: Vec::new(),
                description: None,
            },
        );
        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible.clone());
        let tokens = TokenSet {
            keywords: vec!["passport".to_string()],
            literals: Vec::new(),
        };
        let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
        assert_eq!(candidates.collections, vec!["travel".to_string()]);
        assert!(!candidates.collections.contains(&"secrets".to_string()));
        // The matched column should be surfaced as a hint for later stages.
        let cols = candidates
            .columns_by_collection
            .get("travel")
            .expect("hint columns");
        assert!(cols.contains(&"passport".to_string()));
    }
1674
1675 use proptest::prelude::*;
1683
    /// Proptest strategy: short lowercase collection names (1-4 chars), via
    /// proptest's regex-as-String-strategy support.
    fn arb_collection() -> impl Strategy<Value = String> {
        "[a-z]{1,4}"
    }
1687
    /// Proptest strategy: a visible-collection set of 0-5 random names.
    fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
        prop::collection::hash_set(arb_collection(), 0..6)
    }
1691
    /// Proptest strategy: a candidate-collection list of 0-7 random names
    /// (may overlap or disjoin from the visible set — that's the point).
    fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
        prop::collection::vec(arb_collection(), 0..8)
    }
1695
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(256))]
        // Property: Stage 4 (`filter_values`) never returns a row whose
        // collection lies outside the visible scope, for ANY combination of
        // visible set, candidate list, and literal count.
        #[test]
        fn stage4_rows_subset_of_visible_collections(
            visible in arb_visible(),
            candidate_names in arb_candidates(),
            literal_count in 0usize..3,
        ) {
            // Reuse a single runtime across all 256 cases; booting one per
            // case would dominate the test's runtime.
            let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
            let candidates = CandidateCollections {
                collections: candidate_names,
                columns_by_collection: HashMap::new(),
            };
            // Synthetic literals "ID-0", "ID-1", ...
            let literals: Vec<String> = (0..literal_count)
                .map(|i| format!("ID-{i}"))
                .collect();
            let tokens = TokenSet {
                keywords: vec!["passport".to_string()],
                literals,
            };
            let scope = make_scope(visible.clone());
            let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
            for row in &rows {
                prop_assert!(
                    visible.contains(&row.collection),
                    "Stage 4 leaked row collection={} not in visible={:?}",
                    row.collection, visible
                );
            }
        }
    }
1732
    // Lazily-initialized runtime shared by all proptest cases above, so the
    // (comparatively expensive) runtime boot happens only once.
    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();
1734
    /// End-to-end funnel: a natural-language question (in Portuguese, with an
    /// English keyword and a literal) must flow through all four stages —
    /// token extraction, schema matching, vector search, value filtering —
    /// and never leak the invisible "secrets" collection.
    #[test]
    fn integration_passport_fdd_12313_funnels_through_four_stages() {
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
            .expect("CREATE TABLE travel");
        // Only row id=2 contains the target literal.
        rt.execute_query(
            "INSERT INTO travel (id, passport, notes) VALUES \
             (1, 'BR-001', 'unrelated note'), \
             (2, 'PT-002', 'incident FDD-12313 escalated'), \
             (3, 'US-003', 'standard renewal')",
        )
        .expect("seed rows");
        // Honeypot table: holds the same literal but is NOT in the visible set.
        rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
            .expect("CREATE TABLE secrets");
        rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
            .expect("seed secrets");

        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible);

        let ctx = AskPipeline::execute(
            &rt,
            &scope,
            "quais as novidades sobre o passport FDD-12313?",
        )
        .expect("pipeline runs");

        // Stage 1: keyword and literal extracted from the mixed-language query.
        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));

        // Stage 2: only the visible collection is a candidate.
        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);

        // Stage 3: vector hits are not asserted here, only that the field exists.
        let _ = &ctx.vector_hits;

        // Stage 4: the literal-bearing travel row is found...
        assert!(
            ctx.filtered_rows
                .iter()
                .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
            "expected travel row with FDD-12313 match, got: {:?}",
            ctx.filtered_rows
        );
        // ...and the invisible "secrets" table never leaks.
        for row in &ctx.filtered_rows {
            assert_ne!(
                row.collection, "secrets",
                "secrets row leaked into Stage 4 output"
            );
        }

        // Touch the stage timings so the fields stay exercised by this test.
        let _ = ctx.timings.extract_us
            + ctx.timings.schema_us
            + ctx.timings.vector_us
            + ctx.timings.filter_us;
    }
1816
1817 fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
1831 let store = rt.inner.db.store();
1832 store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1833 }
1834
1835 #[test]
1838 fn routed_default_backend_runs_heuristic() {
1839 let rt = fresh_runtime();
1840 let scope = make_scope(HashSet::new());
1841 let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1842 .expect("heuristic path is infallible");
1843 assert!(tokens.keywords.contains(&"passport".to_string()));
1844 assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1845 }
1846
    /// LLM backend with `use_heuristic` fallback: when the LLM path cannot be
    /// used, routed extraction still succeeds via the heuristic tokens.
    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_uses_heuristic_fallback() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "use_heuristic");
        let scope = make_scope(HashSet::new());
        // extract_tokens_routed is a sync call; run it on the blocking pool so
        // it does not stall the tokio worker threads.
        let tokens = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("fallback policy keeps the call OK");
        // Heuristic output still carries the keyword and the literal.
        assert!(tokens.keywords.contains(&"passport".to_string()));
        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
    }
1864
    /// LLM backend with `empty_on_fail` fallback: a failed LLM path yields
    /// `Ok` with an empty TokenSet rather than an error.
    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_empty_on_fail() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "empty_on_fail");
        let scope = make_scope(HashSet::new());
        // Sync extraction runs on the blocking pool (see the fallback test above
        // for the same pattern).
        let tokens = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("empty_on_fail returns Ok with empty TokenSet");
        assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
    }
1881
    /// LLM backend with `propagate` fallback: a failed LLM path must surface
    /// the underlying error to the caller.
    #[tokio::test(flavor = "multi_thread")]
    async fn routed_llm_auth_denied_propagate_returns_error() {
        let rt = fresh_runtime();
        write_config(&rt, "ai.ner.backend", "llm");
        write_config(&rt, "ai.ner.fallback", "propagate");
        let scope = make_scope(HashSet::new());
        let err = tokio::task::spawn_blocking(move || {
            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect_err("propagate must surface the error");
        let msg = format!("{err}");
        // Loose match: the error should mention either the policy or the
        // config key that routed us into the LLM path.
        assert!(
            msg.contains("propagate") || msg.contains("ai.ner.backend"),
            "expected propagate error message, got: {msg}"
        );
    }
1903
    /// Full pipeline with the LLM backend configured but unusable: Stage 1
    /// falls back (default fallback policy), and Stages 2-4 still complete.
    #[tokio::test(flavor = "multi_thread")]
    async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
        let rt = fresh_runtime();
        // Backend set to llm, but no fallback key — relies on the default
        // fallback behavior keeping the pipeline alive.
        write_config(&rt, "ai.ner.backend", "llm");
        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
            .expect("CREATE TABLE travel");
        rt.execute_query(
            "INSERT INTO travel (id, passport, notes) VALUES \
             (2, 'PT-002', 'incident FDD-12313 escalated')",
        )
        .expect("seed rows");
        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
        let scope = make_scope(visible);
        // Pipeline execution is sync; run it off the async workers.
        let ctx = tokio::task::spawn_blocking(move || {
            AskPipeline::execute(&rt, &scope, "passport FDD-12313")
        })
        .await
        .unwrap()
        .expect("pipeline runs");
        // Stage 1 (fallback) output.
        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
        // Stage 2 candidates.
        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
        // Stage 4 still finds the literal-bearing row.
        assert!(
            ctx.filtered_rows
                .iter()
                .any(|r| r.matched_literal == "FDD-12313"),
            "Stage 4 still runs after Stage 1 fallback"
        );
    }
1938}