1use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40 AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::application::SearchContextInput;
46use crate::storage::schema::Value;
47use crate::storage::unified::entity::{EntityData, EntityKind, UnifiedEntity};
48
/// Default per-stage row cap, used by [`AskPipeline::execute`] and as the
/// `source_limit` of [`AskContext::default`].
pub const DEFAULT_ROW_CAP: usize = 20;

/// Tokens extracted from an ASK question.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TokenSet {
    /// Keywords matched against the schema vocabulary (`extract_tokens`
    /// lowercases and deduplicates them).
    pub keywords: Vec<String>,
    /// Literal values searched for inside row fields by `filter_values`
    /// (e.g. `FDD-12313`).
    pub literals: Vec<String>,
}

impl TokenSet {
    /// Returns `true` when no keyword and no literal was extracted.
    pub fn is_empty(&self) -> bool {
        let has_keywords = !self.keywords.is_empty();
        let has_literals = !self.literals.is_empty();
        !(has_keywords || has_literals)
    }
}
70
/// Collections (and, where known, columns) whose schema vocabulary matched a
/// question keyword; produced by [`match_schema`].
#[derive(Debug, Clone, Default)]
pub struct CandidateCollections {
    /// Matched collection names. `match_schema` emits them sorted and
    /// deduplicated (it collects through a `BTreeSet`).
    pub collections: Vec<String>,
    /// Matched column names per collection, for vocabulary hits that named a
    /// column. `filter_values` inspects these columns first.
    pub columns_by_collection: HashMap<String, Vec<String>>,
}

/// A BM25 full-text hit returned by [`text_search_bm25_scoped`].
#[derive(Debug, Clone)]
pub struct TextHit {
    /// Collection holding the matched entity.
    pub collection: String,
    /// Raw numeric id of the matched entity.
    pub entity_id: u64,
    /// BM25 score reported by the context index.
    pub score: f32,
}

/// An embedding-similarity hit returned by [`vector_search_scoped`].
#[derive(Debug, Clone)]
pub struct VectorHit {
    /// Collection holding the matched entity.
    pub collection: String,
    /// Raw numeric id of the matched entity.
    pub entity_id: u64,
    /// Similarity score reported by `AuthorizedSearch::execute_similar`.
    pub score: f32,
}

/// A graph node or edge reached by traversal in [`graph_search_scoped`].
#[derive(Debug, Clone)]
pub struct GraphHit {
    /// Collection holding the graph entity.
    pub collection: String,
    /// Raw numeric id of the graph entity.
    pub entity_id: u64,
    /// Score reported by the context search.
    pub score: f32,
    /// Traversal depth reported by `DiscoveryMethod::GraphTraversal`.
    pub depth: usize,
    /// Whether the entity is a node or an edge.
    pub kind: GraphHitKind,
}

/// Distinguishes graph nodes from graph edges in a [`GraphHit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GraphHitKind {
    Node,
    Edge,
}

/// A row whose field values contain one of the question's literals;
/// produced by [`filter_values`].
#[derive(Debug, Clone)]
pub struct FilteredRow {
    /// Collection the row belongs to.
    pub collection: String,
    /// The matching row entity itself.
    pub entity: UnifiedEntity,
    /// The literal that was found in the row.
    pub matched_literal: String,
    /// Column in which the literal was found (the current matcher always
    /// reports `Some`).
    pub matched_column: Option<String>,
}
126
/// Wall-clock duration of each pipeline stage, in microseconds.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct StageTimings {
    /// Stage 1: token extraction.
    pub extract_us: u64,
    /// Stage 2: schema vocabulary matching.
    pub schema_us: u64,
    /// Stage 3: BM25 text search.
    pub text_us: u64,
    /// Stage 3b: vector search (includes embedding the question).
    pub vector_us: u64,
    /// Stage 3c: graph context search.
    pub graph_us: u64,
    /// Stage 4: literal value filtering.
    pub filter_us: u64,
}

/// Everything the ASK pipeline gathered for one question.
#[derive(Debug, Clone)]
pub struct AskContext {
    /// The question text as received.
    pub question: String,
    /// Tokens extracted in stage 1.
    pub tokens: TokenSet,
    /// Collections/columns matched in stage 2.
    pub candidates: CandidateCollections,
    /// BM25 hits from stage 3.
    pub text_hits: Vec<TextHit>,
    /// Vector-similarity hits from stage 3b.
    pub vector_hits: Vec<VectorHit>,
    /// Graph traversal hits from stage 3c.
    pub graph_hits: Vec<GraphHit>,
    /// Literal-matching rows from stage 4.
    pub filtered_rows: Vec<FilteredRow>,
    /// Row cap the pipeline ran with; `fused_sources` also uses it as the
    /// total number of fused sources to return (0 yields none).
    pub source_limit: usize,
    /// Per-stage timings.
    pub timings: StageTimings,
}
153
154impl Default for AskContext {
155 fn default() -> Self {
156 Self {
157 question: String::new(),
158 tokens: TokenSet::default(),
159 candidates: CandidateCollections::default(),
160 text_hits: Vec::new(),
161 vector_hits: Vec::new(),
162 graph_hits: Vec::new(),
163 filtered_rows: Vec::new(),
164 source_limit: DEFAULT_ROW_CAP,
165 timings: StageTimings::default(),
166 }
167 }
168}
169
/// Points at one entry of an [`AskContext`] by bucket and index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FusedSourceRef {
    /// Index into `AskContext::filtered_rows`.
    FilteredRow(usize),
    /// Index into `AskContext::text_hits`.
    TextHit(usize),
    /// Index into `AskContext::vector_hits`.
    VectorHit(usize),
    /// Index into `AskContext::graph_hits`.
    GraphHit(usize),
}

/// One fused source together with the RRF score it received.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct FusedSource {
    /// Which context entry this is.
    pub source: FusedSourceRef,
    /// Score returned by `rrf_fuser::fuse` for this source.
    pub rrf_score: f64,
}

/// Uninhabited type (no variants, cannot be constructed) used only as a
/// namespace for the ASK pipeline entry points.
pub enum AskPipeline {}
190
191impl AskPipeline {
192 pub fn execute(
194 runtime: &RedDBRuntime,
195 scope: &EffectiveScope,
196 question: &str,
197 ) -> RedDBResult<AskContext> {
198 Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
199 }
200
201 pub fn execute_with_limit(
203 runtime: &RedDBRuntime,
204 scope: &EffectiveScope,
205 question: &str,
206 row_cap: usize,
207 ) -> RedDBResult<AskContext> {
208 Self::execute_with_limit_and_min_score(runtime, scope, question, row_cap, None, None)
209 }
210
211 pub fn execute_with_limit_and_min_score(
214 runtime: &RedDBRuntime,
215 scope: &EffectiveScope,
216 question: &str,
217 row_cap: usize,
218 min_score: Option<f32>,
219 graph_depth: Option<usize>,
220 ) -> RedDBResult<AskContext> {
221 let span = info_span!(
222 "ask_pipeline.execute",
223 tenant = ?scope.effective_scope(),
224 question_len = question.len(),
225 row_cap = row_cap,
226 min_score = ?min_score,
227 graph_depth = ?graph_depth,
228 );
229 let _enter = span.enter();
230
231 let stage1 = Instant::now();
236 let tokens = extract_tokens_routed(runtime, scope, question)?;
237 let extract_us = stage1.elapsed().as_micros() as u64;
238 debug!(
239 target: "ask_pipeline",
240 stage = "extract_tokens",
241 keywords = ?tokens.keywords,
242 literals = ?tokens.literals,
243 elapsed_us = extract_us,
244 "stage 1 done"
245 );
246 if tokens.is_empty() {
247 warn!(
248 target: "ask_pipeline",
249 question_len = question.len(),
250 "refused: empty token set"
251 );
252 return Err(RedDBError::Query(
253 "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
254 .to_string(),
255 ));
256 }
257
258 let stage2 = Instant::now();
260 let candidates = match_schema(runtime, scope, &tokens)?;
261 let schema_us = stage2.elapsed().as_micros() as u64;
262 debug!(
263 target: "ask_pipeline",
264 stage = "match_schema",
265 collections = ?candidates.collections,
266 elapsed_us = schema_us,
267 "stage 2 done"
268 );
269
270 let stage3 = Instant::now();
272 let text_hits = text_search_bm25_scoped(runtime, scope, question, &candidates, row_cap);
273 let text_us = stage3.elapsed().as_micros() as u64;
274 debug!(
275 target: "ask_pipeline",
276 stage = "text_search_bm25_scoped",
277 hits = text_hits.len(),
278 elapsed_us = text_us,
279 "stage 3 done"
280 );
281
282 let stage3b = Instant::now();
284 let vector_hits =
285 vector_search_scoped(runtime, scope, question, &candidates, row_cap, min_score);
286 let vector_us = stage3b.elapsed().as_micros() as u64;
287 debug!(
288 target: "ask_pipeline",
289 stage = "vector_search_scoped",
290 hits = vector_hits.len(),
291 elapsed_us = vector_us,
292 "stage 3b done"
293 );
294
295 let stage3c = Instant::now();
297 let graph_hits = graph_search_scoped(
298 runtime,
299 scope,
300 question,
301 &candidates,
302 row_cap,
303 min_score,
304 graph_depth,
305 );
306 let graph_us = stage3c.elapsed().as_micros() as u64;
307 debug!(
308 target: "ask_pipeline",
309 stage = "graph_search_scoped",
310 hits = graph_hits.len(),
311 elapsed_us = graph_us,
312 "stage 3c done"
313 );
314
315 let stage4 = Instant::now();
317 let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
318 let filter_us = stage4.elapsed().as_micros() as u64;
319 debug!(
320 target: "ask_pipeline",
321 stage = "filter_values",
322 rows = filtered_rows.len(),
323 elapsed_us = filter_us,
324 "stage 4 done"
325 );
326
327 Ok(AskContext {
328 question: question.to_string(),
329 tokens,
330 candidates,
331 text_hits,
332 vector_hits,
333 graph_hits,
334 filtered_rows,
335 source_limit: row_cap,
336 timings: StageTimings {
337 extract_us,
338 schema_us,
339 text_us,
340 vector_us,
341 graph_us,
342 filter_us,
343 },
344 })
345 }
346}
347
348pub fn fused_source_order(ctx: &AskContext) -> Vec<FusedSourceRef> {
351 fused_sources(ctx)
352 .into_iter()
353 .map(|fused| fused.source)
354 .collect()
355}
356
357pub fn fused_sources(ctx: &AskContext) -> Vec<FusedSource> {
360 use super::ai::rrf_fuser::{fuse, Bucket, Candidate, RRF_K_DEFAULT};
361
362 if ctx.source_limit == 0
363 || (ctx.filtered_rows.is_empty()
364 && ctx.text_hits.is_empty()
365 && ctx.vector_hits.is_empty()
366 && ctx.graph_hits.is_empty())
367 {
368 return Vec::new();
369 }
370
371 let mut refs: HashMap<String, FusedSourceRef> = HashMap::new();
372 let row_bucket = Bucket {
373 candidates: ctx
374 .filtered_rows
375 .iter()
376 .enumerate()
377 .map(|(idx, row)| {
378 let id = source_identity(&row.collection, row.entity.id.raw());
379 refs.entry(id.clone())
380 .or_insert(FusedSourceRef::FilteredRow(idx));
381 Candidate { id, score: 1.0 }
382 })
383 .collect(),
384 min_score: None,
385 };
386 let text_bucket = Bucket {
387 candidates: ctx
388 .text_hits
389 .iter()
390 .enumerate()
391 .map(|(idx, hit)| {
392 let id = source_identity(&hit.collection, hit.entity_id);
393 refs.entry(id.clone())
394 .or_insert(FusedSourceRef::TextHit(idx));
395 Candidate {
396 id,
397 score: hit.score as f64,
398 }
399 })
400 .collect(),
401 min_score: None,
402 };
403 let vector_bucket = Bucket {
404 candidates: ctx
405 .vector_hits
406 .iter()
407 .enumerate()
408 .map(|(idx, hit)| {
409 let id = source_identity(&hit.collection, hit.entity_id);
410 refs.entry(id.clone())
411 .or_insert(FusedSourceRef::VectorHit(idx));
412 Candidate {
413 id,
414 score: hit.score as f64,
415 }
416 })
417 .collect(),
418 min_score: None,
419 };
420 let graph_bucket = Bucket {
421 candidates: ctx
422 .graph_hits
423 .iter()
424 .enumerate()
425 .map(|(idx, hit)| {
426 let id = source_identity(&hit.collection, hit.entity_id);
427 refs.entry(id.clone())
428 .or_insert(FusedSourceRef::GraphHit(idx));
429 Candidate {
430 id,
431 score: hit.score as f64,
432 }
433 })
434 .collect(),
435 min_score: None,
436 };
437
438 fuse(
439 &[row_bucket, text_bucket, vector_bucket, graph_bucket],
440 RRF_K_DEFAULT,
441 ctx.source_limit,
442 )
443 .into_iter()
444 .filter_map(|item| {
445 refs.get(&item.id).copied().map(|source| FusedSource {
446 source,
447 rrf_score: item.rrf_score,
448 })
449 })
450 .collect()
451}
452
/// Builds the cross-bucket identity key `"<collection>/<entity_id>"`.
fn source_identity(collection: &str, entity_id: u64) -> String {
    let id_text = entity_id.to_string();
    let mut key = String::with_capacity(collection.len() + 1 + id_text.len());
    key.push_str(collection);
    key.push('/');
    key.push_str(&id_text);
    key
}
456
457pub fn text_search_bm25_scoped(
465 runtime: &RedDBRuntime,
466 scope: &EffectiveScope,
467 question: &str,
468 candidates: &CandidateCollections,
469 top_k: usize,
470) -> Vec<TextHit> {
471 if candidates.collections.is_empty() || top_k == 0 {
472 return Vec::new();
473 }
474
475 let visible = scope.visible_collections();
476 let allowed: BTreeSet<String> = candidates
477 .collections
478 .iter()
479 .filter(|collection| visible.is_none_or(|set| set.contains(*collection)))
480 .cloned()
481 .collect();
482 if allowed.is_empty() {
483 return Vec::new();
484 }
485
486 let _scope_guard = AskScopeGuard::install(scope);
487 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
488 let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
489 let store = runtime.inner.db.store();
490
491 runtime
492 .inner
493 .db
494 .store()
495 .context_index()
496 .search_bm25(question, top_k, Some(&allowed))
497 .into_iter()
498 .filter_map(|hit| {
499 let entity = store.get(&hit.collection, hit.entity_id)?;
500 if !ask_entity_allowed(
501 runtime,
502 scope,
503 &hit.collection,
504 &entity,
505 snap_ctx.as_ref(),
506 &mut rls_cache,
507 ) {
508 return None;
509 }
510 Some(TextHit {
511 collection: hit.collection,
512 entity_id: hit.entity_id.raw(),
513 score: hit.score,
514 })
515 })
516 .collect()
517}
518
519fn extract_tokens_routed(
544 runtime: &RedDBRuntime,
545 scope: &EffectiveScope,
546 question: &str,
547) -> RedDBResult<TokenSet> {
548 let backend = runtime.config_string("ai.ner.backend", "heuristic");
549 if backend != "llm" {
550 return Ok(extract_tokens(question));
551 }
552
553 let endpoint = runtime.config_string("ai.ner.endpoint", "");
554 let model = runtime.config_string("ai.ner.model", "");
555 let timeout_ms = runtime
556 .config_string("ai.ner.timeout_ms", "5000")
557 .parse::<u32>()
558 .unwrap_or(5000);
559 let fallback = match runtime
560 .config_string("ai.ner.fallback", "use_heuristic")
561 .as_str()
562 {
563 "empty_on_fail" => HeuristicFallback::EmptyOnFail,
564 "propagate" => HeuristicFallback::Propagate,
565 _ => HeuristicFallback::UseHeuristic,
566 };
567
568 let provider = if endpoint.is_empty() && model.is_empty() {
571 NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
575 } else {
576 NerProvider::OpenAiCompat { endpoint, model }
577 };
578
579 let mut ner = LlmNer::new(provider, fallback);
580 ner.timeout_ms = timeout_ms;
581
582 let auth = ScopeAuthAdapter(scope);
583 let llm_result = match tokio::runtime::Handle::try_current() {
584 Ok(handle) => {
585 tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
590 }
591 Err(_) => {
592 warn!(
593 target: "ask_pipeline",
594 "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
595 );
596 return Ok(extract_tokens(question));
597 }
598 };
599
600 match llm_result {
601 Ok(tokens) => Ok(tokens),
602 Err(NerError::AuthDenied) => {
603 log_auth_denial_once();
604 apply_fallback(fallback, question)
609 }
610 Err(err) => {
611 warn!(
612 target: "ask_pipeline",
613 error = %err,
614 "LlmNer extract failed; honouring HeuristicFallback policy"
615 );
616 apply_fallback(fallback, question)
617 }
618 }
619}
620
621fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
622 match fallback {
623 HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
624 HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
625 HeuristicFallback::Propagate => Err(RedDBError::Query(
626 "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
627 )),
628 }
629}
630
631fn log_auth_denial_once() {
635 static EMITTED: AtomicBool = AtomicBool::new(false);
636 if !EMITTED.swap(true, Ordering::Relaxed) {
637 info!(
638 target: "ask_pipeline",
639 capability = NER_CAPABILITY,
640 "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
641 NER_CAPABILITY
642 );
643 }
644}
645
/// Adapts an [`EffectiveScope`] to the NER module's `AuthContext` trait, so
/// the LLM extractor's capability checks are answered by the caller's scope.
struct ScopeAuthAdapter<'a>(&'a EffectiveScope);

// Hand-written Debug: the wrapped scope is omitted and the output is
// `ScopeAuthAdapter { .. }` (via `finish_non_exhaustive`).
impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
    }
}
656
657impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
658 fn has_capability(&self, capability: &str) -> bool {
659 self.0.has_capability(capability)
660 }
661}
662
663pub fn extract_tokens(question: &str) -> TokenSet {
674 let mut keywords: Vec<String> = Vec::new();
675 let mut literals: Vec<String> = Vec::new();
676
677 let mut chars = question.chars().peekable();
678 let mut buf = String::new();
679
680 let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
681 if buf.is_empty() {
682 return;
683 }
684 let word = std::mem::take(buf);
685 classify_token(&word, keywords, literals);
686 };
687
688 while let Some(ch) = chars.next() {
689 if ch.is_alphanumeric() || ch == '_' || ch == '-' {
690 buf.push(ch);
691 } else {
692 flush(&mut buf, &mut keywords, &mut literals);
693 let _ = ch;
695 }
696 if chars.peek().is_none() {
698 flush(&mut buf, &mut keywords, &mut literals);
699 }
700 }
701 if !buf.is_empty() {
704 classify_token(&buf, &mut keywords, &mut literals);
705 }
706
707 let mut seen = HashSet::new();
709 keywords.retain(|tok| seen.insert(tok.clone()));
710 let mut seen_lit = HashSet::new();
711 literals.retain(|tok| seen_lit.insert(tok.clone()));
712
713 TokenSet { keywords, literals }
714}
715
/// Classifies one word from [`extract_tokens`] as a literal, a keyword, or noise.
///
/// A word is pushed verbatim (case preserved) into `literals` when it is
/// either of these:
/// - "upper-id shaped": at least 3 bytes, made only of ASCII digits, ASCII
///   uppercase letters and `-`, with at least one digit and at least one
///   uppercase letter or `-` (e.g. `FDD-12313`);
/// - a run of 6 or more ASCII digits (e.g. `987654321`).
///
/// Any other word is first trimmed of leading/trailing characters that are
/// neither ASCII alphanumeric nor `_`. It is lowercased and pushed into
/// `keywords` only if all of these hold:
/// - it is at least 2 bytes long;
/// - it starts with an ASCII letter;
/// - it contains only ASCII alphanumerics and `_`;
/// - it is not in [`STOP_WORDS`].
///
/// Words failing any check are dropped.
fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
    let is_upper_id_shape = word.len() >= 3
        && word
            .chars()
            .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
        && word.chars().any(|c| c.is_ascii_digit())
        && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
    let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
    if is_upper_id_shape || is_long_digit_run {
        literals.push(word.to_string());
        return;
    }

    let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
    let keyword_shaped = trimmed.len() >= 2
        && trimmed.starts_with(|c: char| c.is_ascii_alphabetic())
        && trimmed
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || c == '_');
    if !keyword_shaped {
        return;
    }

    let lower = trimmed.to_ascii_lowercase();
    // Valid only because STOP_WORDS is kept sorted (see its doc comment).
    if STOP_WORDS.binary_search(&lower.as_str()).is_err() {
        keywords.push(lower);
    }
}

/// Lowercase words (English plus some Portuguese) that never become keywords.
///
/// MUST stay sorted in byte order with no duplicates: `classify_token` tests
/// membership with `binary_search`, whose result is unspecified on an unsorted
/// slice. (Previously "que", "qual", "quais" were out of order and were not
/// reliably filtered.)
const STOP_WORDS: &[&str] = &[
    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
    "is", "it", "of", "on", "or", "quais", "qual", "que", "sobre", "te", "the", "to", "what",
    "where", "which", "with",
];
766
767pub fn match_schema(
776 runtime: &RedDBRuntime,
777 scope: &EffectiveScope,
778 tokens: &TokenSet,
779) -> RedDBResult<CandidateCollections> {
780 let visible = match scope.visible_collections() {
781 Some(set) => set.clone(),
782 None => {
783 runtime
788 .inner
789 .db
790 .store()
791 .list_collections()
792 .into_iter()
793 .collect()
794 }
795 };
796
797 let mut collections: BTreeSet<String> = BTreeSet::new();
798 let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
799 for keyword in &tokens.keywords {
800 let hits = runtime.schema_vocabulary_lookup(keyword);
801 for hit in hits {
802 if !visible.contains(&hit.collection) {
803 continue;
804 }
805 collections.insert(hit.collection.clone());
806 if let Some(column) = hit.column {
807 columns_by_collection
808 .entry(hit.collection)
809 .or_default()
810 .insert(column);
811 }
812 }
813 }
814
815 Ok(CandidateCollections {
816 collections: collections.into_iter().collect(),
817 columns_by_collection: columns_by_collection
818 .into_iter()
819 .map(|(k, v)| (k, v.into_iter().collect()))
820 .collect(),
821 })
822}
823
824pub fn vector_search_scoped(
833 runtime: &RedDBRuntime,
834 scope: &EffectiveScope,
835 question: &str,
836 candidates: &CandidateCollections,
837 top_k: usize,
838 min_score: Option<f32>,
839) -> Vec<VectorHit> {
840 if candidates.collections.is_empty() {
841 return Vec::new();
842 }
843 let Some(embedding) = embed_question(runtime, question) else {
844 return Vec::new();
845 };
846 let per_collection = top_k.max(1);
847 let mut hits: Vec<VectorHit> = Vec::new();
848 let _scope_guard = AskScopeGuard::install(scope);
849 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
850 let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
851 let store = runtime.inner.db.store();
852 for collection in &candidates.collections {
853 match super::authorized_search::AuthorizedSearch::execute_similar(
854 runtime,
855 scope,
856 collection,
857 &embedding,
858 per_collection,
859 min_score.unwrap_or(0.0),
860 ) {
861 Ok(results) => {
862 for result in results {
863 let Some(entity) = store.get(collection, result.entity_id) else {
864 continue;
865 };
866 if !ask_entity_allowed(
867 runtime,
868 scope,
869 collection,
870 &entity,
871 snap_ctx.as_ref(),
872 &mut rls_cache,
873 ) {
874 continue;
875 }
876 hits.push(VectorHit {
877 collection: collection.clone(),
878 entity_id: result.entity_id.raw(),
879 score: result.score,
880 });
881 }
882 }
883 Err(err) => {
884 debug!(
885 target: "ask_pipeline",
886 collection = collection,
887 err = %err,
888 "vector_search_scoped: collection skipped"
889 );
890 }
891 }
892 }
893 hits.sort_by(|a, b| {
894 b.score
895 .partial_cmp(&a.score)
896 .unwrap_or(std::cmp::Ordering::Equal)
897 .then_with(|| a.entity_id.cmp(&b.entity_id))
898 });
899 hits.truncate(top_k);
900 hits
901}
902
903pub fn graph_search_scoped(
911 runtime: &RedDBRuntime,
912 scope: &EffectiveScope,
913 question: &str,
914 candidates: &CandidateCollections,
915 top_k: usize,
916 min_score: Option<f32>,
917 graph_depth: Option<usize>,
918) -> Vec<GraphHit> {
919 if candidates.collections.is_empty() || top_k == 0 {
920 return Vec::new();
921 }
922 let depth = graph_depth
923 .unwrap_or(crate::runtime::ai::mcp_ask_tool::DEPTH_DEFAULT as usize)
924 .max(1);
925 let _scope_guard = AskScopeGuard::install(scope);
926 let input = SearchContextInput {
927 query: question.to_string(),
928 field: None,
929 vector: None,
930 collections: Some(candidates.collections.clone()),
931 graph_depth: Some(depth),
932 graph_max_edges: None,
933 max_cross_refs: Some(0),
934 follow_cross_refs: Some(false),
935 expand_graph: Some(true),
936 global_scan: Some(true),
937 reindex: Some(false),
938 limit: Some(top_k),
939 min_score,
940 };
941 let result = match if scope.visible_collections().is_some() {
942 super::authorized_search::AuthorizedSearch::execute_context(runtime, scope, input)
943 } else {
944 runtime.search_context(input)
945 } {
946 Ok(result) => result,
947 Err(err) => {
948 debug!(
949 target: "ask_pipeline",
950 err = %err,
951 "graph_search_scoped: context search skipped"
952 );
953 return Vec::new();
954 }
955 };
956
957 let mut hits = Vec::new();
958 for entity in result.graph.nodes.into_iter().chain(result.graph.edges) {
959 let crate::runtime::DiscoveryMethod::GraphTraversal { depth, .. } = entity.discovery else {
960 continue;
961 };
962 let kind = match entity.entity.kind {
963 EntityKind::GraphNode(_) => GraphHitKind::Node,
964 EntityKind::GraphEdge(_) => GraphHitKind::Edge,
965 _ => continue,
966 };
967 hits.push(GraphHit {
968 collection: entity.collection,
969 entity_id: entity.entity.id.raw(),
970 score: entity.score,
971 depth,
972 kind,
973 });
974 }
975 hits.sort_by(|a, b| {
976 b.score
977 .partial_cmp(&a.score)
978 .unwrap_or(std::cmp::Ordering::Equal)
979 .then_with(|| a.depth.cmp(&b.depth))
980 .then_with(|| a.collection.cmp(&b.collection))
981 .then_with(|| a.entity_id.cmp(&b.entity_id))
982 });
983 hits.truncate(top_k);
984 hits
985}
986
987fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
991 let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
992 match runtime.inner.db.get_kv("red_config", key) {
993 Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
994 Some(_) => Ok(None),
995 None => Ok(None),
996 }
997 };
998 let provider = crate::ai::resolve_default_provider(&kv_getter);
999 if !provider.is_openai_compatible() {
1000 return None;
1001 }
1002 let model = crate::ai::resolve_default_model(&provider, &kv_getter);
1003 let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
1004 let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
1005 let request = crate::ai::OpenAiEmbeddingRequest {
1006 api_key,
1007 model,
1008 inputs: vec![question.to_string()],
1009 dimensions: None,
1010 api_base: provider.resolve_api_base(),
1011 };
1012 let response = crate::runtime::ai::block_on_ai(async move {
1013 crate::ai::openai_embeddings_async(&transport, request).await
1014 })
1015 .and_then(|result| result)
1016 .ok()?;
1017 response.embeddings.into_iter().next()
1018}
1019
1020pub fn filter_values(
1029 runtime: &RedDBRuntime,
1030 scope: &EffectiveScope,
1031 candidates: &CandidateCollections,
1032 tokens: &TokenSet,
1033 row_cap: usize,
1034) -> Vec<FilteredRow> {
1035 if tokens.literals.is_empty() || candidates.collections.is_empty() {
1036 return Vec::new();
1037 }
1038 let visible = scope.visible_collections();
1039 let store = runtime.inner.db.store();
1040 let mut out: Vec<FilteredRow> = Vec::new();
1041 let _scope_guard = AskScopeGuard::install(scope);
1042 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1043 let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
1044
1045 'collection: for collection in &candidates.collections {
1046 if let Some(set) = visible {
1050 if !set.contains(collection) {
1051 continue;
1052 }
1053 }
1054 let Some(manager) = store.get_collection(collection) else {
1055 continue;
1056 };
1057 let hint_columns: &[String] = candidates
1058 .columns_by_collection
1059 .get(collection)
1060 .map(|v| v.as_slice())
1061 .unwrap_or(&[]);
1062
1063 for entity in manager.query_all(|_| true) {
1064 if !ask_entity_allowed(
1065 runtime,
1066 scope,
1067 collection,
1068 &entity,
1069 snap_ctx.as_ref(),
1070 &mut rls_cache,
1071 ) {
1072 continue;
1073 }
1074 if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
1075 out.push(FilteredRow {
1076 collection: collection.clone(),
1077 entity,
1078 matched_literal: hit.0,
1079 matched_column: hit.1,
1080 });
1081 if out.len() >= row_cap {
1082 break 'collection;
1083 }
1084 }
1085 }
1086 }
1087 out
1088}
1089
1090fn ask_entity_allowed(
1091 runtime: &RedDBRuntime,
1092 scope: &EffectiveScope,
1093 collection: &str,
1094 entity: &UnifiedEntity,
1095 snap_ctx: Option<&crate::runtime::impl_core::SnapshotContext>,
1096 rls_cache: &mut HashMap<String, Option<crate::storage::query::ast::Filter>>,
1097) -> bool {
1098 if scope
1099 .visible_collections()
1100 .is_some_and(|visible| !visible.contains(collection))
1101 {
1102 return false;
1103 }
1104 runtime.search_entity_allowed(collection, entity, snap_ctx, rls_cache)
1105}
1106
1107struct AskScopeGuard {
1108 prev_tenant: Option<String>,
1109 prev_auth: Option<(String, crate::auth::Role)>,
1110}
1111
1112impl AskScopeGuard {
1113 fn install(scope: &EffectiveScope) -> Self {
1114 let prev_tenant = crate::runtime::impl_core::current_tenant();
1115 let prev_auth = crate::runtime::impl_core::current_auth_identity();
1116
1117 match scope.effective_scope() {
1118 Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant.to_string()),
1119 None => crate::runtime::impl_core::clear_current_tenant(),
1120 }
1121 match scope.identity() {
1122 Some((user, role)) => {
1123 crate::runtime::impl_core::set_current_auth_identity(user.to_string(), role)
1124 }
1125 None => crate::runtime::impl_core::clear_current_auth_identity(),
1126 }
1127
1128 Self {
1129 prev_tenant,
1130 prev_auth,
1131 }
1132 }
1133}
1134
1135impl Drop for AskScopeGuard {
1136 fn drop(&mut self) {
1137 match self.prev_tenant.take() {
1138 Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant),
1139 None => crate::runtime::impl_core::clear_current_tenant(),
1140 }
1141 match self.prev_auth.take() {
1142 Some((user, role)) => crate::runtime::impl_core::set_current_auth_identity(user, role),
1143 None => crate::runtime::impl_core::clear_current_auth_identity(),
1144 }
1145 }
1146}
1147
1148fn literal_match_in_entity(
1151 entity: &UnifiedEntity,
1152 literals: &[String],
1153 hint_columns: &[String],
1154) -> Option<(String, Option<String>)> {
1155 let row = match &entity.data {
1156 EntityData::Row(row) => row,
1157 _ => return None,
1158 };
1159
1160 for column in hint_columns {
1162 if let Some(value) = row.get_field(column) {
1163 if let Some(lit) = first_literal_in_value(value, literals) {
1164 return Some((lit, Some(column.clone())));
1165 }
1166 }
1167 }
1168 for (name, value) in row.iter_fields() {
1170 if hint_columns.iter().any(|c| c == name) {
1171 continue;
1172 }
1173 if let Some(lit) = first_literal_in_value(value, literals) {
1174 return Some((lit, Some(name.to_string())));
1175 }
1176 }
1177 None
1178}
1179
1180fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
1181 let rendered = match value {
1182 Value::Text(s) => s.to_string(),
1183 Value::Integer(i) => i.to_string(),
1184 Value::Float(f) => f.to_string(),
1185 Value::Boolean(b) => b.to_string(),
1186 Value::Json(j) => String::from_utf8_lossy(j).to_string(),
1187 _ => return None,
1188 };
1189 for lit in literals {
1190 if rendered.contains(lit) {
1194 return Some(lit.clone());
1195 }
1196 }
1197 None
1198}
1199
1200#[cfg(test)]
1201mod tests {
1202 use super::*;
1203
1204 #[test]
1207 fn extract_tokens_splits_keywords_and_literals() {
1208 let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
1209 assert!(tokens.keywords.contains(&"novidades".to_string()));
1212 assert!(tokens.keywords.contains(&"passport".to_string()));
1213 assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1214 assert!(!tokens.is_empty());
1215 }
1216
1217 #[test]
1218 fn extract_tokens_returns_empty_for_punctuation_only() {
1219 let tokens = extract_tokens("??? ...");
1220 assert!(tokens.is_empty());
1221 }
1222
1223 #[test]
1224 fn extract_tokens_long_digit_run_is_a_literal() {
1225 let tokens = extract_tokens("show order 987654321 details");
1226 assert!(tokens.literals.contains(&"987654321".to_string()));
1227 assert!(tokens.keywords.contains(&"order".to_string()));
1228 assert!(tokens.keywords.contains(&"details".to_string()));
1229 assert!(tokens.keywords.contains(&"show".to_string()));
1230 }
1231
1232 #[test]
1233 fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
1234 let tokens = extract_tokens("USA exports report");
1237 assert!(tokens.keywords.contains(&"usa".to_string()));
1238 assert!(tokens.literals.is_empty());
1239 }
1240
1241 #[test]
1242 fn extract_tokens_dedups() {
1243 let tokens = extract_tokens("passport passport FDD-1 FDD-1");
1244 assert_eq!(
1245 tokens.keywords.iter().filter(|k| *k == "passport").count(),
1246 1
1247 );
1248 assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
1249 }
1250
1251 #[test]
1254 fn first_literal_in_value_substring_match() {
1255 let lit = first_literal_in_value(
1256 &Value::text("issue FDD-12313 reported by user"),
1257 &["FDD-12313".to_string()],
1258 );
1259 assert_eq!(lit.as_deref(), Some("FDD-12313"));
1260 }
1261
1262 #[test]
1263 fn first_literal_in_value_no_match_returns_none() {
1264 assert!(
1265 first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
1266 .is_none()
1267 );
1268 }
1269
1270 use crate::api::RedDBOptions;
1273 use crate::auth::Role;
1274 use crate::runtime::statement_frame::EffectiveScope;
1275 use crate::runtime::RedDBRuntime;
1276 use crate::storage::schema::Value;
1277 use crate::storage::transaction::snapshot::Snapshot;
1278 use crate::storage::unified::entity::{
1279 EntityData, EntityId, EntityKind, RowData, UnifiedEntity,
1280 };
1281 use std::sync::Arc;
1282
1283 fn make_scope(visible: HashSet<String>) -> EffectiveScope {
1284 EffectiveScope {
1285 tenant: Some("acme".to_string()),
1286 identity: Some(("alice".to_string(), Role::Read)),
1287 snapshot: Snapshot {
1288 xid: 0,
1289 in_progress: HashSet::new(),
1290 },
1291 visible_collections: Some(visible),
1292 }
1293 }
1294
1295 fn fresh_runtime() -> RedDBRuntime {
1296 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
1297 }
1298
1299 fn test_row(collection: &str, id: u64) -> FilteredRow {
1300 FilteredRow {
1301 collection: collection.to_string(),
1302 entity: UnifiedEntity::new(
1303 EntityId::new(id),
1304 EntityKind::TableRow {
1305 table: Arc::from(collection),
1306 row_id: id,
1307 },
1308 EntityData::Row(RowData {
1309 columns: Vec::new(),
1310 named: Some(
1311 [("body".to_string(), Value::text("ticket FDD-1".to_string()))]
1312 .into_iter()
1313 .collect(),
1314 ),
1315 schema: None,
1316 }),
1317 ),
1318 matched_literal: "FDD-1".to_string(),
1319 matched_column: Some("body".to_string()),
1320 }
1321 }
1322
1323 fn test_graph_hit(collection: &str, id: u64, score: f32, depth: usize) -> GraphHit {
1324 GraphHit {
1325 collection: collection.to_string(),
1326 entity_id: id,
1327 score,
1328 depth,
1329 kind: GraphHitKind::Node,
1330 }
1331 }
1332
1333 fn test_text_hit(collection: &str, id: u64, score: f32) -> TextHit {
1334 TextHit {
1335 collection: collection.to_string(),
1336 entity_id: id,
1337 score,
1338 }
1339 }
1340
1341 struct TenantGuard;
1342
1343 impl TenantGuard {
1344 fn set(tenant: &str) -> Self {
1345 crate::runtime::impl_core::set_current_tenant(tenant.to_string());
1346 Self
1347 }
1348 }
1349
1350 impl Drop for TenantGuard {
1351 fn drop(&mut self) {
1352 crate::runtime::impl_core::clear_current_tenant();
1353 }
1354 }
1355
1356 fn row_text<'a>(entity: &'a UnifiedEntity, field: &str) -> Option<&'a str> {
1357 let row = entity.data.as_row()?;
1358 match row.get_field(field)? {
1359 Value::Text(value) => Some(value.as_ref()),
1360 _ => None,
1361 }
1362 }
1363
1364 #[test]
1365 fn fused_source_order_uses_rrf_and_total_limit() {
1366 let ctx = AskContext {
1367 source_limit: 2,
1368 filtered_rows: vec![test_row("incidents", 2), test_row("incidents", 1)],
1369 vector_hits: vec![
1370 VectorHit {
1371 collection: "incidents".to_string(),
1372 entity_id: 1,
1373 score: 0.91,
1374 },
1375 VectorHit {
1376 collection: "docs".to_string(),
1377 entity_id: 9,
1378 score: 0.88,
1379 },
1380 ],
1381 ..AskContext::default()
1382 };
1383
1384 let order = fused_source_order(&ctx);
1385
1386 assert_eq!(
1387 order,
1388 vec![
1389 FusedSourceRef::FilteredRow(1),
1390 FusedSourceRef::FilteredRow(0)
1391 ]
1392 );
1393 }
1394
1395 #[test]
1396 fn fused_source_order_includes_graph_bucket() {
1397 let ctx = AskContext {
1398 source_limit: 4,
1399 filtered_rows: vec![test_row("incidents", 1)],
1400 text_hits: vec![test_text_hit("articles", 5, 1.2)],
1401 vector_hits: vec![
1402 VectorHit {
1403 collection: "incidents".to_string(),
1404 entity_id: 1,
1405 score: 0.91,
1406 },
1407 VectorHit {
1408 collection: "docs".to_string(),
1409 entity_id: 9,
1410 score: 0.88,
1411 },
1412 ],
1413 graph_hits: vec![test_graph_hit("topology", 7, 0.80, 1)],
1414 ..AskContext::default()
1415 };
1416
1417 let order = fused_source_order(&ctx);
1418
1419 assert_eq!(
1420 order,
1421 vec![
1422 FusedSourceRef::FilteredRow(0),
1423 FusedSourceRef::TextHit(0),
1424 FusedSourceRef::GraphHit(0),
1425 FusedSourceRef::VectorHit(1),
1426 ]
1427 );
1428 }
1429
1430 #[test]
1431 fn text_search_bm25_scoped_ranks_specific_document_first() {
1432 let rt = fresh_runtime();
1433 rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1434 .expect("create docs");
1435 rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1436 .expect("insert specific doc");
1437 rt.execute_query(
1438 "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1439 )
1440 .expect("insert broad doc");
1441
1442 let scope = make_scope(["docs".to_string()].into_iter().collect());
1443 let candidates = CandidateCollections {
1444 collections: vec!["docs".to_string()],
1445 columns_by_collection: HashMap::new(),
1446 };
1447 let hits = text_search_bm25_scoped(&rt, &scope, "passport renewal", &candidates, 10);
1448
1449 assert_eq!(hits.len(), 2);
1450 assert!(
1451 hits[0].score > hits[1].score,
1452 "BM25 text bucket should prefer the shorter exact match: {hits:?}"
1453 );
1454 }
1455
1456 #[test]
1457 fn text_search_bm25_scoped_filters_rls_denied_hits() {
1458 let rt = fresh_runtime();
1459 rt.execute_query(
1460 "CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT) WITH CONTEXT INDEX ON (body)",
1461 )
1462 .expect("create docs");
1463 rt.execute_query(
1464 "INSERT INTO docs (id, tenant_id, body) VALUES \
1465 (1, 'acme', 'shared launch plan'), \
1466 (2, 'globex', 'shared launch plan')",
1467 )
1468 .expect("seed docs");
1469 rt.execute_query(
1470 "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1471 )
1472 .expect("create policy");
1473 rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1474 .expect("enable rls");
1475
1476 let _tenant = TenantGuard::set("acme");
1477 let scope = make_scope(["docs".to_string()].into_iter().collect());
1478 let candidates = CandidateCollections {
1479 collections: vec!["docs".to_string()],
1480 columns_by_collection: HashMap::new(),
1481 };
1482
1483 let hits = text_search_bm25_scoped(&rt, &scope, "shared launch", &candidates, 10);
1484
1485 assert_eq!(hits.len(), 1, "RLS should hide the globex hit: {hits:?}");
1486 let entity = rt
1487 .inner
1488 .db
1489 .store()
1490 .get(
1491 "docs",
1492 crate::storage::unified::entity::EntityId::new(hits[0].entity_id),
1493 )
1494 .expect("hit entity exists");
1495 assert_eq!(row_text(&entity, "tenant_id"), Some("acme"));
1496 }
1497
1498 #[test]
1499 fn execute_pipeline_retrieves_known_good_bm25_source_order() {
1500 let rt = fresh_runtime();
1501 rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1502 .expect("create docs");
1503 rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1504 .expect("insert specific doc");
1505 rt.execute_query(
1506 "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1507 )
1508 .expect("insert broad doc");
1509 rt.schema_vocabulary_apply(
1510 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1511 collection: "docs".to_string(),
1512 columns: vec!["body".into()],
1513 type_tags: Vec::new(),
1514 description: None,
1515 },
1516 );
1517
1518 let scope = make_scope(["docs".to_string()].into_iter().collect());
1519 let ctx = AskPipeline::execute_with_limit_and_min_score(
1520 &rt,
1521 &scope,
1522 "body passport renewal",
1523 2,
1524 None,
1525 Some(1),
1526 )
1527 .expect("pipeline executes");
1528
1529 assert_eq!(ctx.text_hits.len(), 2);
1530 assert!(
1531 ctx.text_hits[0].score > ctx.text_hits[1].score,
1532 "BM25 source order should prefer the shorter exact match: {:?}",
1533 ctx.text_hits
1534 );
1535 assert!(matches!(
1536 fused_source_order(&ctx).first(),
1537 Some(FusedSourceRef::TextHit(0))
1538 ));
1539 }
1540
1541 #[test]
1542 fn graph_search_scoped_honors_depth() {
1543 let rt = fresh_runtime();
1544 rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('alice', 'Alice')")
1545 .expect("insert alice");
1546 rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('bob', 'Bob')")
1547 .expect("insert bob");
1548 rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('carol', 'Carol')")
1549 .expect("insert carol");
1550 rt.execute_query(
1551 "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'alice', 'bob')",
1552 )
1553 .expect("insert alice-bob edge");
1554 rt.execute_query(
1555 "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'bob', 'carol')",
1556 )
1557 .expect("insert bob-carol edge");
1558
1559 let scope = make_scope(["tales".to_string()].into_iter().collect());
1560 let candidates = CandidateCollections {
1561 collections: vec!["tales".to_string()],
1562 columns_by_collection: HashMap::new(),
1563 };
1564 let depth1 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(1));
1565 let depth2 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(2));
1566
1567 assert!(
1568 depth1.iter().all(|hit| hit.depth <= 1),
1569 "DEPTH 1 returned hits beyond one hop: {depth1:?}"
1570 );
1571 assert!(
1572 depth2.iter().any(|hit| hit.depth == 2),
1573 "DEPTH 2 should include the second-hop graph hit: {depth2:?}"
1574 );
1575 }
1576
1577 #[test]
1578 fn filter_values_filters_rls_denied_rows() {
1579 let rt = fresh_runtime();
1580 rt.execute_query("CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT)")
1581 .expect("create docs");
1582 rt.execute_query(
1583 "INSERT INTO docs (id, tenant_id, body) VALUES \
1584 (1, 'acme', 'incident FDD-12313'), \
1585 (2, 'globex', 'incident FDD-12313')",
1586 )
1587 .expect("seed docs");
1588 rt.execute_query(
1589 "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1590 )
1591 .expect("create policy");
1592 rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1593 .expect("enable rls");
1594
1595 let _tenant = TenantGuard::set("acme");
1596 let scope = make_scope(["docs".to_string()].into_iter().collect());
1597 let candidates = CandidateCollections {
1598 collections: vec!["docs".to_string()],
1599 columns_by_collection: HashMap::from([("docs".to_string(), vec!["body".to_string()])]),
1600 };
1601 let tokens = TokenSet {
1602 keywords: vec!["incident".to_string()],
1603 literals: vec!["FDD-12313".to_string()],
1604 };
1605
1606 let rows = filter_values(&rt, &scope, &candidates, &tokens, 10);
1607
1608 assert_eq!(rows.len(), 1, "RLS should hide the globex row: {rows:?}");
1609 assert_eq!(row_text(&rows[0].entity, "tenant_id"), Some("acme"));
1610 }
1611
1612 #[test]
1615 fn execute_refuses_empty_token_set() {
1616 let rt = fresh_runtime();
1617 let scope = make_scope(HashSet::new());
1618 let err = AskPipeline::execute(&rt, &scope, "??? ...")
1619 .expect_err("empty token set must short-circuit");
1620 let msg = format!("{err}");
1621 assert!(
1622 msg.contains("yielded no usable tokens"),
1623 "expected structured empty-token error, got: {msg}"
1624 );
1625 }
1626
1627 #[test]
1632 fn match_schema_intersects_with_visible_set() {
1633 let rt = fresh_runtime();
1634 rt.schema_vocabulary_apply(
1638 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1639 collection: "travel".to_string(),
1640 columns: vec!["id".into(), "passport".into()],
1641 type_tags: Vec::new(),
1642 description: None,
1643 },
1644 );
1645 rt.schema_vocabulary_apply(
1646 crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1647 collection: "secrets".to_string(),
1648 columns: vec!["passport".into()],
1649 type_tags: Vec::new(),
1650 description: None,
1651 },
1652 );
1653 let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1654 let scope = make_scope(visible.clone());
1655 let tokens = TokenSet {
1656 keywords: vec!["passport".to_string()],
1657 literals: Vec::new(),
1658 };
1659 let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
1660 assert_eq!(candidates.collections, vec!["travel".to_string()]);
1661 assert!(!candidates.collections.contains(&"secrets".to_string()));
1662 let cols = candidates
1664 .columns_by_collection
1665 .get("travel")
1666 .expect("hint columns");
1667 assert!(cols.contains(&"passport".to_string()));
1668 }
1669
1670 use proptest::prelude::*;
1678
1679 fn arb_collection() -> impl Strategy<Value = String> {
1680 "[a-z]{1,4}"
1681 }
1682
1683 fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
1684 prop::collection::hash_set(arb_collection(), 0..6)
1685 }
1686
1687 fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
1688 prop::collection::vec(arb_collection(), 0..8)
1689 }
1690
1691 proptest! {
1692 #![proptest_config(ProptestConfig::with_cases(256))]
1693 #[test]
1694 fn stage4_rows_subset_of_visible_collections(
1695 visible in arb_visible(),
1696 candidate_names in arb_candidates(),
1697 literal_count in 0usize..3,
1698 ) {
1699 let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
1705 let candidates = CandidateCollections {
1706 collections: candidate_names,
1707 columns_by_collection: HashMap::new(),
1708 };
1709 let literals: Vec<String> = (0..literal_count)
1710 .map(|i| format!("ID-{i}"))
1711 .collect();
1712 let tokens = TokenSet {
1713 keywords: vec!["passport".to_string()],
1714 literals,
1715 };
1716 let scope = make_scope(visible.clone());
1717 let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
1718 for row in &rows {
1719 prop_assert!(
1720 visible.contains(&row.collection),
1721 "Stage 4 leaked row collection={} not in visible={:?}",
1722 row.collection, visible
1723 );
1724 }
1725 }
1726 }
1727
    /// Runtime shared by every case of the Stage 4 proptest.
    ///
    /// It is initialised lazily through `get_or_init(fresh_runtime)`, so the
    /// 256 generated cases reuse one runtime instead of each booting a fresh
    /// one.
    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();
1729
1730 #[test]
1743 fn integration_passport_fdd_12313_funnels_through_four_stages() {
1744 let rt = fresh_runtime();
1745 rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1749 .expect("CREATE TABLE travel");
1750 rt.execute_query(
1751 "INSERT INTO travel (id, passport, notes) VALUES \
1752 (1, 'BR-001', 'unrelated note'), \
1753 (2, 'PT-002', 'incident FDD-12313 escalated'), \
1754 (3, 'US-003', 'standard renewal')",
1755 )
1756 .expect("seed rows");
1757 rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
1759 .expect("CREATE TABLE secrets");
1760 rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
1761 .expect("seed secrets");
1762
1763 let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1764 let scope = make_scope(visible);
1765
1766 let ctx = AskPipeline::execute(
1767 &rt,
1768 &scope,
1769 "quais as novidades sobre o passport FDD-12313?",
1770 )
1771 .expect("pipeline runs");
1772
1773 assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1775 assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1776
1777 assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1781
1782 let _ = &ctx.vector_hits;
1786
1787 assert!(
1790 ctx.filtered_rows
1791 .iter()
1792 .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
1793 "expected travel row with FDD-12313 match, got: {:?}",
1794 ctx.filtered_rows
1795 );
1796 for row in &ctx.filtered_rows {
1797 assert_ne!(
1798 row.collection, "secrets",
1799 "secrets row leaked into Stage 4 output"
1800 );
1801 }
1802
1803 let _ = ctx.timings.extract_us
1807 + ctx.timings.schema_us
1808 + ctx.timings.vector_us
1809 + ctx.timings.filter_us;
1810 }
1811
1812 fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
1826 let store = rt.inner.db.store();
1827 store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1828 }
1829
1830 #[test]
1833 fn routed_default_backend_runs_heuristic() {
1834 let rt = fresh_runtime();
1835 let scope = make_scope(HashSet::new());
1836 let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1837 .expect("heuristic path is infallible");
1838 assert!(tokens.keywords.contains(&"passport".to_string()));
1839 assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1840 }
1841
1842 #[tokio::test(flavor = "multi_thread")]
1845 async fn routed_llm_auth_denied_uses_heuristic_fallback() {
1846 let rt = fresh_runtime();
1847 write_config(&rt, "ai.ner.backend", "llm");
1848 write_config(&rt, "ai.ner.fallback", "use_heuristic");
1849 let scope = make_scope(HashSet::new());
1850 let tokens = tokio::task::spawn_blocking(move || {
1851 extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1852 })
1853 .await
1854 .unwrap()
1855 .expect("fallback policy keeps the call OK");
1856 assert!(tokens.keywords.contains(&"passport".to_string()));
1857 assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1858 }
1859
1860 #[tokio::test(flavor = "multi_thread")]
1863 async fn routed_llm_auth_denied_empty_on_fail() {
1864 let rt = fresh_runtime();
1865 write_config(&rt, "ai.ner.backend", "llm");
1866 write_config(&rt, "ai.ner.fallback", "empty_on_fail");
1867 let scope = make_scope(HashSet::new());
1868 let tokens = tokio::task::spawn_blocking(move || {
1869 extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1870 })
1871 .await
1872 .unwrap()
1873 .expect("empty_on_fail returns Ok with empty TokenSet");
1874 assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
1875 }
1876
1877 #[tokio::test(flavor = "multi_thread")]
1881 async fn routed_llm_auth_denied_propagate_returns_error() {
1882 let rt = fresh_runtime();
1883 write_config(&rt, "ai.ner.backend", "llm");
1884 write_config(&rt, "ai.ner.fallback", "propagate");
1885 let scope = make_scope(HashSet::new());
1886 let err = tokio::task::spawn_blocking(move || {
1887 extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1888 })
1889 .await
1890 .unwrap()
1891 .expect_err("propagate must surface the error");
1892 let msg = format!("{err}");
1893 assert!(
1894 msg.contains("propagate") || msg.contains("ai.ner.backend"),
1895 "expected propagate error message, got: {msg}"
1896 );
1897 }
1898
1899 #[tokio::test(flavor = "multi_thread")]
1904 async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
1905 let rt = fresh_runtime();
1906 write_config(&rt, "ai.ner.backend", "llm");
1907 rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1909 .expect("CREATE TABLE travel");
1910 rt.execute_query(
1911 "INSERT INTO travel (id, passport, notes) VALUES \
1912 (2, 'PT-002', 'incident FDD-12313 escalated')",
1913 )
1914 .expect("seed rows");
1915 let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1916 let scope = make_scope(visible);
1917 let ctx = tokio::task::spawn_blocking(move || {
1918 AskPipeline::execute(&rt, &scope, "passport FDD-12313")
1919 })
1920 .await
1921 .unwrap()
1922 .expect("pipeline runs");
1923 assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1924 assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1925 assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1926 assert!(
1927 ctx.filtered_rows
1928 .iter()
1929 .any(|r| r.matched_literal == "FDD-12313"),
1930 "Stage 4 still runs after Stage 1 fallback"
1931 );
1932 }
1933}