Skip to main content

reddb_server/runtime/
ask_pipeline.rs

1//! AskPipeline — issue #121.
2//!
3//! 4-stage funnel that turns a natural-language ASK question into a
4//! scoped, filtered candidate set the LLM call can synthesise an
5//! answer over. Each stage is a pure function so the pipeline reads
6//! top-to-bottom in `execute` and individual stages can be unit-tested
7//! in isolation:
8//!
9//! 1. **`extract_tokens`** — heuristic NER. Splits the question into
10//!    `keywords` (alphanumeric words) and `literals` (uppercase ID-like
11//!    tokens, e.g. `FDD-12313`). LLM-NER is reserved for slice #123.
12//! 2. **`match_schema`** — `SchemaVocabulary::lookup` per keyword;
13//!    the resulting collection set is intersected with the caller's
14//!    `EffectiveScope.visible_collections` so out-of-scope tables
15//!    never enter the candidate pool. Issue #119 pre-filter.
16//! 3. **`vector_search_scoped`** — best-effort embedding + similarity
17//!    over the candidate collections via `AuthorizedSearch`. When no
18//!    embedding API is available (most unit-test fixtures), the stage
19//!    short-circuits with an empty match list — Stage 4 still runs.
20//! 4. **`filter_values`** — applies literal tokens as exact filters
21//!    over candidate-collection columns, returning the rows that
22//!    actually mention each literal.
23//!
24//! Output: typed [`AskContext`] holding all four stage outputs plus
25//! per-stage timing. Empty token-set short-circuits with a structured
26//! error so callers don't pay for an LLM round-trip on a query that
27//! contained nothing addressable.
28//!
29//! The legacy `RedDBRuntime::search_context` is now a Stage-2-internal
30//! helper for the broad-recall fallback; ASK no longer reaches for it
31//! directly.
32
33use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40    AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::application::SearchContextInput;
46use crate::storage::schema::Value;
47use crate::storage::unified::entity::{EntityData, EntityKind, UnifiedEntity};
48
49/// Default cap for Stage 4 row output. Override per-call via
50/// [`AskPipeline::execute_with_limit`].
51pub const DEFAULT_ROW_CAP: usize = 20;
52
53/// Token bag produced by Stage 1.
54#[derive(Debug, Clone, Default, PartialEq, Eq)]
55pub struct TokenSet {
56    /// Lowercase keyword tokens (regex `[A-Za-z][A-Za-z0-9_]+`,
57    /// length ≥ 2). Used by Stage 2 to probe `SchemaVocabulary`.
58    pub keywords: Vec<String>,
59    /// Literal-id tokens kept in their original case so Stage 4 can
60    /// run case-sensitive equality / substring filters. Matches
61    /// `[A-Z0-9-]{3,}` containing at least one digit OR `[0-9]{6,}`.
62    pub literals: Vec<String>,
63}
64
65impl TokenSet {
66    pub fn is_empty(&self) -> bool {
67        self.keywords.is_empty() && self.literals.is_empty()
68    }
69}
70
71/// Stage 2 output: collections likely to contain the answer.
72#[derive(Debug, Clone, Default)]
73pub struct CandidateCollections {
74    /// Collection names, sorted, deduplicated, intersected with the
75    /// caller's `EffectiveScope.visible_collections`.
76    pub collections: Vec<String>,
77    /// Columns hinted by `SchemaVocabulary` for each candidate
78    /// collection. Used by Stage 4 to scope the literal filter to
79    /// promising columns first.
80    pub columns_by_collection: HashMap<String, Vec<String>>,
81}
82
83/// One Stage 3 BM25 text hit from the context index.
84#[derive(Debug, Clone)]
85pub struct TextHit {
86    pub collection: String,
87    pub entity_id: u64,
88    pub score: f32,
89}
90
91/// One Stage 3 vector hit, kept thin so the pipeline doesn't pull
92/// the full `ScoredMatch` shape from `dsl::QueryResult` through.
93#[derive(Debug, Clone)]
94pub struct VectorHit {
95    pub collection: String,
96    pub entity_id: u64,
97    pub score: f32,
98}
99
100/// One graph traversal hit for the ASK graph retrieval bucket.
101#[derive(Debug, Clone)]
102pub struct GraphHit {
103    pub collection: String,
104    pub entity_id: u64,
105    pub score: f32,
106    pub depth: usize,
107    pub kind: GraphHitKind,
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum GraphHitKind {
112    Node,
113    Edge,
114}
115
116/// Stage 4 output: rows that match a literal filter.
117#[derive(Debug, Clone)]
118pub struct FilteredRow {
119    pub collection: String,
120    pub entity: UnifiedEntity,
121    /// Literal token that matched this row.
122    pub matched_literal: String,
123    /// Column where the match was found, when known.
124    pub matched_column: Option<String>,
125}
126
127/// Per-stage timing in microseconds. Logged via tracing on every run.
128#[derive(Debug, Clone, Default, PartialEq, Eq)]
129pub struct StageTimings {
130    pub extract_us: u64,
131    pub schema_us: u64,
132    pub text_us: u64,
133    pub vector_us: u64,
134    pub graph_us: u64,
135    pub filter_us: u64,
136}
137
138/// Typed context handed back to the ASK caller. Carries all four
139/// stage outputs so the LLM-formatting helper (slice #122) can pick
140/// what it needs without re-running the funnel.
141#[derive(Debug, Clone)]
142pub struct AskContext {
143    pub question: String,
144    pub tokens: TokenSet,
145    pub candidates: CandidateCollections,
146    pub text_hits: Vec<TextHit>,
147    pub vector_hits: Vec<VectorHit>,
148    pub graph_hits: Vec<GraphHit>,
149    pub filtered_rows: Vec<FilteredRow>,
150    pub source_limit: usize,
151    pub timings: StageTimings,
152}
153
154impl Default for AskContext {
155    fn default() -> Self {
156        Self {
157            question: String::new(),
158            tokens: TokenSet::default(),
159            candidates: CandidateCollections::default(),
160            text_hits: Vec::new(),
161            vector_hits: Vec::new(),
162            graph_hits: Vec::new(),
163            filtered_rows: Vec::new(),
164            source_limit: DEFAULT_ROW_CAP,
165            timings: StageTimings::default(),
166        }
167    }
168}
169
170/// One fused source reference in the final ASK context order.
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum FusedSourceRef {
173    FilteredRow(usize),
174    TextHit(usize),
175    VectorHit(usize),
176    GraphHit(usize),
177}
178
179/// One fused source reference plus its RRF score.
180#[derive(Debug, Clone, Copy, PartialEq)]
181pub struct FusedSource {
182    pub source: FusedSourceRef,
183    pub rrf_score: f64,
184}
185
186/// Pipeline entry point. Instances are stateless — kept as an empty
187/// enum so callers spell `AskPipeline::execute(...)` (matches the
188/// shape #121 calls out).
189pub enum AskPipeline {}
190
191impl AskPipeline {
192    /// Run all four stages with the default row cap.
193    pub fn execute(
194        runtime: &RedDBRuntime,
195        scope: &EffectiveScope,
196        question: &str,
197    ) -> RedDBResult<AskContext> {
198        Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
199    }
200
201    /// Run all four stages with a configurable Stage 4 row cap.
202    pub fn execute_with_limit(
203        runtime: &RedDBRuntime,
204        scope: &EffectiveScope,
205        question: &str,
206        row_cap: usize,
207    ) -> RedDBResult<AskContext> {
208        Self::execute_with_limit_and_min_score(runtime, scope, question, row_cap, None, None)
209    }
210
211    /// Run all four stages with a configurable row cap and per-bucket
212    /// minimum score for retrieval stages that expose native scores.
213    pub fn execute_with_limit_and_min_score(
214        runtime: &RedDBRuntime,
215        scope: &EffectiveScope,
216        question: &str,
217        row_cap: usize,
218        min_score: Option<f32>,
219        graph_depth: Option<usize>,
220    ) -> RedDBResult<AskContext> {
221        let span = info_span!(
222            "ask_pipeline.execute",
223            tenant = ?scope.effective_scope(),
224            question_len = question.len(),
225            row_cap = row_cap,
226            min_score = ?min_score,
227            graph_depth = ?graph_depth,
228        );
229        let _enter = span.enter();
230
231        // Stage 1. Routed through `extract_tokens_routed` so the
232        // `ai.ner.backend` config knob can swap the heuristic for the
233        // opt-in LLM NER without converting the surrounding pipeline
234        // to async (see `extract_tokens_routed` for the rationale).
235        let stage1 = Instant::now();
236        let tokens = extract_tokens_routed(runtime, scope, question)?;
237        let extract_us = stage1.elapsed().as_micros() as u64;
238        debug!(
239            target: "ask_pipeline",
240            stage = "extract_tokens",
241            keywords = ?tokens.keywords,
242            literals = ?tokens.literals,
243            elapsed_us = extract_us,
244            "stage 1 done"
245        );
246        if tokens.is_empty() {
247            warn!(
248                target: "ask_pipeline",
249                question_len = question.len(),
250                "refused: empty token set"
251            );
252            return Err(RedDBError::Query(
253                "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
254                    .to_string(),
255            ));
256        }
257
258        // Stage 2.
259        let stage2 = Instant::now();
260        let candidates = match_schema(runtime, scope, &tokens)?;
261        let schema_us = stage2.elapsed().as_micros() as u64;
262        debug!(
263            target: "ask_pipeline",
264            stage = "match_schema",
265            collections = ?candidates.collections,
266            elapsed_us = schema_us,
267            "stage 2 done"
268        );
269
270        // Stage 3.
271        let stage3 = Instant::now();
272        let text_hits = text_search_bm25_scoped(runtime, scope, question, &candidates, row_cap);
273        let text_us = stage3.elapsed().as_micros() as u64;
274        debug!(
275            target: "ask_pipeline",
276            stage = "text_search_bm25_scoped",
277            hits = text_hits.len(),
278            elapsed_us = text_us,
279            "stage 3 done"
280        );
281
282        // Stage 3b.
283        let stage3b = Instant::now();
284        let vector_hits =
285            vector_search_scoped(runtime, scope, question, &candidates, row_cap, min_score);
286        let vector_us = stage3b.elapsed().as_micros() as u64;
287        debug!(
288            target: "ask_pipeline",
289            stage = "vector_search_scoped",
290            hits = vector_hits.len(),
291            elapsed_us = vector_us,
292            "stage 3b done"
293        );
294
295        // Stage 3c.
296        let stage3c = Instant::now();
297        let graph_hits = graph_search_scoped(
298            runtime,
299            scope,
300            question,
301            &candidates,
302            row_cap,
303            min_score,
304            graph_depth,
305        );
306        let graph_us = stage3c.elapsed().as_micros() as u64;
307        debug!(
308            target: "ask_pipeline",
309            stage = "graph_search_scoped",
310            hits = graph_hits.len(),
311            elapsed_us = graph_us,
312            "stage 3c done"
313        );
314
315        // Stage 4.
316        let stage4 = Instant::now();
317        let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
318        let filter_us = stage4.elapsed().as_micros() as u64;
319        debug!(
320            target: "ask_pipeline",
321            stage = "filter_values",
322            rows = filtered_rows.len(),
323            elapsed_us = filter_us,
324            "stage 4 done"
325        );
326
327        Ok(AskContext {
328            question: question.to_string(),
329            tokens,
330            candidates,
331            text_hits,
332            vector_hits,
333            graph_hits,
334            filtered_rows,
335            source_limit: row_cap,
336            timings: StageTimings {
337                extract_us,
338                schema_us,
339                text_us,
340                vector_us,
341                graph_us,
342                filter_us,
343            },
344        })
345    }
346}
347
348/// Fuse row-filter and vector buckets into the single ranked source
349/// order used by prompt rendering and `sources_flat`.
350pub fn fused_source_order(ctx: &AskContext) -> Vec<FusedSourceRef> {
351    fused_sources(ctx)
352        .into_iter()
353        .map(|fused| fused.source)
354        .collect()
355}
356
357/// Fuse row-filter and vector buckets into the single ranked source
358/// order with the RRF score preserved for `EXPLAIN ASK`.
359pub fn fused_sources(ctx: &AskContext) -> Vec<FusedSource> {
360    use super::ai::rrf_fuser::{fuse, Bucket, Candidate, RRF_K_DEFAULT};
361
362    if ctx.source_limit == 0
363        || (ctx.filtered_rows.is_empty()
364            && ctx.text_hits.is_empty()
365            && ctx.vector_hits.is_empty()
366            && ctx.graph_hits.is_empty())
367    {
368        return Vec::new();
369    }
370
371    let mut refs: HashMap<String, FusedSourceRef> = HashMap::new();
372    let row_bucket = Bucket {
373        candidates: ctx
374            .filtered_rows
375            .iter()
376            .enumerate()
377            .map(|(idx, row)| {
378                let id = source_identity(&row.collection, row.entity.id.raw());
379                refs.entry(id.clone())
380                    .or_insert(FusedSourceRef::FilteredRow(idx));
381                Candidate { id, score: 1.0 }
382            })
383            .collect(),
384        min_score: None,
385    };
386    let text_bucket = Bucket {
387        candidates: ctx
388            .text_hits
389            .iter()
390            .enumerate()
391            .map(|(idx, hit)| {
392                let id = source_identity(&hit.collection, hit.entity_id);
393                refs.entry(id.clone())
394                    .or_insert(FusedSourceRef::TextHit(idx));
395                Candidate {
396                    id,
397                    score: hit.score as f64,
398                }
399            })
400            .collect(),
401        min_score: None,
402    };
403    let vector_bucket = Bucket {
404        candidates: ctx
405            .vector_hits
406            .iter()
407            .enumerate()
408            .map(|(idx, hit)| {
409                let id = source_identity(&hit.collection, hit.entity_id);
410                refs.entry(id.clone())
411                    .or_insert(FusedSourceRef::VectorHit(idx));
412                Candidate {
413                    id,
414                    score: hit.score as f64,
415                }
416            })
417            .collect(),
418        min_score: None,
419    };
420    let graph_bucket = Bucket {
421        candidates: ctx
422            .graph_hits
423            .iter()
424            .enumerate()
425            .map(|(idx, hit)| {
426                let id = source_identity(&hit.collection, hit.entity_id);
427                refs.entry(id.clone())
428                    .or_insert(FusedSourceRef::GraphHit(idx));
429                Candidate {
430                    id,
431                    score: hit.score as f64,
432                }
433            })
434            .collect(),
435        min_score: None,
436    };
437
438    fuse(
439        &[row_bucket, text_bucket, vector_bucket, graph_bucket],
440        RRF_K_DEFAULT,
441        ctx.source_limit,
442    )
443    .into_iter()
444    .filter_map(|item| {
445        refs.get(&item.id).copied().map(|source| FusedSource {
446            source,
447            rrf_score: item.rrf_score,
448        })
449    })
450    .collect()
451}
452
453fn source_identity(collection: &str, entity_id: u64) -> String {
454    format!("{collection}/{entity_id}")
455}
456
457// ---------------------------------------------------------------------------
458// Stage 3 — BM25 text search scoped to the candidate collections.
459// ---------------------------------------------------------------------------
460
461/// Run the context index's sparse BM25 ranker over the candidate
462/// collections. This is the ASK text bucket used by RRF; literal row
463/// filtering remains as a separate high-precision bucket.
464pub fn text_search_bm25_scoped(
465    runtime: &RedDBRuntime,
466    scope: &EffectiveScope,
467    question: &str,
468    candidates: &CandidateCollections,
469    top_k: usize,
470) -> Vec<TextHit> {
471    if candidates.collections.is_empty() || top_k == 0 {
472        return Vec::new();
473    }
474
475    let visible = scope.visible_collections();
476    let allowed: BTreeSet<String> = candidates
477        .collections
478        .iter()
479        .filter(|collection| visible.is_none_or(|set| set.contains(*collection)))
480        .cloned()
481        .collect();
482    if allowed.is_empty() {
483        return Vec::new();
484    }
485
486    let _scope_guard = AskScopeGuard::install(scope);
487    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
488    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
489    let store = runtime.inner.db.store();
490
491    runtime
492        .inner
493        .db
494        .store()
495        .context_index()
496        .search_bm25(question, top_k, Some(&allowed))
497        .into_iter()
498        .filter_map(|hit| {
499            let entity = store.get(&hit.collection, hit.entity_id)?;
500            if !ask_entity_allowed(
501                runtime,
502                scope,
503                &hit.collection,
504                &entity,
505                snap_ctx.as_ref(),
506                &mut rls_cache,
507            ) {
508                return None;
509            }
510            Some(TextHit {
511                collection: hit.collection,
512                entity_id: hit.entity_id.raw(),
513                score: hit.score,
514            })
515        })
516        .collect()
517}
518
519// ---------------------------------------------------------------------------
520// Stage 1 — token / entity extraction (heuristic v1) + opt-in LLM NER routing.
521// ---------------------------------------------------------------------------
522
523/// Stage-1 dispatcher honouring the `ai.ner.backend` config knob.
524///
525/// The pipeline (and `execute_with_limit` in particular) stays
526/// **sync** per #123 deepening to avoid an async cascade through the
527/// rest of `ask_pipeline`. When the operator turns on
528/// `ai.ner.backend = "llm"`, we contain the async surface here by
529/// using `tokio::runtime::Handle::current().block_on(...)` — works
530/// when called from inside an async context (the production HTTP
531/// handler path), falls back to the heuristic with a warn-log when
532/// no Tokio runtime is reachable (sync test contexts and embedded
533/// callers without a runtime).
534///
535/// Auth gate: `LlmNer::extract` checks `ai:ner:read`. Today
536/// `EffectiveScope::has_capability` is a placeholder that always
537/// returns `false`, so a LLM-backend-configured deployment will see
538/// every call deny at the gate and the configured `HeuristicFallback`
539/// fires. A one-shot info log makes that visible in operator logs.
540/// Wiring the real capability check into the auth engine is future
541/// work — the routing seam is in place so that landing the auth
542/// extension is a one-line `EffectiveScope` change.
543fn extract_tokens_routed(
544    runtime: &RedDBRuntime,
545    scope: &EffectiveScope,
546    question: &str,
547) -> RedDBResult<TokenSet> {
548    let backend = runtime.config_string("ai.ner.backend", "heuristic");
549    if backend != "llm" {
550        return Ok(extract_tokens(question));
551    }
552
553    let endpoint = runtime.config_string("ai.ner.endpoint", "");
554    let model = runtime.config_string("ai.ner.model", "");
555    let timeout_ms = runtime
556        .config_string("ai.ner.timeout_ms", "5000")
557        .parse::<u32>()
558        .unwrap_or(5000);
559    let fallback = match runtime
560        .config_string("ai.ner.fallback", "use_heuristic")
561        .as_str()
562    {
563        "empty_on_fail" => HeuristicFallback::EmptyOnFail,
564        "propagate" => HeuristicFallback::Propagate,
565        _ => HeuristicFallback::UseHeuristic,
566    };
567
568    // Endpoint shape decides provider; default to OpenAI-compat when
569    // unspecified — matches the documented config shape.
570    let provider = if endpoint.is_empty() && model.is_empty() {
571        // No network config provided — operator opted into "llm" but
572        // didn't wire a backend. Fall back to a Stub::Empty so the
573        // configured fallback policy fires deterministically.
574        NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
575    } else {
576        NerProvider::OpenAiCompat { endpoint, model }
577    };
578
579    let mut ner = LlmNer::new(provider, fallback);
580    ner.timeout_ms = timeout_ms;
581
582    let auth = ScopeAuthAdapter(scope);
583    let llm_result = match tokio::runtime::Handle::try_current() {
584        Ok(handle) => {
585            // Use `block_on` from a thread that's not driving the
586            // current runtime — the typical caller is an Axum HTTP
587            // handler running on the multi-thread runtime, so we hop
588            // off via `block_in_place`.
589            tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
590        }
591        Err(_) => {
592            warn!(
593                target: "ask_pipeline",
594                "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
595            );
596            return Ok(extract_tokens(question));
597        }
598    };
599
600    match llm_result {
601        Ok(tokens) => Ok(tokens),
602        Err(NerError::AuthDenied) => {
603            log_auth_denial_once();
604            // Auth denials never honour fallback inside `LlmNer`, so
605            // the routing layer applies the configured fallback here
606            // — this is the bridge until the auth engine wires the
607            // capability for real.
608            apply_fallback(fallback, question)
609        }
610        Err(err) => {
611            warn!(
612                target: "ask_pipeline",
613                error = %err,
614                "LlmNer extract failed; honouring HeuristicFallback policy"
615            );
616            apply_fallback(fallback, question)
617        }
618    }
619}
620
621fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
622    match fallback {
623        HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
624        HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
625        HeuristicFallback::Propagate => Err(RedDBError::Query(
626            "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
627        )),
628    }
629}
630
631/// One-shot info log for the auth-gate placeholder. Until the auth
632/// engine learns to grant `ai:ner:read`, every routed call denies —
633/// we emit the explainer once per process so logs aren't spammed.
634fn log_auth_denial_once() {
635    static EMITTED: AtomicBool = AtomicBool::new(false);
636    if !EMITTED.swap(true, Ordering::Relaxed) {
637        info!(
638            target: "ask_pipeline",
639            capability = NER_CAPABILITY,
640            "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
641            NER_CAPABILITY
642        );
643    }
644}
645
646/// Adapter wrapping `EffectiveScope` so it can drive the
647/// `LlmNer`-side `AuthContext` trait without leaking that trait
648/// across the rest of the runtime.
649struct ScopeAuthAdapter<'a>(&'a EffectiveScope);
650
651impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
652    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
654    }
655}
656
657impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
658    fn has_capability(&self, capability: &str) -> bool {
659        self.0.has_capability(capability)
660    }
661}
662
663/// Split a question into a [`TokenSet`]. Pure function — no runtime
664/// lookups.
665///
666/// Rules (heuristic v1):
667/// - **Literals** match `[A-Z0-9-]{3,}` AND contain ≥1 digit, OR are
668///   pure digit runs of length ≥ 6. Captures id shapes like
669///   `FDD-12313`, `INV-2024-001`, `123456`.
670/// - **Keywords** match `[A-Za-z][A-Za-z0-9_]+` (length ≥ 2),
671///   normalised to lowercase. Stop words are dropped to avoid
672///   wasting Stage-2 lookups on noise.
673pub fn extract_tokens(question: &str) -> TokenSet {
674    let mut keywords: Vec<String> = Vec::new();
675    let mut literals: Vec<String> = Vec::new();
676
677    let mut chars = question.chars().peekable();
678    let mut buf = String::new();
679
680    let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
681        if buf.is_empty() {
682            return;
683        }
684        let word = std::mem::take(buf);
685        classify_token(&word, keywords, literals);
686    };
687
688    while let Some(ch) = chars.next() {
689        if ch.is_alphanumeric() || ch == '_' || ch == '-' {
690            buf.push(ch);
691        } else {
692            flush(&mut buf, &mut keywords, &mut literals);
693            // Skip remaining whitespace / punctuation; loop continues.
694            let _ = ch;
695        }
696        // Look ahead to also flush on EOF.
697        if chars.peek().is_none() {
698            flush(&mut buf, &mut keywords, &mut literals);
699        }
700    }
701    // Final flush in case the loop body didn't see EOF (empty
702    // iterator path).
703    if !buf.is_empty() {
704        classify_token(&buf, &mut keywords, &mut literals);
705    }
706
707    // Dedup keywords + literals while preserving order.
708    let mut seen = HashSet::new();
709    keywords.retain(|tok| seen.insert(tok.clone()));
710    let mut seen_lit = HashSet::new();
711    literals.retain(|tok| seen_lit.insert(tok.clone()));
712
713    TokenSet { keywords, literals }
714}
715
716fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
717    // Literal: shape `[A-Z0-9-]{3,}` with at least one digit, OR
718    // pure-digit run of length ≥ 6.
719    let is_upper_id_shape = word.len() >= 3
720        && word
721            .chars()
722            .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
723        && word.chars().any(|c| c.is_ascii_digit())
724        && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
725    let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
726    if is_upper_id_shape || is_long_digit_run {
727        literals.push(word.to_string());
728        return;
729    }
730    // Keyword: starts with a letter, len ≥ 2, drop trailing/leading
731    // hyphens, lowercase.
732    let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
733    if trimmed.len() < 2 {
734        return;
735    }
736    if !trimmed
737        .chars()
738        .next()
739        .map(|c| c.is_ascii_alphabetic())
740        .unwrap_or(false)
741    {
742        return;
743    }
744    if !trimmed
745        .chars()
746        .all(|c| c.is_ascii_alphanumeric() || c == '_')
747    {
748        // Hyphenated word that wasn't a literal — skip rather than
749        // index a fragmented token.
750        return;
751    }
752    let lower = trimmed.to_ascii_lowercase();
753    if STOP_WORDS.binary_search(&lower.as_str()).is_ok() {
754        return;
755    }
756    keywords.push(lower);
757}
758
759/// Sorted ascii lowercase stop-word list. Kept tiny and curated so a
760/// regression in Stage 2 candidate-narrowing surfaces fast.
761const STOP_WORDS: &[&str] = &[
762    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
763    "is", "it", "of", "on", "or", "que", "qual", "quais", "sobre", "te", "the", "to", "what",
764    "where", "which", "with",
765];
766
767// ---------------------------------------------------------------------------
768// Stage 2 — schema-vocabulary match.
769// ---------------------------------------------------------------------------
770
771/// For each keyword, probe `SchemaVocabulary` and intersect the
772/// resulting collection set with `scope.visible_collections`. Returns
773/// the deduplicated, sorted candidate list plus a per-collection
774/// column hint set for Stage 4.
775pub fn match_schema(
776    runtime: &RedDBRuntime,
777    scope: &EffectiveScope,
778    tokens: &TokenSet,
779) -> RedDBResult<CandidateCollections> {
780    let visible = match scope.visible_collections() {
781        Some(set) => set.clone(),
782        None => {
783            // No scope wired = embedded mode. Fall back to "every
784            // collection in the DB" so the pipeline still runs;
785            // AuthorizedSearch is the seam that refuses the deny-
786            // default for AI commands.
787            runtime
788                .inner
789                .db
790                .store()
791                .list_collections()
792                .into_iter()
793                .collect()
794        }
795    };
796
797    let mut collections: BTreeSet<String> = BTreeSet::new();
798    let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
799    for keyword in &tokens.keywords {
800        let hits = runtime.schema_vocabulary_lookup(keyword);
801        for hit in hits {
802            if !visible.contains(&hit.collection) {
803                continue;
804            }
805            collections.insert(hit.collection.clone());
806            if let Some(column) = hit.column {
807                columns_by_collection
808                    .entry(hit.collection)
809                    .or_default()
810                    .insert(column);
811            }
812        }
813    }
814
815    Ok(CandidateCollections {
816        collections: collections.into_iter().collect(),
817        columns_by_collection: columns_by_collection
818            .into_iter()
819            .map(|(k, v)| (k, v.into_iter().collect()))
820            .collect(),
821    })
822}
823
824// ---------------------------------------------------------------------------
825// Stage 3 — vector search scoped to the candidate collections.
826// ---------------------------------------------------------------------------
827
828/// Embed the question (best-effort — skipped if no provider is
829/// configured, see `embed_question`) and run
830/// `AuthorizedSearch::execute_similar` over each candidate
831/// collection. Returns at most `top_k` hits sorted by similarity.
832pub fn vector_search_scoped(
833    runtime: &RedDBRuntime,
834    scope: &EffectiveScope,
835    question: &str,
836    candidates: &CandidateCollections,
837    top_k: usize,
838    min_score: Option<f32>,
839) -> Vec<VectorHit> {
840    if candidates.collections.is_empty() {
841        return Vec::new();
842    }
843    let Some(embedding) = embed_question(runtime, question) else {
844        return Vec::new();
845    };
846    let per_collection = top_k.max(1);
847    let mut hits: Vec<VectorHit> = Vec::new();
848    let _scope_guard = AskScopeGuard::install(scope);
849    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
850    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
851    let store = runtime.inner.db.store();
852    for collection in &candidates.collections {
853        match super::authorized_search::AuthorizedSearch::execute_similar(
854            runtime,
855            scope,
856            collection,
857            &embedding,
858            per_collection,
859            min_score.unwrap_or(0.0),
860        ) {
861            Ok(results) => {
862                for result in results {
863                    let Some(entity) = store.get(collection, result.entity_id) else {
864                        continue;
865                    };
866                    if !ask_entity_allowed(
867                        runtime,
868                        scope,
869                        collection,
870                        &entity,
871                        snap_ctx.as_ref(),
872                        &mut rls_cache,
873                    ) {
874                        continue;
875                    }
876                    hits.push(VectorHit {
877                        collection: collection.clone(),
878                        entity_id: result.entity_id.raw(),
879                        score: result.score,
880                    });
881                }
882            }
883            Err(err) => {
884                debug!(
885                    target: "ask_pipeline",
886                    collection = collection,
887                    err = %err,
888                    "vector_search_scoped: collection skipped"
889                );
890            }
891        }
892    }
893    hits.sort_by(|a, b| {
894        b.score
895            .partial_cmp(&a.score)
896            .unwrap_or(std::cmp::Ordering::Equal)
897            .then_with(|| a.entity_id.cmp(&b.entity_id))
898    });
899    hits.truncate(top_k);
900    hits
901}
902
903// ---------------------------------------------------------------------------
904// Stage 3b — graph traversal scoped to candidate collections.
905// ---------------------------------------------------------------------------
906
907/// Run context search with graph expansion enabled and keep only
908/// graph-traversal hits. `ASK ... DEPTH N` controls the traversal
909/// horizon; absent depth follows the existing ASK tool default.
910pub fn graph_search_scoped(
911    runtime: &RedDBRuntime,
912    scope: &EffectiveScope,
913    question: &str,
914    candidates: &CandidateCollections,
915    top_k: usize,
916    min_score: Option<f32>,
917    graph_depth: Option<usize>,
918) -> Vec<GraphHit> {
919    if candidates.collections.is_empty() || top_k == 0 {
920        return Vec::new();
921    }
922    let depth = graph_depth
923        .unwrap_or(crate::runtime::ai::mcp_ask_tool::DEPTH_DEFAULT as usize)
924        .max(1);
925    let _scope_guard = AskScopeGuard::install(scope);
926    let input = SearchContextInput {
927        query: question.to_string(),
928        field: None,
929        vector: None,
930        collections: Some(candidates.collections.clone()),
931        graph_depth: Some(depth),
932        graph_max_edges: None,
933        max_cross_refs: Some(0),
934        follow_cross_refs: Some(false),
935        expand_graph: Some(true),
936        global_scan: Some(true),
937        reindex: Some(false),
938        limit: Some(top_k),
939        min_score,
940    };
941    let result = match if scope.visible_collections().is_some() {
942        super::authorized_search::AuthorizedSearch::execute_context(runtime, scope, input)
943    } else {
944        runtime.search_context(input)
945    } {
946        Ok(result) => result,
947        Err(err) => {
948            debug!(
949                target: "ask_pipeline",
950                err = %err,
951                "graph_search_scoped: context search skipped"
952            );
953            return Vec::new();
954        }
955    };
956
957    let mut hits = Vec::new();
958    for entity in result.graph.nodes.into_iter().chain(result.graph.edges) {
959        let crate::runtime::DiscoveryMethod::GraphTraversal { depth, .. } = entity.discovery else {
960            continue;
961        };
962        let kind = match entity.entity.kind {
963            EntityKind::GraphNode(_) => GraphHitKind::Node,
964            EntityKind::GraphEdge(_) => GraphHitKind::Edge,
965            _ => continue,
966        };
967        hits.push(GraphHit {
968            collection: entity.collection,
969            entity_id: entity.entity.id.raw(),
970            score: entity.score,
971            depth,
972            kind,
973        });
974    }
975    hits.sort_by(|a, b| {
976        b.score
977            .partial_cmp(&a.score)
978            .unwrap_or(std::cmp::Ordering::Equal)
979            .then_with(|| a.depth.cmp(&b.depth))
980            .then_with(|| a.collection.cmp(&b.collection))
981            .then_with(|| a.entity_id.cmp(&b.entity_id))
982    });
983    hits.truncate(top_k);
984    hits
985}
986
987/// Best-effort embedding. Returns `None` when no embedding provider
988/// is configured (the unit-test fixture path) — the caller treats
989/// `None` as "Stage 3 yielded zero hits" and continues.
990fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
991    let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
992        match runtime.inner.db.get_kv("red_config", key) {
993            Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
994            Some(_) => Ok(None),
995            None => Ok(None),
996        }
997    };
998    let provider = crate::ai::resolve_default_provider(&kv_getter);
999    if !provider.is_openai_compatible() {
1000        return None;
1001    }
1002    let model = crate::ai::resolve_default_model(&provider, &kv_getter);
1003    let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
1004    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
1005    let request = crate::ai::OpenAiEmbeddingRequest {
1006        api_key,
1007        model,
1008        inputs: vec![question.to_string()],
1009        dimensions: None,
1010        api_base: provider.resolve_api_base(),
1011    };
1012    let response = crate::runtime::ai::block_on_ai(async move {
1013        crate::ai::openai_embeddings_async(&transport, request).await
1014    })
1015    .and_then(|result| result)
1016    .ok()?;
1017    response.embeddings.into_iter().next()
1018}
1019
1020// ---------------------------------------------------------------------------
1021// Stage 4 — value filter using literal tokens.
1022// ---------------------------------------------------------------------------
1023
1024/// Walk every candidate collection looking for rows whose columns
1025/// contain any of the literal tokens. Caller-supplied `row_cap`
1026/// bounds the result count; column hints from Stage 2 are visited
1027/// first so promising columns find a match before a full scan.
1028pub fn filter_values(
1029    runtime: &RedDBRuntime,
1030    scope: &EffectiveScope,
1031    candidates: &CandidateCollections,
1032    tokens: &TokenSet,
1033    row_cap: usize,
1034) -> Vec<FilteredRow> {
1035    if tokens.literals.is_empty() || candidates.collections.is_empty() {
1036        return Vec::new();
1037    }
1038    let visible = scope.visible_collections();
1039    let store = runtime.inner.db.store();
1040    let mut out: Vec<FilteredRow> = Vec::new();
1041    let _scope_guard = AskScopeGuard::install(scope);
1042    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1043    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
1044
1045    'collection: for collection in &candidates.collections {
1046        // Defence-in-depth: redo the visibility check here so a Stage
1047        // 2 regression can't smuggle an out-of-scope collection
1048        // through.
1049        if let Some(set) = visible {
1050            if !set.contains(collection) {
1051                continue;
1052            }
1053        }
1054        let Some(manager) = store.get_collection(collection) else {
1055            continue;
1056        };
1057        let hint_columns: &[String] = candidates
1058            .columns_by_collection
1059            .get(collection)
1060            .map(|v| v.as_slice())
1061            .unwrap_or(&[]);
1062
1063        for entity in manager.query_all(|_| true) {
1064            if !ask_entity_allowed(
1065                runtime,
1066                scope,
1067                collection,
1068                &entity,
1069                snap_ctx.as_ref(),
1070                &mut rls_cache,
1071            ) {
1072                continue;
1073            }
1074            if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
1075                out.push(FilteredRow {
1076                    collection: collection.clone(),
1077                    entity,
1078                    matched_literal: hit.0,
1079                    matched_column: hit.1,
1080                });
1081                if out.len() >= row_cap {
1082                    break 'collection;
1083                }
1084            }
1085        }
1086    }
1087    out
1088}
1089
1090fn ask_entity_allowed(
1091    runtime: &RedDBRuntime,
1092    scope: &EffectiveScope,
1093    collection: &str,
1094    entity: &UnifiedEntity,
1095    snap_ctx: Option<&crate::runtime::impl_core::SnapshotContext>,
1096    rls_cache: &mut HashMap<String, Option<crate::storage::query::ast::Filter>>,
1097) -> bool {
1098    if scope
1099        .visible_collections()
1100        .is_some_and(|visible| !visible.contains(collection))
1101    {
1102        return false;
1103    }
1104    runtime.search_entity_allowed(collection, entity, snap_ctx, rls_cache)
1105}
1106
1107struct AskScopeGuard {
1108    prev_tenant: Option<String>,
1109    prev_auth: Option<(String, crate::auth::Role)>,
1110}
1111
1112impl AskScopeGuard {
1113    fn install(scope: &EffectiveScope) -> Self {
1114        let prev_tenant = crate::runtime::impl_core::current_tenant();
1115        let prev_auth = crate::runtime::impl_core::current_auth_identity();
1116
1117        match scope.effective_scope() {
1118            Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant.to_string()),
1119            None => crate::runtime::impl_core::clear_current_tenant(),
1120        }
1121        match scope.identity() {
1122            Some((user, role)) => {
1123                crate::runtime::impl_core::set_current_auth_identity(user.to_string(), role)
1124            }
1125            None => crate::runtime::impl_core::clear_current_auth_identity(),
1126        }
1127
1128        Self {
1129            prev_tenant,
1130            prev_auth,
1131        }
1132    }
1133}
1134
1135impl Drop for AskScopeGuard {
1136    fn drop(&mut self) {
1137        match self.prev_tenant.take() {
1138            Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant),
1139            None => crate::runtime::impl_core::clear_current_tenant(),
1140        }
1141        match self.prev_auth.take() {
1142            Some((user, role)) => crate::runtime::impl_core::set_current_auth_identity(user, role),
1143            None => crate::runtime::impl_core::clear_current_auth_identity(),
1144        }
1145    }
1146}
1147
1148/// Look for any literal in any column value of `entity`. Hint
1149/// columns are checked first; a positive hit short-circuits.
1150fn literal_match_in_entity(
1151    entity: &UnifiedEntity,
1152    literals: &[String],
1153    hint_columns: &[String],
1154) -> Option<(String, Option<String>)> {
1155    let row = match &entity.data {
1156        EntityData::Row(row) => row,
1157        _ => return None,
1158    };
1159
1160    // Pass 1: hint columns first.
1161    for column in hint_columns {
1162        if let Some(value) = row.get_field(column) {
1163            if let Some(lit) = first_literal_in_value(value, literals) {
1164                return Some((lit, Some(column.clone())));
1165            }
1166        }
1167    }
1168    // Pass 2: every other column.
1169    for (name, value) in row.iter_fields() {
1170        if hint_columns.iter().any(|c| c == name) {
1171            continue;
1172        }
1173        if let Some(lit) = first_literal_in_value(value, literals) {
1174            return Some((lit, Some(name.to_string())));
1175        }
1176    }
1177    None
1178}
1179
1180fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
1181    let rendered = match value {
1182        Value::Text(s) => s.to_string(),
1183        Value::Integer(i) => i.to_string(),
1184        Value::Float(f) => f.to_string(),
1185        Value::Boolean(b) => b.to_string(),
1186        Value::Json(j) => String::from_utf8_lossy(j).to_string(),
1187        _ => return None,
1188    };
1189    for lit in literals {
1190        // Case-sensitive substring match: literals are id-shaped, so
1191        // we want `FDD-12313` to find embedded `FDD-12313` in a free-
1192        // form description column too.
1193        if rendered.contains(lit) {
1194            return Some(lit.clone());
1195        }
1196    }
1197    None
1198}
1199
1200#[cfg(test)]
1201mod tests {
1202    use super::*;
1203
1204    // -- Stage 1 ------------------------------------------------------
1205
1206    #[test]
1207    fn extract_tokens_splits_keywords_and_literals() {
1208        let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
1209        // `quais`, `as`, `sobre`, `o` are stop words; `novidades`,
1210        // `passport` survive.
1211        assert!(tokens.keywords.contains(&"novidades".to_string()));
1212        assert!(tokens.keywords.contains(&"passport".to_string()));
1213        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1214        assert!(!tokens.is_empty());
1215    }
1216
1217    #[test]
1218    fn extract_tokens_returns_empty_for_punctuation_only() {
1219        let tokens = extract_tokens("???   ...");
1220        assert!(tokens.is_empty());
1221    }
1222
1223    #[test]
1224    fn extract_tokens_long_digit_run_is_a_literal() {
1225        let tokens = extract_tokens("show order 987654321 details");
1226        assert!(tokens.literals.contains(&"987654321".to_string()));
1227        assert!(tokens.keywords.contains(&"order".to_string()));
1228        assert!(tokens.keywords.contains(&"details".to_string()));
1229        assert!(tokens.keywords.contains(&"show".to_string()));
1230    }
1231
1232    #[test]
1233    fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
1234        // "USA" is uppercase but lacks a digit, so it stays a keyword
1235        // (lowercased) — Stage 2 still gets to probe it.
1236        let tokens = extract_tokens("USA exports report");
1237        assert!(tokens.keywords.contains(&"usa".to_string()));
1238        assert!(tokens.literals.is_empty());
1239    }
1240
1241    #[test]
1242    fn extract_tokens_dedups() {
1243        let tokens = extract_tokens("passport passport FDD-1 FDD-1");
1244        assert_eq!(
1245            tokens.keywords.iter().filter(|k| *k == "passport").count(),
1246            1
1247        );
1248        assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
1249    }
1250
1251    // -- Stage 4 helper ----------------------------------------------
1252
1253    #[test]
1254    fn first_literal_in_value_substring_match() {
1255        let lit = first_literal_in_value(
1256            &Value::text("issue FDD-12313 reported by user"),
1257            &["FDD-12313".to_string()],
1258        );
1259        assert_eq!(lit.as_deref(), Some("FDD-12313"));
1260    }
1261
1262    #[test]
1263    fn first_literal_in_value_no_match_returns_none() {
1264        assert!(
1265            first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
1266                .is_none()
1267        );
1268    }
1269
1270    // -- Pipeline-wide -----------------------------------------------
1271
1272    use crate::api::RedDBOptions;
1273    use crate::auth::Role;
1274    use crate::runtime::statement_frame::EffectiveScope;
1275    use crate::runtime::RedDBRuntime;
1276    use crate::storage::schema::Value;
1277    use crate::storage::transaction::snapshot::Snapshot;
1278    use crate::storage::unified::entity::{
1279        EntityData, EntityId, EntityKind, RowData, UnifiedEntity,
1280    };
1281    use std::sync::Arc;
1282
1283    fn make_scope(visible: HashSet<String>) -> EffectiveScope {
1284        EffectiveScope {
1285            tenant: Some("acme".to_string()),
1286            identity: Some(("alice".to_string(), Role::Read)),
1287            snapshot: Snapshot {
1288                xid: 0,
1289                in_progress: HashSet::new(),
1290            },
1291            visible_collections: Some(visible),
1292        }
1293    }
1294
1295    fn fresh_runtime() -> RedDBRuntime {
1296        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
1297    }
1298
1299    fn test_row(collection: &str, id: u64) -> FilteredRow {
1300        FilteredRow {
1301            collection: collection.to_string(),
1302            entity: UnifiedEntity::new(
1303                EntityId::new(id),
1304                EntityKind::TableRow {
1305                    table: Arc::from(collection),
1306                    row_id: id,
1307                },
1308                EntityData::Row(RowData {
1309                    columns: Vec::new(),
1310                    named: Some(
1311                        [("body".to_string(), Value::text("ticket FDD-1".to_string()))]
1312                            .into_iter()
1313                            .collect(),
1314                    ),
1315                    schema: None,
1316                }),
1317            ),
1318            matched_literal: "FDD-1".to_string(),
1319            matched_column: Some("body".to_string()),
1320        }
1321    }
1322
1323    fn test_graph_hit(collection: &str, id: u64, score: f32, depth: usize) -> GraphHit {
1324        GraphHit {
1325            collection: collection.to_string(),
1326            entity_id: id,
1327            score,
1328            depth,
1329            kind: GraphHitKind::Node,
1330        }
1331    }
1332
1333    fn test_text_hit(collection: &str, id: u64, score: f32) -> TextHit {
1334        TextHit {
1335            collection: collection.to_string(),
1336            entity_id: id,
1337            score,
1338        }
1339    }
1340
1341    struct TenantGuard;
1342
1343    impl TenantGuard {
1344        fn set(tenant: &str) -> Self {
1345            crate::runtime::impl_core::set_current_tenant(tenant.to_string());
1346            Self
1347        }
1348    }
1349
1350    impl Drop for TenantGuard {
1351        fn drop(&mut self) {
1352            crate::runtime::impl_core::clear_current_tenant();
1353        }
1354    }
1355
1356    fn row_text<'a>(entity: &'a UnifiedEntity, field: &str) -> Option<&'a str> {
1357        let row = entity.data.as_row()?;
1358        match row.get_field(field)? {
1359            Value::Text(value) => Some(value.as_ref()),
1360            _ => None,
1361        }
1362    }
1363
1364    #[test]
1365    fn fused_source_order_uses_rrf_and_total_limit() {
1366        let ctx = AskContext {
1367            source_limit: 2,
1368            filtered_rows: vec![test_row("incidents", 2), test_row("incidents", 1)],
1369            vector_hits: vec![
1370                VectorHit {
1371                    collection: "incidents".to_string(),
1372                    entity_id: 1,
1373                    score: 0.91,
1374                },
1375                VectorHit {
1376                    collection: "docs".to_string(),
1377                    entity_id: 9,
1378                    score: 0.88,
1379                },
1380            ],
1381            ..AskContext::default()
1382        };
1383
1384        let order = fused_source_order(&ctx);
1385
1386        assert_eq!(
1387            order,
1388            vec![
1389                FusedSourceRef::FilteredRow(1),
1390                FusedSourceRef::FilteredRow(0)
1391            ]
1392        );
1393    }
1394
1395    #[test]
1396    fn fused_source_order_includes_graph_bucket() {
1397        let ctx = AskContext {
1398            source_limit: 4,
1399            filtered_rows: vec![test_row("incidents", 1)],
1400            text_hits: vec![test_text_hit("articles", 5, 1.2)],
1401            vector_hits: vec![
1402                VectorHit {
1403                    collection: "incidents".to_string(),
1404                    entity_id: 1,
1405                    score: 0.91,
1406                },
1407                VectorHit {
1408                    collection: "docs".to_string(),
1409                    entity_id: 9,
1410                    score: 0.88,
1411                },
1412            ],
1413            graph_hits: vec![test_graph_hit("topology", 7, 0.80, 1)],
1414            ..AskContext::default()
1415        };
1416
1417        let order = fused_source_order(&ctx);
1418
1419        assert_eq!(
1420            order,
1421            vec![
1422                FusedSourceRef::FilteredRow(0),
1423                FusedSourceRef::TextHit(0),
1424                FusedSourceRef::GraphHit(0),
1425                FusedSourceRef::VectorHit(1),
1426            ]
1427        );
1428    }
1429
1430    #[test]
1431    fn text_search_bm25_scoped_ranks_specific_document_first() {
1432        let rt = fresh_runtime();
1433        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1434            .expect("create docs");
1435        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1436            .expect("insert specific doc");
1437        rt.execute_query(
1438            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1439        )
1440        .expect("insert broad doc");
1441
1442        let scope = make_scope(["docs".to_string()].into_iter().collect());
1443        let candidates = CandidateCollections {
1444            collections: vec!["docs".to_string()],
1445            columns_by_collection: HashMap::new(),
1446        };
1447        let hits = text_search_bm25_scoped(&rt, &scope, "passport renewal", &candidates, 10);
1448
1449        assert_eq!(hits.len(), 2);
1450        assert!(
1451            hits[0].score > hits[1].score,
1452            "BM25 text bucket should prefer the shorter exact match: {hits:?}"
1453        );
1454    }
1455
1456    #[test]
1457    fn text_search_bm25_scoped_filters_rls_denied_hits() {
1458        let rt = fresh_runtime();
1459        rt.execute_query(
1460            "CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT) WITH CONTEXT INDEX ON (body)",
1461        )
1462        .expect("create docs");
1463        rt.execute_query(
1464            "INSERT INTO docs (id, tenant_id, body) VALUES \
1465             (1, 'acme', 'shared launch plan'), \
1466             (2, 'globex', 'shared launch plan')",
1467        )
1468        .expect("seed docs");
1469        rt.execute_query(
1470            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1471        )
1472        .expect("create policy");
1473        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1474            .expect("enable rls");
1475
1476        let _tenant = TenantGuard::set("acme");
1477        let scope = make_scope(["docs".to_string()].into_iter().collect());
1478        let candidates = CandidateCollections {
1479            collections: vec!["docs".to_string()],
1480            columns_by_collection: HashMap::new(),
1481        };
1482
1483        let hits = text_search_bm25_scoped(&rt, &scope, "shared launch", &candidates, 10);
1484
1485        assert_eq!(hits.len(), 1, "RLS should hide the globex hit: {hits:?}");
1486        let entity = rt
1487            .inner
1488            .db
1489            .store()
1490            .get(
1491                "docs",
1492                crate::storage::unified::entity::EntityId::new(hits[0].entity_id),
1493            )
1494            .expect("hit entity exists");
1495        assert_eq!(row_text(&entity, "tenant_id"), Some("acme"));
1496    }
1497
1498    #[test]
1499    fn execute_pipeline_retrieves_known_good_bm25_source_order() {
1500        let rt = fresh_runtime();
1501        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1502            .expect("create docs");
1503        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1504            .expect("insert specific doc");
1505        rt.execute_query(
1506            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1507        )
1508        .expect("insert broad doc");
1509        rt.schema_vocabulary_apply(
1510            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1511                collection: "docs".to_string(),
1512                columns: vec!["body".into()],
1513                type_tags: Vec::new(),
1514                description: None,
1515            },
1516        );
1517
1518        let scope = make_scope(["docs".to_string()].into_iter().collect());
1519        let ctx = AskPipeline::execute_with_limit_and_min_score(
1520            &rt,
1521            &scope,
1522            "body passport renewal",
1523            2,
1524            None,
1525            Some(1),
1526        )
1527        .expect("pipeline executes");
1528
1529        assert_eq!(ctx.text_hits.len(), 2);
1530        assert!(
1531            ctx.text_hits[0].score > ctx.text_hits[1].score,
1532            "BM25 source order should prefer the shorter exact match: {:?}",
1533            ctx.text_hits
1534        );
1535        assert!(matches!(
1536            fused_source_order(&ctx).first(),
1537            Some(FusedSourceRef::TextHit(0))
1538        ));
1539    }
1540
1541    #[test]
1542    fn graph_search_scoped_honors_depth() {
1543        let rt = fresh_runtime();
1544        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('alice', 'Alice')")
1545            .expect("insert alice");
1546        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('bob', 'Bob')")
1547            .expect("insert bob");
1548        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('carol', 'Carol')")
1549            .expect("insert carol");
1550        rt.execute_query(
1551            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'alice', 'bob')",
1552        )
1553        .expect("insert alice-bob edge");
1554        rt.execute_query(
1555            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'bob', 'carol')",
1556        )
1557        .expect("insert bob-carol edge");
1558
1559        let scope = make_scope(["tales".to_string()].into_iter().collect());
1560        let candidates = CandidateCollections {
1561            collections: vec!["tales".to_string()],
1562            columns_by_collection: HashMap::new(),
1563        };
1564        let depth1 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(1));
1565        let depth2 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(2));
1566
1567        assert!(
1568            depth1.iter().all(|hit| hit.depth <= 1),
1569            "DEPTH 1 returned hits beyond one hop: {depth1:?}"
1570        );
1571        assert!(
1572            depth2.iter().any(|hit| hit.depth == 2),
1573            "DEPTH 2 should include the second-hop graph hit: {depth2:?}"
1574        );
1575    }
1576
1577    #[test]
1578    fn filter_values_filters_rls_denied_rows() {
1579        let rt = fresh_runtime();
1580        rt.execute_query("CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT)")
1581            .expect("create docs");
1582        rt.execute_query(
1583            "INSERT INTO docs (id, tenant_id, body) VALUES \
1584             (1, 'acme', 'incident FDD-12313'), \
1585             (2, 'globex', 'incident FDD-12313')",
1586        )
1587        .expect("seed docs");
1588        rt.execute_query(
1589            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1590        )
1591        .expect("create policy");
1592        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1593            .expect("enable rls");
1594
1595        let _tenant = TenantGuard::set("acme");
1596        let scope = make_scope(["docs".to_string()].into_iter().collect());
1597        let candidates = CandidateCollections {
1598            collections: vec!["docs".to_string()],
1599            columns_by_collection: HashMap::from([("docs".to_string(), vec!["body".to_string()])]),
1600        };
1601        let tokens = TokenSet {
1602            keywords: vec!["incident".to_string()],
1603            literals: vec!["FDD-12313".to_string()],
1604        };
1605
1606        let rows = filter_values(&rt, &scope, &candidates, &tokens, 10);
1607
1608        assert_eq!(rows.len(), 1, "RLS should hide the globex row: {rows:?}");
1609        assert_eq!(row_text(&rows[0].entity, "tenant_id"), Some("acme"));
1610    }
1611
1612    /// Empty token sets short-circuit with a structured error before
1613    /// any LLM round-trip.
1614    #[test]
1615    fn execute_refuses_empty_token_set() {
1616        let rt = fresh_runtime();
1617        let scope = make_scope(HashSet::new());
1618        let err = AskPipeline::execute(&rt, &scope, "??? ...")
1619            .expect_err("empty token set must short-circuit");
1620        let msg = format!("{err}");
1621        assert!(
1622            msg.contains("yielded no usable tokens"),
1623            "expected structured empty-token error, got: {msg}"
1624        );
1625    }
1626
1627    /// match_schema drops every collection that's outside
1628    /// `scope.visible_collections`. Using a synthetic vocab via DDL
1629    /// events on the live runtime so the assertion drives real
1630    /// `RedDBRuntime::schema_vocabulary_lookup`.
1631    #[test]
1632    fn match_schema_intersects_with_visible_set() {
1633        let rt = fresh_runtime();
1634        // Two collections both carry a `passport` column. Caller's
1635        // scope only includes `travel`, so the `passport` column hit
1636        // on `secrets` must be dropped.
1637        rt.schema_vocabulary_apply(
1638            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1639                collection: "travel".to_string(),
1640                columns: vec!["id".into(), "passport".into()],
1641                type_tags: Vec::new(),
1642                description: None,
1643            },
1644        );
1645        rt.schema_vocabulary_apply(
1646            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1647                collection: "secrets".to_string(),
1648                columns: vec!["passport".into()],
1649                type_tags: Vec::new(),
1650                description: None,
1651            },
1652        );
1653        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1654        let scope = make_scope(visible.clone());
1655        let tokens = TokenSet {
1656            keywords: vec!["passport".to_string()],
1657            literals: Vec::new(),
1658        };
1659        let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
1660        assert_eq!(candidates.collections, vec!["travel".to_string()]);
1661        assert!(!candidates.collections.contains(&"secrets".to_string()));
1662        // Column hint surfaces for the surviving collection.
1663        let cols = candidates
1664            .columns_by_collection
1665            .get("travel")
1666            .expect("hint columns");
1667        assert!(cols.contains(&"passport".to_string()));
1668    }
1669
1670    // -- Property test (issue #121 acceptance row) -------------------
1671    //
1672    // For 256 random (question, scope) pairs: every Stage 4 row's
1673    // collection MUST be inside `scope.visible_collections`. Drives
1674    // `filter_values` directly with synthetic candidate sets so the
1675    // invariant is pinned without an embedding API.
1676
1677    use proptest::prelude::*;
1678
1679    fn arb_collection() -> impl Strategy<Value = String> {
1680        "[a-z]{1,4}"
1681    }
1682
1683    fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
1684        prop::collection::hash_set(arb_collection(), 0..6)
1685    }
1686
1687    fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
1688        prop::collection::vec(arb_collection(), 0..8)
1689    }
1690
1691    proptest! {
1692        #![proptest_config(ProptestConfig::with_cases(256))]
1693        #[test]
1694        fn stage4_rows_subset_of_visible_collections(
1695            visible in arb_visible(),
1696            candidate_names in arb_candidates(),
1697            literal_count in 0usize..3,
1698        ) {
1699            // Single runtime shared across cases — `filter_values`
1700            // only reads (no mutation), and we need the empty-store
1701            // path so the invariant we want to pin is "no row escapes
1702            // visible_collections" rather than "any specific row
1703            // surfaces".
1704            let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
1705            let candidates = CandidateCollections {
1706                collections: candidate_names,
1707                columns_by_collection: HashMap::new(),
1708            };
1709            let literals: Vec<String> = (0..literal_count)
1710                .map(|i| format!("ID-{i}"))
1711                .collect();
1712            let tokens = TokenSet {
1713                keywords: vec!["passport".to_string()],
1714                literals,
1715            };
1716            let scope = make_scope(visible.clone());
1717            let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
1718            for row in &rows {
1719                prop_assert!(
1720                    visible.contains(&row.collection),
1721                    "Stage 4 leaked row collection={} not in visible={:?}",
1722                    row.collection, visible
1723                );
1724            }
1725        }
1726    }
1727
1728    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();
1729
1730    // -- Integration test (issue #121 acceptance row) ----------------
1731    //
1732    // Drives the four stages end-to-end through `AskPipeline::execute`
1733    // with the question the issue calls out:
1734    //   "quais as novidades sobre o passport FDD-12313?"
1735    //
1736    // Stage 1 must extract `passport` + `FDD-12313`. Stage 2 must
1737    // narrow to the `passports` collection (visible to the caller).
1738    // Stage 3 silently yields zero hits without an embedding provider
1739    // — that's expected for the test fixture path. Stage 4 must surface
1740    // the row whose `notes` column embeds `FDD-12313`.
1741
1742    #[test]
1743    fn integration_passport_fdd_12313_funnels_through_four_stages() {
1744        let rt = fresh_runtime();
1745        // The collection name itself + a `passport` column both feed
1746        // Stage 2's vocabulary; either one is enough for the question
1747        // "passport FDD-12313" to land here.
1748        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1749            .expect("CREATE TABLE travel");
1750        rt.execute_query(
1751            "INSERT INTO travel (id, passport, notes) VALUES \
1752             (1, 'BR-001', 'unrelated note'), \
1753             (2, 'PT-002', 'incident FDD-12313 escalated'), \
1754             (3, 'US-003', 'standard renewal')",
1755        )
1756        .expect("seed rows");
1757        // Out-of-scope collection — must NEVER surface in Stage 4.
1758        rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
1759            .expect("CREATE TABLE secrets");
1760        rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
1761            .expect("seed secrets");
1762
1763        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1764        let scope = make_scope(visible);
1765
1766        let ctx = AskPipeline::execute(
1767            &rt,
1768            &scope,
1769            "quais as novidades sobre o passport FDD-12313?",
1770        )
1771        .expect("pipeline runs");
1772
1773        // Stage 1: passport + FDD-12313 surfaced.
1774        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1775        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1776
1777        // Stage 2: candidates narrowed to `travel` (the `passport`
1778        // column on `secrets` is dropped by the visible-set
1779        // intersection).
1780        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1781
1782        // Stage 3: best-effort embedding — without a provider
1783        // configured, Stage 3 silently returns []; the rest of the
1784        // funnel still runs.
1785        let _ = &ctx.vector_hits;
1786
1787        // Stage 4: the row whose `notes` mentions `FDD-12313`
1788        // surfaces; the out-of-scope `secrets` row does NOT.
1789        assert!(
1790            ctx.filtered_rows
1791                .iter()
1792                .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
1793            "expected travel row with FDD-12313 match, got: {:?}",
1794            ctx.filtered_rows
1795        );
1796        for row in &ctx.filtered_rows {
1797            assert_ne!(
1798                row.collection, "secrets",
1799                "secrets row leaked into Stage 4 output"
1800            );
1801        }
1802
1803        // Per-stage timing recorded.
1804        // (The Instant-based measurements may be 0 on very fast hosts;
1805        // we only check the field exists and was populated.)
1806        let _ = ctx.timings.extract_us
1807            + ctx.timings.schema_us
1808            + ctx.timings.vector_us
1809            + ctx.timings.filter_us;
1810    }
1811
1812    // -- Stage 1 routing (Lane 4/5: LlmNer wiring) -------------------
1813    //
1814    // The routing dispatcher is an `extract_tokens_routed` helper that
1815    // reads `ai.ner.backend` and either passes through to the heuristic
1816    // (default) or routes through `LlmNer::extract`. Today the
1817    // capability gate (`EffectiveScope::has_capability`) is a placeholder
1818    // that always returns `false`, so the LLM path always denies and
1819    // the configured `HeuristicFallback` policy fires. The tests below
1820    // pin every observable: heuristic stays the default, `llm + auth
1821    // denied` honours each fallback mode, and the one-shot info log is
1822    // best-effort (we don't assert it directly to avoid a coupling to
1823    // the global `tracing` subscriber).
1824
1825    fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
1826        let store = rt.inner.db.store();
1827        store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1828    }
1829
1830    /// Default backend stays heuristic — even with an open scope, the
1831    /// pipeline returns the same tokens it would without any config.
1832    #[test]
1833    fn routed_default_backend_runs_heuristic() {
1834        let rt = fresh_runtime();
1835        let scope = make_scope(HashSet::new());
1836        let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1837            .expect("heuristic path is infallible");
1838        assert!(tokens.keywords.contains(&"passport".to_string()));
1839        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1840    }
1841
1842    /// `backend = llm` with `fallback = use_heuristic`: capability
1843    /// denies (placeholder) → fallback → heuristic tokens surface.
1844    #[tokio::test(flavor = "multi_thread")]
1845    async fn routed_llm_auth_denied_uses_heuristic_fallback() {
1846        let rt = fresh_runtime();
1847        write_config(&rt, "ai.ner.backend", "llm");
1848        write_config(&rt, "ai.ner.fallback", "use_heuristic");
1849        let scope = make_scope(HashSet::new());
1850        let tokens = tokio::task::spawn_blocking(move || {
1851            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1852        })
1853        .await
1854        .unwrap()
1855        .expect("fallback policy keeps the call OK");
1856        assert!(tokens.keywords.contains(&"passport".to_string()));
1857        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1858    }
1859
1860    /// `backend = llm` with `fallback = empty_on_fail`: auth denies →
1861    /// fallback returns an empty `TokenSet`.
1862    #[tokio::test(flavor = "multi_thread")]
1863    async fn routed_llm_auth_denied_empty_on_fail() {
1864        let rt = fresh_runtime();
1865        write_config(&rt, "ai.ner.backend", "llm");
1866        write_config(&rt, "ai.ner.fallback", "empty_on_fail");
1867        let scope = make_scope(HashSet::new());
1868        let tokens = tokio::task::spawn_blocking(move || {
1869            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1870        })
1871        .await
1872        .unwrap()
1873        .expect("empty_on_fail returns Ok with empty TokenSet");
1874        assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
1875    }
1876
1877    /// `backend = llm` with `fallback = propagate`: auth denies →
1878    /// `extract_tokens_routed` surfaces a `RedDBError::Query` so the
1879    /// caller can decide.
1880    #[tokio::test(flavor = "multi_thread")]
1881    async fn routed_llm_auth_denied_propagate_returns_error() {
1882        let rt = fresh_runtime();
1883        write_config(&rt, "ai.ner.backend", "llm");
1884        write_config(&rt, "ai.ner.fallback", "propagate");
1885        let scope = make_scope(HashSet::new());
1886        let err = tokio::task::spawn_blocking(move || {
1887            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1888        })
1889        .await
1890        .unwrap()
1891        .expect_err("propagate must surface the error");
1892        let msg = format!("{err}");
1893        assert!(
1894            msg.contains("propagate") || msg.contains("ai.ner.backend"),
1895            "expected propagate error message, got: {msg}"
1896        );
1897    }
1898
1899    /// AskPipeline end-to-end with `backend = llm` and the default
1900    /// `use_heuristic` fallback: the pipeline still returns tokens (via
1901    /// fallback), Stage 1 is the only routed stage, and the rest of the
1902    /// funnel runs unchanged.
1903    #[tokio::test(flavor = "multi_thread")]
1904    async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
1905        let rt = fresh_runtime();
1906        write_config(&rt, "ai.ner.backend", "llm");
1907        // default fallback is use_heuristic — leave it implicit.
1908        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1909            .expect("CREATE TABLE travel");
1910        rt.execute_query(
1911            "INSERT INTO travel (id, passport, notes) VALUES \
1912             (2, 'PT-002', 'incident FDD-12313 escalated')",
1913        )
1914        .expect("seed rows");
1915        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1916        let scope = make_scope(visible);
1917        let ctx = tokio::task::spawn_blocking(move || {
1918            AskPipeline::execute(&rt, &scope, "passport FDD-12313")
1919        })
1920        .await
1921        .unwrap()
1922        .expect("pipeline runs");
1923        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1924        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1925        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1926        assert!(
1927            ctx.filtered_rows
1928                .iter()
1929                .any(|r| r.matched_literal == "FDD-12313"),
1930            "Stage 4 still runs after Stage 1 fallback"
1931        );
1932    }
1933}