Skip to main content

reddb_server/runtime/
ask_pipeline.rs

1//! AskPipeline — issue #121.
2//!
3//! 4-stage funnel that turns a natural-language ASK question into a
4//! scoped, filtered candidate set the LLM call can synthesise an
5//! answer over. Each stage is a pure function so the pipeline reads
6//! top-to-bottom in `execute` and individual stages can be unit-tested
7//! in isolation:
8//!
9//! 1. **`extract_tokens`** — heuristic NER. Splits the question into
10//!    `keywords` (alphanumeric words) and `literals` (uppercase ID-like
11//!    tokens, e.g. `FDD-12313`). LLM-NER is reserved for slice #123.
12//! 2. **`match_schema`** — `SchemaVocabulary::lookup` per keyword;
13//!    the resulting collection set is intersected with the caller's
14//!    `EffectiveScope.visible_collections` so out-of-scope tables
15//!    never enter the candidate pool. Issue #119 pre-filter.
16//! 3. **`vector_search_scoped`** — best-effort embedding + similarity
17//!    over the candidate collections via `AuthorizedSearch`. When no
18//!    embedding API is available (most unit-test fixtures), the stage
19//!    short-circuits with an empty match list — Stage 4 still runs.
20//! 4. **`filter_values`** — applies literal tokens as exact filters
21//!    over candidate-collection columns, returning the rows that
22//!    actually mention each literal.
23//!
24//! Output: typed [`AskContext`] holding all four stage outputs plus
25//! per-stage timing. Empty token-set short-circuits with a structured
26//! error so callers don't pay for an LLM round-trip on a query that
27//! contained nothing addressable.
28//!
29//! The legacy `RedDBRuntime::search_context` is now a Stage-2-internal
30//! helper for the broad-recall fallback; ASK no longer reaches for it
31//! directly.
32
33use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40    AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::application::SearchContextInput;
46use crate::storage::schema::Value;
47use crate::storage::unified::entity::{EntityData, EntityKind, UnifiedEntity};
48
49/// Default cap for Stage 4 row output. Override per-call via
50/// [`AskPipeline::execute_with_limit`].
51pub const DEFAULT_ROW_CAP: usize = 20;
52
53/// Token bag produced by Stage 1.
54#[derive(Debug, Clone, Default, PartialEq, Eq)]
55pub struct TokenSet {
56    /// Lowercase keyword tokens (regex `[A-Za-z][A-Za-z0-9_]+`,
57    /// length ≥ 2). Used by Stage 2 to probe `SchemaVocabulary`.
58    pub keywords: Vec<String>,
59    /// Literal-id tokens kept in their original case so Stage 4 can
60    /// run case-sensitive equality / substring filters. Matches
61    /// `[A-Z0-9-]{3,}` containing at least one digit OR `[0-9]{6,}`.
62    pub literals: Vec<String>,
63}
64
65impl TokenSet {
66    pub fn is_empty(&self) -> bool {
67        self.keywords.is_empty() && self.literals.is_empty()
68    }
69}
70
71/// Stage 2 output: collections likely to contain the answer.
72#[derive(Debug, Clone, Default)]
73pub struct CandidateCollections {
74    /// Collection names, sorted, deduplicated, intersected with the
75    /// caller's `EffectiveScope.visible_collections`.
76    pub collections: Vec<String>,
77    /// Columns hinted by `SchemaVocabulary` for each candidate
78    /// collection. Used by Stage 4 to scope the literal filter to
79    /// promising columns first.
80    pub columns_by_collection: HashMap<String, Vec<String>>,
81}
82
83/// One Stage 3 BM25 text hit from the context index.
84#[derive(Debug, Clone)]
85pub struct TextHit {
86    pub collection: String,
87    pub entity_id: u64,
88    pub score: f32,
89}
90
91/// One Stage 3 vector hit, kept thin so the pipeline doesn't pull
92/// the full `ScoredMatch` shape from `dsl::QueryResult` through.
93#[derive(Debug, Clone)]
94pub struct VectorHit {
95    pub collection: String,
96    pub entity_id: u64,
97    pub score: f32,
98}
99
100/// One graph traversal hit for the ASK graph retrieval bucket.
101#[derive(Debug, Clone)]
102pub struct GraphHit {
103    pub collection: String,
104    pub entity_id: u64,
105    pub score: f32,
106    pub depth: usize,
107    pub kind: GraphHitKind,
108}
109
110#[derive(Debug, Clone, Copy, PartialEq, Eq)]
111pub enum GraphHitKind {
112    Node,
113    Edge,
114}
115
116/// Stage 4 output: rows that match a literal filter.
117#[derive(Debug, Clone)]
118pub struct FilteredRow {
119    pub collection: String,
120    pub entity: UnifiedEntity,
121    /// Literal token that matched this row.
122    pub matched_literal: String,
123    /// Column where the match was found, when known.
124    pub matched_column: Option<String>,
125}
126
127/// Per-stage timing in microseconds. Logged via tracing on every run.
128#[derive(Debug, Clone, Default, PartialEq, Eq)]
129pub struct StageTimings {
130    pub extract_us: u64,
131    pub schema_us: u64,
132    pub text_us: u64,
133    pub vector_us: u64,
134    pub graph_us: u64,
135    pub filter_us: u64,
136}
137
138/// Typed context handed back to the ASK caller. Carries all four
139/// stage outputs so the LLM-formatting helper (slice #122) can pick
140/// what it needs without re-running the funnel.
141#[derive(Debug, Clone)]
142pub struct AskContext {
143    pub question: String,
144    pub tokens: TokenSet,
145    pub candidates: CandidateCollections,
146    pub text_hits: Vec<TextHit>,
147    pub vector_hits: Vec<VectorHit>,
148    pub graph_hits: Vec<GraphHit>,
149    pub filtered_rows: Vec<FilteredRow>,
150    pub source_limit: usize,
151    pub timings: StageTimings,
152}
153
154impl Default for AskContext {
155    fn default() -> Self {
156        Self {
157            question: String::new(),
158            tokens: TokenSet::default(),
159            candidates: CandidateCollections::default(),
160            text_hits: Vec::new(),
161            vector_hits: Vec::new(),
162            graph_hits: Vec::new(),
163            filtered_rows: Vec::new(),
164            source_limit: DEFAULT_ROW_CAP,
165            timings: StageTimings::default(),
166        }
167    }
168}
169
170/// One fused source reference in the final ASK context order.
171#[derive(Debug, Clone, Copy, PartialEq, Eq)]
172pub enum FusedSourceRef {
173    FilteredRow(usize),
174    TextHit(usize),
175    VectorHit(usize),
176    GraphHit(usize),
177}
178
179/// One fused source reference plus its RRF score.
180#[derive(Debug, Clone, Copy, PartialEq)]
181pub struct FusedSource {
182    pub source: FusedSourceRef,
183    pub rrf_score: f64,
184}
185
186/// Pipeline entry point. Instances are stateless — kept as an empty
187/// enum so callers spell `AskPipeline::execute(...)` (matches the
188/// shape #121 calls out).
189pub enum AskPipeline {}
190
191impl AskPipeline {
192    /// Run all four stages with the default row cap.
193    pub fn execute(
194        runtime: &RedDBRuntime,
195        scope: &EffectiveScope,
196        question: &str,
197    ) -> RedDBResult<AskContext> {
198        Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
199    }
200
201    /// Run all four stages with a configurable Stage 4 row cap.
202    pub fn execute_with_limit(
203        runtime: &RedDBRuntime,
204        scope: &EffectiveScope,
205        question: &str,
206        row_cap: usize,
207    ) -> RedDBResult<AskContext> {
208        Self::execute_with_limit_and_min_score(runtime, scope, question, row_cap, None, None)
209    }
210
211    /// Run all four stages with a configurable row cap and per-bucket
212    /// minimum score for retrieval stages that expose native scores.
213    pub fn execute_with_limit_and_min_score(
214        runtime: &RedDBRuntime,
215        scope: &EffectiveScope,
216        question: &str,
217        row_cap: usize,
218        min_score: Option<f32>,
219        graph_depth: Option<usize>,
220    ) -> RedDBResult<AskContext> {
221        let span = info_span!(
222            "ask_pipeline.execute",
223            tenant = ?scope.effective_scope(),
224            question_len = question.len(),
225            row_cap = row_cap,
226            min_score = ?min_score,
227            graph_depth = ?graph_depth,
228        );
229        let _enter = span.enter();
230
231        // Stage 1. Routed through `extract_tokens_routed` so the
232        // `ai.ner.backend` config knob can swap the heuristic for the
233        // opt-in LLM NER without converting the surrounding pipeline
234        // to async (see `extract_tokens_routed` for the rationale).
235        let stage1 = Instant::now();
236        let tokens = extract_tokens_routed(runtime, scope, question)?;
237        let extract_us = stage1.elapsed().as_micros() as u64;
238        debug!(
239            target: "ask_pipeline",
240            stage = "extract_tokens",
241            keywords = ?tokens.keywords,
242            literals = ?tokens.literals,
243            elapsed_us = extract_us,
244            "stage 1 done"
245        );
246        if tokens.is_empty() {
247            warn!(
248                target: "ask_pipeline",
249                question_len = question.len(),
250                "refused: empty token set"
251            );
252            return Err(RedDBError::Query(
253                "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
254                    .to_string(),
255            ));
256        }
257
258        // Stage 2.
259        let stage2 = Instant::now();
260        let candidates = match_schema(runtime, scope, &tokens)?;
261        let schema_us = stage2.elapsed().as_micros() as u64;
262        debug!(
263            target: "ask_pipeline",
264            stage = "match_schema",
265            collections = ?candidates.collections,
266            elapsed_us = schema_us,
267            "stage 2 done"
268        );
269
270        // Stage 3.
271        let stage3 = Instant::now();
272        let text_hits = text_search_bm25_scoped(runtime, scope, question, &candidates, row_cap);
273        let text_us = stage3.elapsed().as_micros() as u64;
274        debug!(
275            target: "ask_pipeline",
276            stage = "text_search_bm25_scoped",
277            hits = text_hits.len(),
278            elapsed_us = text_us,
279            "stage 3 done"
280        );
281
282        // Stage 3b.
283        let stage3b = Instant::now();
284        let vector_hits =
285            vector_search_scoped(runtime, scope, question, &candidates, row_cap, min_score);
286        let vector_us = stage3b.elapsed().as_micros() as u64;
287        debug!(
288            target: "ask_pipeline",
289            stage = "vector_search_scoped",
290            hits = vector_hits.len(),
291            elapsed_us = vector_us,
292            "stage 3b done"
293        );
294
295        // Stage 3c.
296        let stage3c = Instant::now();
297        let graph_hits = graph_search_scoped(
298            runtime,
299            scope,
300            question,
301            &candidates,
302            row_cap,
303            min_score,
304            graph_depth,
305        );
306        let graph_us = stage3c.elapsed().as_micros() as u64;
307        debug!(
308            target: "ask_pipeline",
309            stage = "graph_search_scoped",
310            hits = graph_hits.len(),
311            elapsed_us = graph_us,
312            "stage 3c done"
313        );
314
315        // Stage 4.
316        let stage4 = Instant::now();
317        let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
318        let filter_us = stage4.elapsed().as_micros() as u64;
319        debug!(
320            target: "ask_pipeline",
321            stage = "filter_values",
322            rows = filtered_rows.len(),
323            elapsed_us = filter_us,
324            "stage 4 done"
325        );
326
327        Ok(AskContext {
328            question: question.to_string(),
329            tokens,
330            candidates,
331            text_hits,
332            vector_hits,
333            graph_hits,
334            filtered_rows,
335            source_limit: row_cap,
336            timings: StageTimings {
337                extract_us,
338                schema_us,
339                text_us,
340                vector_us,
341                graph_us,
342                filter_us,
343            },
344        })
345    }
346}
347
348/// Fuse row-filter and vector buckets into the single ranked source
349/// order used by prompt rendering and `sources_flat`.
350pub fn fused_source_order(ctx: &AskContext) -> Vec<FusedSourceRef> {
351    fused_sources(ctx)
352        .into_iter()
353        .map(|fused| fused.source)
354        .collect()
355}
356
357/// Fuse row-filter and vector buckets into the single ranked source
358/// order with the RRF score preserved for `EXPLAIN ASK`.
359pub fn fused_sources(ctx: &AskContext) -> Vec<FusedSource> {
360    use super::ai::rrf_fuser::{fuse, Bucket, Candidate, RRF_K_DEFAULT};
361
362    if ctx.source_limit == 0
363        || (ctx.filtered_rows.is_empty()
364            && ctx.text_hits.is_empty()
365            && ctx.vector_hits.is_empty()
366            && ctx.graph_hits.is_empty())
367    {
368        return Vec::new();
369    }
370
371    let mut refs: HashMap<String, FusedSourceRef> = HashMap::new();
372    let row_bucket = Bucket {
373        candidates: ctx
374            .filtered_rows
375            .iter()
376            .enumerate()
377            .map(|(idx, row)| {
378                let id = source_identity(&row.collection, row.entity.id.raw());
379                refs.entry(id.clone())
380                    .or_insert(FusedSourceRef::FilteredRow(idx));
381                Candidate { id, score: 1.0 }
382            })
383            .collect(),
384        min_score: None,
385    };
386    let text_bucket = Bucket {
387        candidates: ctx
388            .text_hits
389            .iter()
390            .enumerate()
391            .map(|(idx, hit)| {
392                let id = source_identity(&hit.collection, hit.entity_id);
393                refs.entry(id.clone())
394                    .or_insert(FusedSourceRef::TextHit(idx));
395                Candidate {
396                    id,
397                    score: hit.score as f64,
398                }
399            })
400            .collect(),
401        min_score: None,
402    };
403    let vector_bucket = Bucket {
404        candidates: ctx
405            .vector_hits
406            .iter()
407            .enumerate()
408            .map(|(idx, hit)| {
409                let id = source_identity(&hit.collection, hit.entity_id);
410                refs.entry(id.clone())
411                    .or_insert(FusedSourceRef::VectorHit(idx));
412                Candidate {
413                    id,
414                    score: hit.score as f64,
415                }
416            })
417            .collect(),
418        min_score: None,
419    };
420    let graph_bucket = Bucket {
421        candidates: ctx
422            .graph_hits
423            .iter()
424            .enumerate()
425            .map(|(idx, hit)| {
426                let id = source_identity(&hit.collection, hit.entity_id);
427                refs.entry(id.clone())
428                    .or_insert(FusedSourceRef::GraphHit(idx));
429                Candidate {
430                    id,
431                    score: hit.score as f64,
432                }
433            })
434            .collect(),
435        min_score: None,
436    };
437
438    fuse(
439        &[row_bucket, text_bucket, vector_bucket, graph_bucket],
440        RRF_K_DEFAULT,
441        ctx.source_limit,
442    )
443    .into_iter()
444    .filter_map(|item| {
445        refs.get(&item.id).copied().map(|source| FusedSource {
446            source,
447            rrf_score: item.rrf_score,
448        })
449    })
450    .collect()
451}
452
453fn source_identity(collection: &str, entity_id: u64) -> String {
454    format!("{collection}/{entity_id}")
455}
456
457// ---------------------------------------------------------------------------
458// Stage 3 — BM25 text search scoped to the candidate collections.
459// ---------------------------------------------------------------------------
460
461/// Run the context index's sparse BM25 ranker over the candidate
462/// collections. This is the ASK text bucket used by RRF; literal row
463/// filtering remains as a separate high-precision bucket.
464pub fn text_search_bm25_scoped(
465    runtime: &RedDBRuntime,
466    scope: &EffectiveScope,
467    question: &str,
468    candidates: &CandidateCollections,
469    top_k: usize,
470) -> Vec<TextHit> {
471    if candidates.collections.is_empty() || top_k == 0 {
472        return Vec::new();
473    }
474
475    let visible = scope.visible_collections();
476    let allowed: BTreeSet<String> = candidates
477        .collections
478        .iter()
479        .filter(|collection| visible.is_none_or(|set| set.contains(*collection)))
480        .cloned()
481        .collect();
482    if allowed.is_empty() {
483        return Vec::new();
484    }
485
486    let _scope_guard = AskScopeGuard::install(scope);
487    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
488    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
489    let store = runtime.inner.db.store();
490
491    runtime
492        .inner
493        .db
494        .store()
495        .context_index()
496        .search_bm25(question, top_k, Some(&allowed))
497        .into_iter()
498        .filter_map(|hit| {
499            let entity = store.get(&hit.collection, hit.entity_id)?;
500            if !ask_entity_allowed(
501                runtime,
502                scope,
503                &hit.collection,
504                &entity,
505                snap_ctx.as_ref(),
506                &mut rls_cache,
507            ) {
508                return None;
509            }
510            Some(TextHit {
511                collection: hit.collection,
512                entity_id: hit.entity_id.raw(),
513                score: hit.score,
514            })
515        })
516        .collect()
517}
518
519// ---------------------------------------------------------------------------
520// Stage 1 — token / entity extraction (heuristic v1) + opt-in LLM NER routing.
521// ---------------------------------------------------------------------------
522
523/// Stage-1 dispatcher honouring the `ai.ner.backend` config knob.
524///
525/// The pipeline (and `execute_with_limit` in particular) stays
526/// **sync** per #123 deepening to avoid an async cascade through the
527/// rest of `ask_pipeline`. When the operator turns on
528/// `ai.ner.backend = "llm"`, we contain the async surface here by
529/// using `tokio::runtime::Handle::current().block_on(...)` — works
530/// when called from inside an async context (the production HTTP
531/// handler path), falls back to the heuristic with a warn-log when
532/// no Tokio runtime is reachable (sync test contexts and embedded
533/// callers without a runtime).
534///
535/// Auth gate: `LlmNer::extract` checks `ai:ner:read`. Today
536/// `EffectiveScope::has_capability` is a placeholder that always
537/// returns `false`, so a LLM-backend-configured deployment will see
538/// every call deny at the gate and the configured `HeuristicFallback`
539/// fires. A one-shot info log makes that visible in operator logs.
540/// Wiring the real capability check into the auth engine is future
541/// work — the routing seam is in place so that landing the auth
542/// extension is a one-line `EffectiveScope` change.
543fn extract_tokens_routed(
544    runtime: &RedDBRuntime,
545    scope: &EffectiveScope,
546    question: &str,
547) -> RedDBResult<TokenSet> {
548    let backend = runtime.config_string("ai.ner.backend", "heuristic");
549    if backend != "llm" {
550        return Ok(extract_tokens(question));
551    }
552
553    let endpoint = runtime.config_string("ai.ner.endpoint", "");
554    let model = runtime.config_string("ai.ner.model", "");
555    let timeout_ms = runtime
556        .config_string("ai.ner.timeout_ms", "5000")
557        .parse::<u32>()
558        .unwrap_or(5000);
559    let fallback = match runtime
560        .config_string("ai.ner.fallback", "use_heuristic")
561        .as_str()
562    {
563        "empty_on_fail" => HeuristicFallback::EmptyOnFail,
564        "propagate" => HeuristicFallback::Propagate,
565        _ => HeuristicFallback::UseHeuristic,
566    };
567
568    // Endpoint shape decides provider; default to OpenAI-compat when
569    // unspecified — matches the documented config shape.
570    let provider = if endpoint.is_empty() && model.is_empty() {
571        // No network config provided — operator opted into "llm" but
572        // didn't wire a backend. Fall back to a Stub::Empty so the
573        // configured fallback policy fires deterministically.
574        NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
575    } else {
576        NerProvider::OpenAiCompat { endpoint, model }
577    };
578
579    let mut ner = LlmNer::new(provider, fallback);
580    ner.timeout_ms = timeout_ms;
581
582    let auth = ScopeAuthAdapter(scope);
583    let llm_result = match tokio::runtime::Handle::try_current() {
584        Ok(handle) => {
585            // Use `block_on` from a thread that's not driving the
586            // current runtime — the typical caller is an Axum HTTP
587            // handler running on the multi-thread runtime, so we hop
588            // off via `block_in_place`.
589            tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
590        }
591        Err(_) => {
592            warn!(
593                target: "ask_pipeline",
594                "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
595            );
596            return Ok(extract_tokens(question));
597        }
598    };
599
600    match llm_result {
601        Ok(tokens) => Ok(tokens),
602        Err(NerError::AuthDenied) => {
603            log_auth_denial_once();
604            // Auth denials never honour fallback inside `LlmNer`, so
605            // the routing layer applies the configured fallback here
606            // — this is the bridge until the auth engine wires the
607            // capability for real.
608            apply_fallback(fallback, question)
609        }
610        Err(err) => {
611            warn!(
612                target: "ask_pipeline",
613                error = %err,
614                "LlmNer extract failed; honouring HeuristicFallback policy"
615            );
616            apply_fallback(fallback, question)
617        }
618    }
619}
620
621fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
622    match fallback {
623        HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
624        HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
625        HeuristicFallback::Propagate => Err(RedDBError::Query(
626            "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
627        )),
628    }
629}
630
631/// One-shot info log for the auth-gate placeholder. Until the auth
632/// engine learns to grant `ai:ner:read`, every routed call denies —
633/// we emit the explainer once per process so logs aren't spammed.
634fn log_auth_denial_once() {
635    static EMITTED: AtomicBool = AtomicBool::new(false);
636    if !EMITTED.swap(true, Ordering::Relaxed) {
637        info!(
638            target: "ask_pipeline",
639            capability = NER_CAPABILITY,
640            "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
641            NER_CAPABILITY
642        );
643    }
644}
645
646/// Adapter wrapping `EffectiveScope` so it can drive the
647/// `LlmNer`-side `AuthContext` trait without leaking that trait
648/// across the rest of the runtime.
649struct ScopeAuthAdapter<'a>(&'a EffectiveScope);
650
651impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
652    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
654    }
655}
656
657impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
658    fn has_capability(&self, capability: &str) -> bool {
659        self.0.has_capability(capability)
660    }
661}
662
663/// Split a question into a [`TokenSet`]. Pure function — no runtime
664/// lookups.
665///
666/// Rules (heuristic v1):
667/// - **Literals** match `[A-Z0-9-]{3,}` AND contain ≥1 digit, OR are
668///   pure digit runs of length ≥ 6. Captures id shapes like
669///   `FDD-12313`, `INV-2024-001`, `123456`.
670/// - **Keywords** match `[A-Za-z][A-Za-z0-9_]+` (length ≥ 2),
671///   normalised to lowercase. Stop words are dropped to avoid
672///   wasting Stage-2 lookups on noise.
673pub fn extract_tokens(question: &str) -> TokenSet {
674    let mut keywords: Vec<String> = Vec::new();
675    let mut literals: Vec<String> = Vec::new();
676
677    let mut chars = question.chars().peekable();
678    let mut buf = String::new();
679
680    let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
681        if buf.is_empty() {
682            return;
683        }
684        let word = std::mem::take(buf);
685        classify_token(&word, keywords, literals);
686    };
687
688    while let Some(ch) = chars.next() {
689        if ch.is_alphanumeric() || ch == '_' || ch == '-' {
690            buf.push(ch);
691        } else {
692            flush(&mut buf, &mut keywords, &mut literals);
693            // Skip remaining whitespace / punctuation; loop continues.
694            let _ = ch;
695        }
696        // Look ahead to also flush on EOF.
697        if chars.peek().is_none() {
698            flush(&mut buf, &mut keywords, &mut literals);
699        }
700    }
701    // Final flush in case the loop body didn't see EOF (empty
702    // iterator path).
703    if !buf.is_empty() {
704        classify_token(&buf, &mut keywords, &mut literals);
705    }
706
707    // Dedup keywords + literals while preserving order.
708    let mut seen = HashSet::new();
709    keywords.retain(|tok| seen.insert(tok.clone()));
710    let mut seen_lit = HashSet::new();
711    literals.retain(|tok| seen_lit.insert(tok.clone()));
712
713    TokenSet { keywords, literals }
714}
715
716fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
717    // Literal: shape `[A-Z0-9-]{3,}` with at least one digit, OR
718    // pure-digit run of length ≥ 6.
719    let is_upper_id_shape = word.len() >= 3
720        && word
721            .chars()
722            .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
723        && word.chars().any(|c| c.is_ascii_digit())
724        && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
725    let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
726    if is_upper_id_shape || is_long_digit_run {
727        literals.push(word.to_string());
728        return;
729    }
730    // Keyword: starts with a letter, len ≥ 2, drop trailing/leading
731    // hyphens, lowercase.
732    let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
733    if trimmed.len() < 2 {
734        return;
735    }
736    if !trimmed
737        .chars()
738        .next()
739        .map(|c| c.is_ascii_alphabetic())
740        .unwrap_or(false)
741    {
742        return;
743    }
744    if !trimmed
745        .chars()
746        .all(|c| c.is_ascii_alphanumeric() || c == '_')
747    {
748        // Hyphenated word that wasn't a literal — skip rather than
749        // index a fragmented token.
750        return;
751    }
752    let lower = trimmed.to_ascii_lowercase();
753    if STOP_WORDS.binary_search(&lower.as_str()).is_ok() {
754        return;
755    }
756    keywords.push(lower);
757}
758
759/// Sorted ascii lowercase stop-word list. Kept tiny and curated so a
760/// regression in Stage 2 candidate-narrowing surfaces fast.
761const STOP_WORDS: &[&str] = &[
762    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
763    "is", "it", "of", "on", "or", "que", "qual", "quais", "sobre", "te", "the", "to", "what",
764    "where", "which", "with",
765];
766
767// ---------------------------------------------------------------------------
768// Stage 2 — schema-vocabulary match.
769// ---------------------------------------------------------------------------
770
771/// For each keyword, probe `SchemaVocabulary` and intersect the
772/// resulting collection set with `scope.visible_collections`. Returns
773/// the deduplicated, sorted candidate list plus a per-collection
774/// column hint set for Stage 4.
775pub fn match_schema(
776    runtime: &RedDBRuntime,
777    scope: &EffectiveScope,
778    tokens: &TokenSet,
779) -> RedDBResult<CandidateCollections> {
780    let visible = match scope.visible_collections() {
781        Some(set) => set.clone(),
782        None => {
783            // No scope wired = embedded mode. Fall back to "every
784            // collection in the DB" so the pipeline still runs;
785            // AuthorizedSearch is the seam that refuses the deny-
786            // default for AI commands.
787            runtime
788                .inner
789                .db
790                .store()
791                .list_collections()
792                .into_iter()
793                .collect()
794        }
795    };
796
797    let mut collections: BTreeSet<String> = BTreeSet::new();
798    let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
799    for keyword in &tokens.keywords {
800        let hits = runtime.schema_vocabulary_lookup(keyword);
801        for hit in hits {
802            if !visible.contains(&hit.collection) {
803                continue;
804            }
805            collections.insert(hit.collection.clone());
806            if let Some(column) = hit.column {
807                columns_by_collection
808                    .entry(hit.collection)
809                    .or_default()
810                    .insert(column);
811            }
812        }
813    }
814
815    Ok(CandidateCollections {
816        collections: collections.into_iter().collect(),
817        columns_by_collection: columns_by_collection
818            .into_iter()
819            .map(|(k, v)| (k, v.into_iter().collect()))
820            .collect(),
821    })
822}
823
824// ---------------------------------------------------------------------------
825// Stage 3 — vector search scoped to the candidate collections.
826// ---------------------------------------------------------------------------
827
828/// Embed the question (best-effort — skipped if no provider is
829/// configured, see `embed_question`) and run
830/// `AuthorizedSearch::execute_similar` over each candidate
831/// collection. Returns at most `top_k` hits sorted by similarity.
832pub fn vector_search_scoped(
833    runtime: &RedDBRuntime,
834    scope: &EffectiveScope,
835    question: &str,
836    candidates: &CandidateCollections,
837    top_k: usize,
838    min_score: Option<f32>,
839) -> Vec<VectorHit> {
840    if candidates.collections.is_empty() {
841        return Vec::new();
842    }
843    let Some(embedding) = embed_question(runtime, question) else {
844        return Vec::new();
845    };
846    let per_collection = top_k.max(1);
847    let mut hits: Vec<VectorHit> = Vec::new();
848    let _scope_guard = AskScopeGuard::install(scope);
849    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
850    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
851    let store = runtime.inner.db.store();
852    for collection in &candidates.collections {
853        match super::authorized_search::AuthorizedSearch::execute_similar(
854            runtime,
855            scope,
856            collection,
857            &embedding,
858            per_collection,
859            min_score.unwrap_or(0.0),
860        ) {
861            Ok(results) => {
862                for result in results {
863                    let Some(entity) = store.get(collection, result.entity_id) else {
864                        continue;
865                    };
866                    if !ask_entity_allowed(
867                        runtime,
868                        scope,
869                        collection,
870                        &entity,
871                        snap_ctx.as_ref(),
872                        &mut rls_cache,
873                    ) {
874                        continue;
875                    }
876                    hits.push(VectorHit {
877                        collection: collection.clone(),
878                        entity_id: result.entity_id.raw(),
879                        score: result.score,
880                    });
881                }
882            }
883            Err(err) => {
884                debug!(
885                    target: "ask_pipeline",
886                    collection = collection,
887                    err = %err,
888                    "vector_search_scoped: collection skipped"
889                );
890            }
891        }
892    }
893    hits.sort_by(|a, b| {
894        b.score
895            .partial_cmp(&a.score)
896            .unwrap_or(std::cmp::Ordering::Equal)
897            .then_with(|| a.entity_id.cmp(&b.entity_id))
898    });
899    hits.truncate(top_k);
900    hits
901}
902
903// ---------------------------------------------------------------------------
904// Stage 3b — graph traversal scoped to candidate collections.
905// ---------------------------------------------------------------------------
906
907/// Run context search with graph expansion enabled and keep only
908/// graph-traversal hits. `ASK ... DEPTH N` controls the traversal
909/// horizon; absent depth follows the existing ASK tool default.
910pub fn graph_search_scoped(
911    runtime: &RedDBRuntime,
912    scope: &EffectiveScope,
913    question: &str,
914    candidates: &CandidateCollections,
915    top_k: usize,
916    min_score: Option<f32>,
917    graph_depth: Option<usize>,
918) -> Vec<GraphHit> {
919    if candidates.collections.is_empty() || top_k == 0 {
920        return Vec::new();
921    }
922    let depth = graph_depth
923        .unwrap_or(crate::runtime::ai::mcp_ask_tool::DEPTH_DEFAULT as usize)
924        .max(1);
925    let _scope_guard = AskScopeGuard::install(scope);
926    let input = SearchContextInput {
927        query: question.to_string(),
928        field: None,
929        vector: None,
930        collections: Some(candidates.collections.clone()),
931        graph_depth: Some(depth),
932        graph_max_edges: None,
933        max_cross_refs: Some(0),
934        follow_cross_refs: Some(false),
935        expand_graph: Some(true),
936        global_scan: Some(true),
937        reindex: Some(false),
938        limit: Some(top_k),
939        min_score,
940    };
941    let result = match if scope.visible_collections().is_some() {
942        super::authorized_search::AuthorizedSearch::execute_context(runtime, scope, input)
943    } else {
944        runtime.search_context(input)
945    } {
946        Ok(result) => result,
947        Err(err) => {
948            debug!(
949                target: "ask_pipeline",
950                err = %err,
951                "graph_search_scoped: context search skipped"
952            );
953            return Vec::new();
954        }
955    };
956
957    let mut hits = Vec::new();
958    for entity in result
959        .graph
960        .nodes
961        .into_iter()
962        .chain(result.graph.edges.into_iter())
963    {
964        let crate::runtime::DiscoveryMethod::GraphTraversal { depth, .. } = entity.discovery else {
965            continue;
966        };
967        let kind = match entity.entity.kind {
968            EntityKind::GraphNode(_) => GraphHitKind::Node,
969            EntityKind::GraphEdge(_) => GraphHitKind::Edge,
970            _ => continue,
971        };
972        hits.push(GraphHit {
973            collection: entity.collection,
974            entity_id: entity.entity.id.raw(),
975            score: entity.score,
976            depth,
977            kind,
978        });
979    }
980    hits.sort_by(|a, b| {
981        b.score
982            .partial_cmp(&a.score)
983            .unwrap_or(std::cmp::Ordering::Equal)
984            .then_with(|| a.depth.cmp(&b.depth))
985            .then_with(|| a.collection.cmp(&b.collection))
986            .then_with(|| a.entity_id.cmp(&b.entity_id))
987    });
988    hits.truncate(top_k);
989    hits
990}
991
992/// Best-effort embedding. Returns `None` when no embedding provider
993/// is configured (the unit-test fixture path) — the caller treats
994/// `None` as "Stage 3 yielded zero hits" and continues.
995fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
996    let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
997        match runtime.inner.db.get_kv("red_config", key) {
998            Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
999            Some(_) => Ok(None),
1000            None => Ok(None),
1001        }
1002    };
1003    let provider = crate::ai::resolve_default_provider(&kv_getter);
1004    if !provider.is_openai_compatible() {
1005        return None;
1006    }
1007    let model = crate::ai::resolve_default_model(&provider, &kv_getter);
1008    let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
1009    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
1010    let request = crate::ai::OpenAiEmbeddingRequest {
1011        api_key,
1012        model,
1013        inputs: vec![question.to_string()],
1014        dimensions: None,
1015        api_base: provider.resolve_api_base(),
1016    };
1017    let response = crate::runtime::ai::block_on_ai(async move {
1018        crate::ai::openai_embeddings_async(&transport, request).await
1019    })
1020    .and_then(|result| result)
1021    .ok()?;
1022    response.embeddings.into_iter().next()
1023}
1024
1025// ---------------------------------------------------------------------------
1026// Stage 4 — value filter using literal tokens.
1027// ---------------------------------------------------------------------------
1028
1029/// Walk every candidate collection looking for rows whose columns
1030/// contain any of the literal tokens. Caller-supplied `row_cap`
1031/// bounds the result count; column hints from Stage 2 are visited
1032/// first so promising columns find a match before a full scan.
1033pub fn filter_values(
1034    runtime: &RedDBRuntime,
1035    scope: &EffectiveScope,
1036    candidates: &CandidateCollections,
1037    tokens: &TokenSet,
1038    row_cap: usize,
1039) -> Vec<FilteredRow> {
1040    if tokens.literals.is_empty() || candidates.collections.is_empty() {
1041        return Vec::new();
1042    }
1043    let visible = scope.visible_collections();
1044    let store = runtime.inner.db.store();
1045    let mut out: Vec<FilteredRow> = Vec::new();
1046    let _scope_guard = AskScopeGuard::install(scope);
1047    let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
1048    let mut rls_cache: HashMap<String, Option<crate::storage::query::ast::Filter>> = HashMap::new();
1049
1050    'collection: for collection in &candidates.collections {
1051        // Defence-in-depth: redo the visibility check here so a Stage
1052        // 2 regression can't smuggle an out-of-scope collection
1053        // through.
1054        if let Some(set) = visible {
1055            if !set.contains(collection) {
1056                continue;
1057            }
1058        }
1059        let Some(manager) = store.get_collection(collection) else {
1060            continue;
1061        };
1062        let hint_columns: &[String] = candidates
1063            .columns_by_collection
1064            .get(collection)
1065            .map(|v| v.as_slice())
1066            .unwrap_or(&[]);
1067
1068        for entity in manager.query_all(|_| true) {
1069            if !ask_entity_allowed(
1070                runtime,
1071                scope,
1072                collection,
1073                &entity,
1074                snap_ctx.as_ref(),
1075                &mut rls_cache,
1076            ) {
1077                continue;
1078            }
1079            if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
1080                out.push(FilteredRow {
1081                    collection: collection.clone(),
1082                    entity,
1083                    matched_literal: hit.0,
1084                    matched_column: hit.1,
1085                });
1086                if out.len() >= row_cap {
1087                    break 'collection;
1088                }
1089            }
1090        }
1091    }
1092    out
1093}
1094
1095fn ask_entity_allowed(
1096    runtime: &RedDBRuntime,
1097    scope: &EffectiveScope,
1098    collection: &str,
1099    entity: &UnifiedEntity,
1100    snap_ctx: Option<&crate::runtime::impl_core::SnapshotContext>,
1101    rls_cache: &mut HashMap<String, Option<crate::storage::query::ast::Filter>>,
1102) -> bool {
1103    if scope
1104        .visible_collections()
1105        .is_some_and(|visible| !visible.contains(collection))
1106    {
1107        return false;
1108    }
1109    runtime.search_entity_allowed(collection, entity, snap_ctx, rls_cache)
1110}
1111
1112struct AskScopeGuard {
1113    prev_tenant: Option<String>,
1114    prev_auth: Option<(String, crate::auth::Role)>,
1115}
1116
1117impl AskScopeGuard {
1118    fn install(scope: &EffectiveScope) -> Self {
1119        let prev_tenant = crate::runtime::impl_core::current_tenant();
1120        let prev_auth = crate::runtime::impl_core::current_auth_identity();
1121
1122        match scope.effective_scope() {
1123            Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant.to_string()),
1124            None => crate::runtime::impl_core::clear_current_tenant(),
1125        }
1126        match scope.identity() {
1127            Some((user, role)) => {
1128                crate::runtime::impl_core::set_current_auth_identity(user.to_string(), role)
1129            }
1130            None => crate::runtime::impl_core::clear_current_auth_identity(),
1131        }
1132
1133        Self {
1134            prev_tenant,
1135            prev_auth,
1136        }
1137    }
1138}
1139
1140impl Drop for AskScopeGuard {
1141    fn drop(&mut self) {
1142        match self.prev_tenant.take() {
1143            Some(tenant) => crate::runtime::impl_core::set_current_tenant(tenant),
1144            None => crate::runtime::impl_core::clear_current_tenant(),
1145        }
1146        match self.prev_auth.take() {
1147            Some((user, role)) => crate::runtime::impl_core::set_current_auth_identity(user, role),
1148            None => crate::runtime::impl_core::clear_current_auth_identity(),
1149        }
1150    }
1151}
1152
1153/// Look for any literal in any column value of `entity`. Hint
1154/// columns are checked first; a positive hit short-circuits.
1155fn literal_match_in_entity(
1156    entity: &UnifiedEntity,
1157    literals: &[String],
1158    hint_columns: &[String],
1159) -> Option<(String, Option<String>)> {
1160    let row = match &entity.data {
1161        EntityData::Row(row) => row,
1162        _ => return None,
1163    };
1164
1165    // Pass 1: hint columns first.
1166    for column in hint_columns {
1167        if let Some(value) = row.get_field(column) {
1168            if let Some(lit) = first_literal_in_value(value, literals) {
1169                return Some((lit, Some(column.clone())));
1170            }
1171        }
1172    }
1173    // Pass 2: every other column.
1174    for (name, value) in row.iter_fields() {
1175        if hint_columns.iter().any(|c| c == name) {
1176            continue;
1177        }
1178        if let Some(lit) = first_literal_in_value(value, literals) {
1179            return Some((lit, Some(name.to_string())));
1180        }
1181    }
1182    None
1183}
1184
1185fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
1186    let rendered = match value {
1187        Value::Text(s) => s.to_string(),
1188        Value::Integer(i) => i.to_string(),
1189        Value::Float(f) => f.to_string(),
1190        Value::Boolean(b) => b.to_string(),
1191        Value::Json(j) => String::from_utf8_lossy(j).to_string(),
1192        _ => return None,
1193    };
1194    for lit in literals {
1195        // Case-sensitive substring match: literals are id-shaped, so
1196        // we want `FDD-12313` to find embedded `FDD-12313` in a free-
1197        // form description column too.
1198        if rendered.contains(lit) {
1199            return Some(lit.clone());
1200        }
1201    }
1202    None
1203}
1204
1205#[cfg(test)]
1206mod tests {
1207    use super::*;
1208
1209    // -- Stage 1 ------------------------------------------------------
1210
1211    #[test]
1212    fn extract_tokens_splits_keywords_and_literals() {
1213        let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
1214        // `quais`, `as`, `sobre`, `o` are stop words; `novidades`,
1215        // `passport` survive.
1216        assert!(tokens.keywords.contains(&"novidades".to_string()));
1217        assert!(tokens.keywords.contains(&"passport".to_string()));
1218        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1219        assert!(!tokens.is_empty());
1220    }
1221
1222    #[test]
1223    fn extract_tokens_returns_empty_for_punctuation_only() {
1224        let tokens = extract_tokens("???   ...");
1225        assert!(tokens.is_empty());
1226    }
1227
1228    #[test]
1229    fn extract_tokens_long_digit_run_is_a_literal() {
1230        let tokens = extract_tokens("show order 987654321 details");
1231        assert!(tokens.literals.contains(&"987654321".to_string()));
1232        assert!(tokens.keywords.contains(&"order".to_string()));
1233        assert!(tokens.keywords.contains(&"details".to_string()));
1234        assert!(tokens.keywords.contains(&"show".to_string()));
1235    }
1236
1237    #[test]
1238    fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
1239        // "USA" is uppercase but lacks a digit, so it stays a keyword
1240        // (lowercased) — Stage 2 still gets to probe it.
1241        let tokens = extract_tokens("USA exports report");
1242        assert!(tokens.keywords.contains(&"usa".to_string()));
1243        assert!(tokens.literals.is_empty());
1244    }
1245
1246    #[test]
1247    fn extract_tokens_dedups() {
1248        let tokens = extract_tokens("passport passport FDD-1 FDD-1");
1249        assert_eq!(
1250            tokens.keywords.iter().filter(|k| *k == "passport").count(),
1251            1
1252        );
1253        assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
1254    }
1255
1256    // -- Stage 4 helper ----------------------------------------------
1257
1258    #[test]
1259    fn first_literal_in_value_substring_match() {
1260        let lit = first_literal_in_value(
1261            &Value::text("issue FDD-12313 reported by user"),
1262            &["FDD-12313".to_string()],
1263        );
1264        assert_eq!(lit.as_deref(), Some("FDD-12313"));
1265    }
1266
1267    #[test]
1268    fn first_literal_in_value_no_match_returns_none() {
1269        assert!(
1270            first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
1271                .is_none()
1272        );
1273    }
1274
1275    // -- Pipeline-wide -----------------------------------------------
1276
1277    use crate::api::RedDBOptions;
1278    use crate::auth::Role;
1279    use crate::runtime::statement_frame::EffectiveScope;
1280    use crate::runtime::RedDBRuntime;
1281    use crate::storage::schema::Value;
1282    use crate::storage::transaction::snapshot::Snapshot;
1283    use crate::storage::unified::entity::{
1284        EntityData, EntityId, EntityKind, RowData, UnifiedEntity,
1285    };
1286    use std::sync::Arc;
1287
1288    fn make_scope(visible: HashSet<String>) -> EffectiveScope {
1289        EffectiveScope {
1290            tenant: Some("acme".to_string()),
1291            identity: Some(("alice".to_string(), Role::Read)),
1292            snapshot: Snapshot {
1293                xid: 0,
1294                in_progress: HashSet::new(),
1295            },
1296            visible_collections: Some(visible),
1297        }
1298    }
1299
1300    fn fresh_runtime() -> RedDBRuntime {
1301        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
1302    }
1303
1304    fn test_row(collection: &str, id: u64) -> FilteredRow {
1305        FilteredRow {
1306            collection: collection.to_string(),
1307            entity: UnifiedEntity::new(
1308                EntityId::new(id),
1309                EntityKind::TableRow {
1310                    table: Arc::from(collection),
1311                    row_id: id,
1312                },
1313                EntityData::Row(RowData {
1314                    columns: Vec::new(),
1315                    named: Some(
1316                        [("body".to_string(), Value::text("ticket FDD-1".to_string()))]
1317                            .into_iter()
1318                            .collect(),
1319                    ),
1320                    schema: None,
1321                }),
1322            ),
1323            matched_literal: "FDD-1".to_string(),
1324            matched_column: Some("body".to_string()),
1325        }
1326    }
1327
1328    fn test_graph_hit(collection: &str, id: u64, score: f32, depth: usize) -> GraphHit {
1329        GraphHit {
1330            collection: collection.to_string(),
1331            entity_id: id,
1332            score,
1333            depth,
1334            kind: GraphHitKind::Node,
1335        }
1336    }
1337
1338    fn test_text_hit(collection: &str, id: u64, score: f32) -> TextHit {
1339        TextHit {
1340            collection: collection.to_string(),
1341            entity_id: id,
1342            score,
1343        }
1344    }
1345
1346    struct TenantGuard;
1347
1348    impl TenantGuard {
1349        fn set(tenant: &str) -> Self {
1350            crate::runtime::impl_core::set_current_tenant(tenant.to_string());
1351            Self
1352        }
1353    }
1354
1355    impl Drop for TenantGuard {
1356        fn drop(&mut self) {
1357            crate::runtime::impl_core::clear_current_tenant();
1358        }
1359    }
1360
1361    fn row_text<'a>(entity: &'a UnifiedEntity, field: &str) -> Option<&'a str> {
1362        let row = entity.data.as_row()?;
1363        match row.get_field(field)? {
1364            Value::Text(value) => Some(value.as_ref()),
1365            _ => None,
1366        }
1367    }
1368
1369    #[test]
1370    fn fused_source_order_uses_rrf_and_total_limit() {
1371        let ctx = AskContext {
1372            source_limit: 2,
1373            filtered_rows: vec![test_row("incidents", 2), test_row("incidents", 1)],
1374            vector_hits: vec![
1375                VectorHit {
1376                    collection: "incidents".to_string(),
1377                    entity_id: 1,
1378                    score: 0.91,
1379                },
1380                VectorHit {
1381                    collection: "docs".to_string(),
1382                    entity_id: 9,
1383                    score: 0.88,
1384                },
1385            ],
1386            ..AskContext::default()
1387        };
1388
1389        let order = fused_source_order(&ctx);
1390
1391        assert_eq!(
1392            order,
1393            vec![
1394                FusedSourceRef::FilteredRow(1),
1395                FusedSourceRef::FilteredRow(0)
1396            ]
1397        );
1398    }
1399
1400    #[test]
1401    fn fused_source_order_includes_graph_bucket() {
1402        let ctx = AskContext {
1403            source_limit: 4,
1404            filtered_rows: vec![test_row("incidents", 1)],
1405            text_hits: vec![test_text_hit("articles", 5, 1.2)],
1406            vector_hits: vec![
1407                VectorHit {
1408                    collection: "incidents".to_string(),
1409                    entity_id: 1,
1410                    score: 0.91,
1411                },
1412                VectorHit {
1413                    collection: "docs".to_string(),
1414                    entity_id: 9,
1415                    score: 0.88,
1416                },
1417            ],
1418            graph_hits: vec![test_graph_hit("topology", 7, 0.80, 1)],
1419            ..AskContext::default()
1420        };
1421
1422        let order = fused_source_order(&ctx);
1423
1424        assert_eq!(
1425            order,
1426            vec![
1427                FusedSourceRef::FilteredRow(0),
1428                FusedSourceRef::TextHit(0),
1429                FusedSourceRef::GraphHit(0),
1430                FusedSourceRef::VectorHit(1),
1431            ]
1432        );
1433    }
1434
1435    #[test]
1436    fn text_search_bm25_scoped_ranks_specific_document_first() {
1437        let rt = fresh_runtime();
1438        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1439            .expect("create docs");
1440        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1441            .expect("insert specific doc");
1442        rt.execute_query(
1443            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1444        )
1445        .expect("insert broad doc");
1446
1447        let scope = make_scope(["docs".to_string()].into_iter().collect());
1448        let candidates = CandidateCollections {
1449            collections: vec!["docs".to_string()],
1450            columns_by_collection: HashMap::new(),
1451        };
1452        let hits = text_search_bm25_scoped(&rt, &scope, "passport renewal", &candidates, 10);
1453
1454        assert_eq!(hits.len(), 2);
1455        assert!(
1456            hits[0].score > hits[1].score,
1457            "BM25 text bucket should prefer the shorter exact match: {hits:?}"
1458        );
1459    }
1460
1461    #[test]
1462    fn text_search_bm25_scoped_filters_rls_denied_hits() {
1463        let rt = fresh_runtime();
1464        rt.execute_query(
1465            "CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT) WITH CONTEXT INDEX ON (body)",
1466        )
1467        .expect("create docs");
1468        rt.execute_query(
1469            "INSERT INTO docs (id, tenant_id, body) VALUES \
1470             (1, 'acme', 'shared launch plan'), \
1471             (2, 'globex', 'shared launch plan')",
1472        )
1473        .expect("seed docs");
1474        rt.execute_query(
1475            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1476        )
1477        .expect("create policy");
1478        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1479            .expect("enable rls");
1480
1481        let _tenant = TenantGuard::set("acme");
1482        let scope = make_scope(["docs".to_string()].into_iter().collect());
1483        let candidates = CandidateCollections {
1484            collections: vec!["docs".to_string()],
1485            columns_by_collection: HashMap::new(),
1486        };
1487
1488        let hits = text_search_bm25_scoped(&rt, &scope, "shared launch", &candidates, 10);
1489
1490        assert_eq!(hits.len(), 1, "RLS should hide the globex hit: {hits:?}");
1491        let entity = rt
1492            .inner
1493            .db
1494            .store()
1495            .get(
1496                "docs",
1497                crate::storage::unified::entity::EntityId::new(hits[0].entity_id),
1498            )
1499            .expect("hit entity exists");
1500        assert_eq!(row_text(&entity, "tenant_id"), Some("acme"));
1501    }
1502
1503    #[test]
1504    fn execute_pipeline_retrieves_known_good_bm25_source_order() {
1505        let rt = fresh_runtime();
1506        rt.execute_query("CREATE TABLE docs (body TEXT) WITH CONTEXT INDEX ON (body)")
1507            .expect("create docs");
1508        rt.execute_query("INSERT INTO docs (body) VALUES ('passport renewal')")
1509            .expect("insert specific doc");
1510        rt.execute_query(
1511            "INSERT INTO docs (body) VALUES ('passport renewal travel hotel airline visa luggage itinerary')",
1512        )
1513        .expect("insert broad doc");
1514        rt.schema_vocabulary_apply(
1515            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1516                collection: "docs".to_string(),
1517                columns: vec!["body".into()],
1518                type_tags: Vec::new(),
1519                description: None,
1520            },
1521        );
1522
1523        let scope = make_scope(["docs".to_string()].into_iter().collect());
1524        let ctx = AskPipeline::execute_with_limit_and_min_score(
1525            &rt,
1526            &scope,
1527            "body passport renewal",
1528            2,
1529            None,
1530            Some(1),
1531        )
1532        .expect("pipeline executes");
1533
1534        assert_eq!(ctx.text_hits.len(), 2);
1535        assert!(
1536            ctx.text_hits[0].score > ctx.text_hits[1].score,
1537            "BM25 source order should prefer the shorter exact match: {:?}",
1538            ctx.text_hits
1539        );
1540        assert!(matches!(
1541            fused_source_order(&ctx).first(),
1542            Some(FusedSourceRef::TextHit(0))
1543        ));
1544    }
1545
1546    #[test]
1547    fn graph_search_scoped_honors_depth() {
1548        let rt = fresh_runtime();
1549        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('alice', 'Alice')")
1550            .expect("insert alice");
1551        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('bob', 'Bob')")
1552            .expect("insert bob");
1553        rt.execute_query("INSERT INTO tales NODE (label, name) VALUES ('carol', 'Carol')")
1554            .expect("insert carol");
1555        rt.execute_query(
1556            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'alice', 'bob')",
1557        )
1558        .expect("insert alice-bob edge");
1559        rt.execute_query(
1560            "INSERT INTO tales EDGE (label, from, to) VALUES ('knows', 'bob', 'carol')",
1561        )
1562        .expect("insert bob-carol edge");
1563
1564        let scope = make_scope(["tales".to_string()].into_iter().collect());
1565        let candidates = CandidateCollections {
1566            collections: vec!["tales".to_string()],
1567            columns_by_collection: HashMap::new(),
1568        };
1569        let depth1 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(1));
1570        let depth2 = graph_search_scoped(&rt, &scope, "alice", &candidates, 10, None, Some(2));
1571
1572        assert!(
1573            depth1.iter().all(|hit| hit.depth <= 1),
1574            "DEPTH 1 returned hits beyond one hop: {depth1:?}"
1575        );
1576        assert!(
1577            depth2.iter().any(|hit| hit.depth == 2),
1578            "DEPTH 2 should include the second-hop graph hit: {depth2:?}"
1579        );
1580    }
1581
1582    #[test]
1583    fn filter_values_filters_rls_denied_rows() {
1584        let rt = fresh_runtime();
1585        rt.execute_query("CREATE TABLE docs (id INT, tenant_id TEXT, body TEXT)")
1586            .expect("create docs");
1587        rt.execute_query(
1588            "INSERT INTO docs (id, tenant_id, body) VALUES \
1589             (1, 'acme', 'incident FDD-12313'), \
1590             (2, 'globex', 'incident FDD-12313')",
1591        )
1592        .expect("seed docs");
1593        rt.execute_query(
1594            "CREATE POLICY tenant_only ON docs FOR SELECT USING (tenant_id = CURRENT_TENANT())",
1595        )
1596        .expect("create policy");
1597        rt.execute_query("ALTER TABLE docs ENABLE ROW LEVEL SECURITY")
1598            .expect("enable rls");
1599
1600        let _tenant = TenantGuard::set("acme");
1601        let scope = make_scope(["docs".to_string()].into_iter().collect());
1602        let candidates = CandidateCollections {
1603            collections: vec!["docs".to_string()],
1604            columns_by_collection: HashMap::from([("docs".to_string(), vec!["body".to_string()])]),
1605        };
1606        let tokens = TokenSet {
1607            keywords: vec!["incident".to_string()],
1608            literals: vec!["FDD-12313".to_string()],
1609        };
1610
1611        let rows = filter_values(&rt, &scope, &candidates, &tokens, 10);
1612
1613        assert_eq!(rows.len(), 1, "RLS should hide the globex row: {rows:?}");
1614        assert_eq!(row_text(&rows[0].entity, "tenant_id"), Some("acme"));
1615    }
1616
1617    /// Empty token sets short-circuit with a structured error before
1618    /// any LLM round-trip.
1619    #[test]
1620    fn execute_refuses_empty_token_set() {
1621        let rt = fresh_runtime();
1622        let scope = make_scope(HashSet::new());
1623        let err = AskPipeline::execute(&rt, &scope, "??? ...")
1624            .expect_err("empty token set must short-circuit");
1625        let msg = format!("{err}");
1626        assert!(
1627            msg.contains("yielded no usable tokens"),
1628            "expected structured empty-token error, got: {msg}"
1629        );
1630    }
1631
1632    /// match_schema drops every collection that's outside
1633    /// `scope.visible_collections`. Using a synthetic vocab via DDL
1634    /// events on the live runtime so the assertion drives real
1635    /// `RedDBRuntime::schema_vocabulary_lookup`.
1636    #[test]
1637    fn match_schema_intersects_with_visible_set() {
1638        let rt = fresh_runtime();
1639        // Two collections both carry a `passport` column. Caller's
1640        // scope only includes `travel`, so the `passport` column hit
1641        // on `secrets` must be dropped.
1642        rt.schema_vocabulary_apply(
1643            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1644                collection: "travel".to_string(),
1645                columns: vec!["id".into(), "passport".into()],
1646                type_tags: Vec::new(),
1647                description: None,
1648            },
1649        );
1650        rt.schema_vocabulary_apply(
1651            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
1652                collection: "secrets".to_string(),
1653                columns: vec!["passport".into()],
1654                type_tags: Vec::new(),
1655                description: None,
1656            },
1657        );
1658        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1659        let scope = make_scope(visible.clone());
1660        let tokens = TokenSet {
1661            keywords: vec!["passport".to_string()],
1662            literals: Vec::new(),
1663        };
1664        let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
1665        assert_eq!(candidates.collections, vec!["travel".to_string()]);
1666        assert!(!candidates.collections.contains(&"secrets".to_string()));
1667        // Column hint surfaces for the surviving collection.
1668        let cols = candidates
1669            .columns_by_collection
1670            .get("travel")
1671            .expect("hint columns");
1672        assert!(cols.contains(&"passport".to_string()));
1673    }
1674
1675    // -- Property test (issue #121 acceptance row) -------------------
1676    //
1677    // For 256 random (question, scope) pairs: every Stage 4 row's
1678    // collection MUST be inside `scope.visible_collections`. Drives
1679    // `filter_values` directly with synthetic candidate sets so the
1680    // invariant is pinned without an embedding API.
1681
1682    use proptest::prelude::*;
1683
1684    fn arb_collection() -> impl Strategy<Value = String> {
1685        "[a-z]{1,4}"
1686    }
1687
1688    fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
1689        prop::collection::hash_set(arb_collection(), 0..6)
1690    }
1691
1692    fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
1693        prop::collection::vec(arb_collection(), 0..8)
1694    }
1695
1696    proptest! {
1697        #![proptest_config(ProptestConfig::with_cases(256))]
1698        #[test]
1699        fn stage4_rows_subset_of_visible_collections(
1700            visible in arb_visible(),
1701            candidate_names in arb_candidates(),
1702            literal_count in 0usize..3,
1703        ) {
1704            // Single runtime shared across cases — `filter_values`
1705            // only reads (no mutation), and we need the empty-store
1706            // path so the invariant we want to pin is "no row escapes
1707            // visible_collections" rather than "any specific row
1708            // surfaces".
1709            let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
1710            let candidates = CandidateCollections {
1711                collections: candidate_names,
1712                columns_by_collection: HashMap::new(),
1713            };
1714            let literals: Vec<String> = (0..literal_count)
1715                .map(|i| format!("ID-{i}"))
1716                .collect();
1717            let tokens = TokenSet {
1718                keywords: vec!["passport".to_string()],
1719                literals,
1720            };
1721            let scope = make_scope(visible.clone());
1722            let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
1723            for row in &rows {
1724                prop_assert!(
1725                    visible.contains(&row.collection),
1726                    "Stage 4 leaked row collection={} not in visible={:?}",
1727                    row.collection, visible
1728                );
1729            }
1730        }
1731    }
1732
1733    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();
1734
1735    // -- Integration test (issue #121 acceptance row) ----------------
1736    //
1737    // Drives the four stages end-to-end through `AskPipeline::execute`
1738    // with the question the issue calls out:
1739    //   "quais as novidades sobre o passport FDD-12313?"
1740    //
1741    // Stage 1 must extract `passport` + `FDD-12313`. Stage 2 must
1742    // narrow to the `passports` collection (visible to the caller).
1743    // Stage 3 silently yields zero hits without an embedding provider
1744    // — that's expected for the test fixture path. Stage 4 must surface
1745    // the row whose `notes` column embeds `FDD-12313`.
1746
1747    #[test]
1748    fn integration_passport_fdd_12313_funnels_through_four_stages() {
1749        let rt = fresh_runtime();
1750        // The collection name itself + a `passport` column both feed
1751        // Stage 2's vocabulary; either one is enough for the question
1752        // "passport FDD-12313" to land here.
1753        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1754            .expect("CREATE TABLE travel");
1755        rt.execute_query(
1756            "INSERT INTO travel (id, passport, notes) VALUES \
1757             (1, 'BR-001', 'unrelated note'), \
1758             (2, 'PT-002', 'incident FDD-12313 escalated'), \
1759             (3, 'US-003', 'standard renewal')",
1760        )
1761        .expect("seed rows");
1762        // Out-of-scope collection — must NEVER surface in Stage 4.
1763        rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
1764            .expect("CREATE TABLE secrets");
1765        rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
1766            .expect("seed secrets");
1767
1768        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1769        let scope = make_scope(visible);
1770
1771        let ctx = AskPipeline::execute(
1772            &rt,
1773            &scope,
1774            "quais as novidades sobre o passport FDD-12313?",
1775        )
1776        .expect("pipeline runs");
1777
1778        // Stage 1: passport + FDD-12313 surfaced.
1779        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1780        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1781
1782        // Stage 2: candidates narrowed to `travel` (the `passport`
1783        // column on `secrets` is dropped by the visible-set
1784        // intersection).
1785        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1786
1787        // Stage 3: best-effort embedding — without a provider
1788        // configured, Stage 3 silently returns []; the rest of the
1789        // funnel still runs.
1790        let _ = &ctx.vector_hits;
1791
1792        // Stage 4: the row whose `notes` mentions `FDD-12313`
1793        // surfaces; the out-of-scope `secrets` row does NOT.
1794        assert!(
1795            ctx.filtered_rows
1796                .iter()
1797                .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
1798            "expected travel row with FDD-12313 match, got: {:?}",
1799            ctx.filtered_rows
1800        );
1801        for row in &ctx.filtered_rows {
1802            assert_ne!(
1803                row.collection, "secrets",
1804                "secrets row leaked into Stage 4 output"
1805            );
1806        }
1807
1808        // Per-stage timing recorded.
1809        // (The Instant-based measurements may be 0 on very fast hosts;
1810        // we only check the field exists and was populated.)
1811        let _ = ctx.timings.extract_us
1812            + ctx.timings.schema_us
1813            + ctx.timings.vector_us
1814            + ctx.timings.filter_us;
1815    }
1816
1817    // -- Stage 1 routing (Lane 4/5: LlmNer wiring) -------------------
1818    //
1819    // The routing dispatcher is an `extract_tokens_routed` helper that
1820    // reads `ai.ner.backend` and either passes through to the heuristic
1821    // (default) or routes through `LlmNer::extract`. Today the
1822    // capability gate (`EffectiveScope::has_capability`) is a placeholder
1823    // that always returns `false`, so the LLM path always denies and
1824    // the configured `HeuristicFallback` policy fires. The tests below
1825    // pin every observable: heuristic stays the default, `llm + auth
1826    // denied` honours each fallback mode, and the one-shot info log is
1827    // best-effort (we don't assert it directly to avoid a coupling to
1828    // the global `tracing` subscriber).
1829
1830    fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
1831        let store = rt.inner.db.store();
1832        store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1833    }
1834
1835    /// Default backend stays heuristic — even with an open scope, the
1836    /// pipeline returns the same tokens it would without any config.
1837    #[test]
1838    fn routed_default_backend_runs_heuristic() {
1839        let rt = fresh_runtime();
1840        let scope = make_scope(HashSet::new());
1841        let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1842            .expect("heuristic path is infallible");
1843        assert!(tokens.keywords.contains(&"passport".to_string()));
1844        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1845    }
1846
1847    /// `backend = llm` with `fallback = use_heuristic`: capability
1848    /// denies (placeholder) → fallback → heuristic tokens surface.
1849    #[tokio::test(flavor = "multi_thread")]
1850    async fn routed_llm_auth_denied_uses_heuristic_fallback() {
1851        let rt = fresh_runtime();
1852        write_config(&rt, "ai.ner.backend", "llm");
1853        write_config(&rt, "ai.ner.fallback", "use_heuristic");
1854        let scope = make_scope(HashSet::new());
1855        let tokens = tokio::task::spawn_blocking(move || {
1856            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1857        })
1858        .await
1859        .unwrap()
1860        .expect("fallback policy keeps the call OK");
1861        assert!(tokens.keywords.contains(&"passport".to_string()));
1862        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1863    }
1864
1865    /// `backend = llm` with `fallback = empty_on_fail`: auth denies →
1866    /// fallback returns an empty `TokenSet`.
1867    #[tokio::test(flavor = "multi_thread")]
1868    async fn routed_llm_auth_denied_empty_on_fail() {
1869        let rt = fresh_runtime();
1870        write_config(&rt, "ai.ner.backend", "llm");
1871        write_config(&rt, "ai.ner.fallback", "empty_on_fail");
1872        let scope = make_scope(HashSet::new());
1873        let tokens = tokio::task::spawn_blocking(move || {
1874            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1875        })
1876        .await
1877        .unwrap()
1878        .expect("empty_on_fail returns Ok with empty TokenSet");
1879        assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
1880    }
1881
1882    /// `backend = llm` with `fallback = propagate`: auth denies →
1883    /// `extract_tokens_routed` surfaces a `RedDBError::Query` so the
1884    /// caller can decide.
1885    #[tokio::test(flavor = "multi_thread")]
1886    async fn routed_llm_auth_denied_propagate_returns_error() {
1887        let rt = fresh_runtime();
1888        write_config(&rt, "ai.ner.backend", "llm");
1889        write_config(&rt, "ai.ner.fallback", "propagate");
1890        let scope = make_scope(HashSet::new());
1891        let err = tokio::task::spawn_blocking(move || {
1892            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1893        })
1894        .await
1895        .unwrap()
1896        .expect_err("propagate must surface the error");
1897        let msg = format!("{err}");
1898        assert!(
1899            msg.contains("propagate") || msg.contains("ai.ner.backend"),
1900            "expected propagate error message, got: {msg}"
1901        );
1902    }
1903
1904    /// AskPipeline end-to-end with `backend = llm` and the default
1905    /// `use_heuristic` fallback: the pipeline still returns tokens (via
1906    /// fallback), Stage 1 is the only routed stage, and the rest of the
1907    /// funnel runs unchanged.
1908    #[tokio::test(flavor = "multi_thread")]
1909    async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
1910        let rt = fresh_runtime();
1911        write_config(&rt, "ai.ner.backend", "llm");
1912        // default fallback is use_heuristic — leave it implicit.
1913        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1914            .expect("CREATE TABLE travel");
1915        rt.execute_query(
1916            "INSERT INTO travel (id, passport, notes) VALUES \
1917             (2, 'PT-002', 'incident FDD-12313 escalated')",
1918        )
1919        .expect("seed rows");
1920        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1921        let scope = make_scope(visible);
1922        let ctx = tokio::task::spawn_blocking(move || {
1923            AskPipeline::execute(&rt, &scope, "passport FDD-12313")
1924        })
1925        .await
1926        .unwrap()
1927        .expect("pipeline runs");
1928        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1929        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1930        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1931        assert!(
1932            ctx.filtered_rows
1933                .iter()
1934                .any(|r| r.matched_literal == "FDD-12313"),
1935            "Stage 4 still runs after Stage 1 fallback"
1936        );
1937    }
1938}