Skip to main content

reddb_server/runtime/
ask_pipeline.rs

1//! AskPipeline — issue #121.
2//!
3//! 4-stage funnel that turns a natural-language ASK question into a
4//! scoped, filtered candidate set the LLM call can synthesise an
5//! answer over. Each stage is a pure function so the pipeline reads
6//! top-to-bottom in `execute` and individual stages can be unit-tested
7//! in isolation:
8//!
9//! 1. **`extract_tokens`** — heuristic NER. Splits the question into
10//!    `keywords` (alphanumeric words) and `literals` (uppercase ID-like
11//!    tokens, e.g. `FDD-12313`). LLM-NER is reserved for slice #123.
12//! 2. **`match_schema`** — `SchemaVocabulary::lookup` per keyword;
13//!    the resulting collection set is intersected with the caller's
14//!    `EffectiveScope.visible_collections` so out-of-scope tables
15//!    never enter the candidate pool. Issue #119 pre-filter.
16//! 3. **`vector_search_scoped`** — best-effort embedding + similarity
17//!    over the candidate collections via `AuthorizedSearch`. When no
18//!    embedding API is available (most unit-test fixtures), the stage
19//!    short-circuits with an empty match list — Stage 4 still runs.
20//! 4. **`filter_values`** — applies literal tokens as exact filters
21//!    over candidate-collection columns, returning the rows that
22//!    actually mention each literal.
23//!
24//! Output: typed [`AskContext`] holding all four stage outputs plus
25//! per-stage timing. Empty token-set short-circuits with a structured
26//! error so callers don't pay for an LLM round-trip on a query that
27//! contained nothing addressable.
28//!
29//! The legacy `RedDBRuntime::search_context` is now a Stage-2-internal
30//! helper for the broad-recall fallback; ASK no longer reaches for it
31//! directly.
32
33use std::collections::{BTreeSet, HashMap, HashSet};
34use std::sync::atomic::{AtomicBool, Ordering};
35use std::time::Instant;
36
37use tracing::{debug, info, info_span, warn};
38
39use super::ai::ner::{
40    AuthContext as NerAuthContext, HeuristicFallback, LlmNer, NerError, NerProvider, NER_CAPABILITY,
41};
42use super::statement_frame::{EffectiveScope, ReadFrame};
43use super::RedDBRuntime;
44use crate::api::{RedDBError, RedDBResult};
45use crate::storage::schema::Value;
46use crate::storage::unified::entity::{EntityData, UnifiedEntity};
47
48/// Default cap for Stage 4 row output. Override per-call via
49/// [`AskPipeline::execute_with_limit`].
50pub const DEFAULT_ROW_CAP: usize = 20;
51
52/// Token bag produced by Stage 1.
53#[derive(Debug, Clone, Default, PartialEq, Eq)]
54pub struct TokenSet {
55    /// Lowercase keyword tokens (regex `[A-Za-z][A-Za-z0-9_]+`,
56    /// length ≥ 2). Used by Stage 2 to probe `SchemaVocabulary`.
57    pub keywords: Vec<String>,
58    /// Literal-id tokens kept in their original case so Stage 4 can
59    /// run case-sensitive equality / substring filters. Matches
60    /// `[A-Z0-9-]{3,}` containing at least one digit OR `[0-9]{6,}`.
61    pub literals: Vec<String>,
62}
63
64impl TokenSet {
65    pub fn is_empty(&self) -> bool {
66        self.keywords.is_empty() && self.literals.is_empty()
67    }
68}
69
70/// Stage 2 output: collections likely to contain the answer.
71#[derive(Debug, Clone, Default)]
72pub struct CandidateCollections {
73    /// Collection names, sorted, deduplicated, intersected with the
74    /// caller's `EffectiveScope.visible_collections`.
75    pub collections: Vec<String>,
76    /// Columns hinted by `SchemaVocabulary` for each candidate
77    /// collection. Used by Stage 4 to scope the literal filter to
78    /// promising columns first.
79    pub columns_by_collection: HashMap<String, Vec<String>>,
80}
81
82/// One Stage 3 vector hit, kept thin so the pipeline doesn't pull
83/// the full `ScoredMatch` shape from `dsl::QueryResult` through.
84#[derive(Debug, Clone)]
85pub struct VectorHit {
86    pub collection: String,
87    pub entity_id: u64,
88    pub score: f32,
89}
90
91/// Stage 4 output: rows that match a literal filter.
92#[derive(Debug, Clone)]
93pub struct FilteredRow {
94    pub collection: String,
95    pub entity: UnifiedEntity,
96    /// Literal token that matched this row.
97    pub matched_literal: String,
98    /// Column where the match was found, when known.
99    pub matched_column: Option<String>,
100}
101
102/// Per-stage timing in microseconds. Logged via tracing on every run.
103#[derive(Debug, Clone, Default, PartialEq, Eq)]
104pub struct StageTimings {
105    pub extract_us: u64,
106    pub schema_us: u64,
107    pub vector_us: u64,
108    pub filter_us: u64,
109}
110
111/// Typed context handed back to the ASK caller. Carries all four
112/// stage outputs so the LLM-formatting helper (slice #122) can pick
113/// what it needs without re-running the funnel.
114#[derive(Debug, Clone, Default)]
115pub struct AskContext {
116    pub question: String,
117    pub tokens: TokenSet,
118    pub candidates: CandidateCollections,
119    pub vector_hits: Vec<VectorHit>,
120    pub filtered_rows: Vec<FilteredRow>,
121    pub timings: StageTimings,
122}
123
124/// Pipeline entry point. Instances are stateless — kept as an empty
125/// enum so callers spell `AskPipeline::execute(...)` (matches the
126/// shape #121 calls out).
127pub enum AskPipeline {}
128
129impl AskPipeline {
130    /// Run all four stages with the default row cap.
131    pub fn execute(
132        runtime: &RedDBRuntime,
133        scope: &EffectiveScope,
134        question: &str,
135    ) -> RedDBResult<AskContext> {
136        Self::execute_with_limit(runtime, scope, question, DEFAULT_ROW_CAP)
137    }
138
139    /// Run all four stages with a configurable Stage 4 row cap.
140    pub fn execute_with_limit(
141        runtime: &RedDBRuntime,
142        scope: &EffectiveScope,
143        question: &str,
144        row_cap: usize,
145    ) -> RedDBResult<AskContext> {
146        let span = info_span!(
147            "ask_pipeline.execute",
148            tenant = ?scope.effective_scope(),
149            question_len = question.len(),
150            row_cap = row_cap,
151        );
152        let _enter = span.enter();
153
154        // Stage 1. Routed through `extract_tokens_routed` so the
155        // `ai.ner.backend` config knob can swap the heuristic for the
156        // opt-in LLM NER without converting the surrounding pipeline
157        // to async (see `extract_tokens_routed` for the rationale).
158        let stage1 = Instant::now();
159        let tokens = extract_tokens_routed(runtime, scope, question)?;
160        let extract_us = stage1.elapsed().as_micros() as u64;
161        debug!(
162            target: "ask_pipeline",
163            stage = "extract_tokens",
164            keywords = ?tokens.keywords,
165            literals = ?tokens.literals,
166            elapsed_us = extract_us,
167            "stage 1 done"
168        );
169        if tokens.is_empty() {
170            warn!(
171                target: "ask_pipeline",
172                question_len = question.len(),
173                "refused: empty token set"
174            );
175            return Err(RedDBError::Query(
176                "ASK question yielded no usable tokens (heuristic NER produced empty keyword + literal set)"
177                    .to_string(),
178            ));
179        }
180
181        // Stage 2.
182        let stage2 = Instant::now();
183        let candidates = match_schema(runtime, scope, &tokens)?;
184        let schema_us = stage2.elapsed().as_micros() as u64;
185        debug!(
186            target: "ask_pipeline",
187            stage = "match_schema",
188            collections = ?candidates.collections,
189            elapsed_us = schema_us,
190            "stage 2 done"
191        );
192
193        // Stage 3.
194        let stage3 = Instant::now();
195        let vector_hits = vector_search_scoped(runtime, scope, question, &candidates, row_cap);
196        let vector_us = stage3.elapsed().as_micros() as u64;
197        debug!(
198            target: "ask_pipeline",
199            stage = "vector_search_scoped",
200            hits = vector_hits.len(),
201            elapsed_us = vector_us,
202            "stage 3 done"
203        );
204
205        // Stage 4.
206        let stage4 = Instant::now();
207        let filtered_rows = filter_values(runtime, scope, &candidates, &tokens, row_cap);
208        let filter_us = stage4.elapsed().as_micros() as u64;
209        debug!(
210            target: "ask_pipeline",
211            stage = "filter_values",
212            rows = filtered_rows.len(),
213            elapsed_us = filter_us,
214            "stage 4 done"
215        );
216
217        Ok(AskContext {
218            question: question.to_string(),
219            tokens,
220            candidates,
221            vector_hits,
222            filtered_rows,
223            timings: StageTimings {
224                extract_us,
225                schema_us,
226                vector_us,
227                filter_us,
228            },
229        })
230    }
231}
232
233// ---------------------------------------------------------------------------
234// Stage 1 — token / entity extraction (heuristic v1) + opt-in LLM NER routing.
235// ---------------------------------------------------------------------------
236
237/// Stage-1 dispatcher honouring the `ai.ner.backend` config knob.
238///
239/// The pipeline (and `execute_with_limit` in particular) stays
240/// **sync** per #123 deepening to avoid an async cascade through the
241/// rest of `ask_pipeline`. When the operator turns on
242/// `ai.ner.backend = "llm"`, we contain the async surface here by
243/// using `tokio::runtime::Handle::current().block_on(...)` — works
244/// when called from inside an async context (the production HTTP
245/// handler path), falls back to the heuristic with a warn-log when
246/// no Tokio runtime is reachable (sync test contexts and embedded
247/// callers without a runtime).
248///
249/// Auth gate: `LlmNer::extract` checks `ai:ner:read`. Today
250/// `EffectiveScope::has_capability` is a placeholder that always
251/// returns `false`, so a LLM-backend-configured deployment will see
252/// every call deny at the gate and the configured `HeuristicFallback`
253/// fires. A one-shot info log makes that visible in operator logs.
254/// Wiring the real capability check into the auth engine is future
255/// work — the routing seam is in place so that landing the auth
256/// extension is a one-line `EffectiveScope` change.
257fn extract_tokens_routed(
258    runtime: &RedDBRuntime,
259    scope: &EffectiveScope,
260    question: &str,
261) -> RedDBResult<TokenSet> {
262    let backend = runtime.config_string("ai.ner.backend", "heuristic");
263    if backend != "llm" {
264        return Ok(extract_tokens(question));
265    }
266
267    let endpoint = runtime.config_string("ai.ner.endpoint", "");
268    let model = runtime.config_string("ai.ner.model", "");
269    let timeout_ms = runtime
270        .config_string("ai.ner.timeout_ms", "5000")
271        .parse::<u32>()
272        .unwrap_or(5000);
273    let fallback = match runtime
274        .config_string("ai.ner.fallback", "use_heuristic")
275        .as_str()
276    {
277        "empty_on_fail" => HeuristicFallback::EmptyOnFail,
278        "propagate" => HeuristicFallback::Propagate,
279        _ => HeuristicFallback::UseHeuristic,
280    };
281
282    // Endpoint shape decides provider; default to OpenAI-compat when
283    // unspecified — matches the documented config shape.
284    let provider = if endpoint.is_empty() && model.is_empty() {
285        // No network config provided — operator opted into "llm" but
286        // didn't wire a backend. Fall back to a Stub::Empty so the
287        // configured fallback policy fires deterministically.
288        NerProvider::Stub(super::ai::ner::StubBehavior::Empty)
289    } else {
290        NerProvider::OpenAiCompat { endpoint, model }
291    };
292
293    let mut ner = LlmNer::new(provider, fallback);
294    ner.timeout_ms = timeout_ms;
295
296    let auth = ScopeAuthAdapter(scope);
297    let llm_result = match tokio::runtime::Handle::try_current() {
298        Ok(handle) => {
299            // Use `block_on` from a thread that's not driving the
300            // current runtime — the typical caller is an Axum HTTP
301            // handler running on the multi-thread runtime, so we hop
302            // off via `block_in_place`.
303            tokio::task::block_in_place(|| handle.block_on(ner.extract(question, scope, &auth)))
304        }
305        Err(_) => {
306            warn!(
307                target: "ask_pipeline",
308                "ai.ner.backend=llm configured but no Tokio runtime reachable from extract_tokens; using heuristic fallback"
309            );
310            return Ok(extract_tokens(question));
311        }
312    };
313
314    match llm_result {
315        Ok(tokens) => Ok(tokens),
316        Err(NerError::AuthDenied) => {
317            log_auth_denial_once();
318            // Auth denials never honour fallback inside `LlmNer`, so
319            // the routing layer applies the configured fallback here
320            // — this is the bridge until the auth engine wires the
321            // capability for real.
322            apply_fallback(fallback, question)
323        }
324        Err(err) => {
325            warn!(
326                target: "ask_pipeline",
327                error = %err,
328                "LlmNer extract failed; honouring HeuristicFallback policy"
329            );
330            apply_fallback(fallback, question)
331        }
332    }
333}
334
335fn apply_fallback(fallback: HeuristicFallback, question: &str) -> RedDBResult<TokenSet> {
336    match fallback {
337        HeuristicFallback::UseHeuristic => Ok(extract_tokens(question)),
338        HeuristicFallback::EmptyOnFail => Ok(TokenSet::default()),
339        HeuristicFallback::Propagate => Err(RedDBError::Query(
340            "ai.ner.backend=llm: extract failed and ai.ner.fallback=propagate".to_string(),
341        )),
342    }
343}
344
345/// One-shot info log for the auth-gate placeholder. Until the auth
346/// engine learns to grant `ai:ner:read`, every routed call denies —
347/// we emit the explainer once per process so logs aren't spammed.
348fn log_auth_denial_once() {
349    static EMITTED: AtomicBool = AtomicBool::new(false);
350    if !EMITTED.swap(true, Ordering::Relaxed) {
351        info!(
352            target: "ask_pipeline",
353            capability = NER_CAPABILITY,
354            "LlmNer routing configured but capability `{}` not yet wired into auth engine; falling back to heuristic",
355            NER_CAPABILITY
356        );
357    }
358}
359
360/// Adapter wrapping `EffectiveScope` so it can drive the
361/// `LlmNer`-side `AuthContext` trait without leaking that trait
362/// across the rest of the runtime.
363struct ScopeAuthAdapter<'a>(&'a EffectiveScope);
364
365impl<'a> std::fmt::Debug for ScopeAuthAdapter<'a> {
366    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
367        f.debug_struct("ScopeAuthAdapter").finish_non_exhaustive()
368    }
369}
370
371impl<'a> NerAuthContext for ScopeAuthAdapter<'a> {
372    fn has_capability(&self, capability: &str) -> bool {
373        self.0.has_capability(capability)
374    }
375}
376
377/// Split a question into a [`TokenSet`]. Pure function — no runtime
378/// lookups.
379///
380/// Rules (heuristic v1):
381/// - **Literals** match `[A-Z0-9-]{3,}` AND contain ≥1 digit, OR are
382///   pure digit runs of length ≥ 6. Captures id shapes like
383///   `FDD-12313`, `INV-2024-001`, `123456`.
384/// - **Keywords** match `[A-Za-z][A-Za-z0-9_]+` (length ≥ 2),
385///   normalised to lowercase. Stop words are dropped to avoid
386///   wasting Stage-2 lookups on noise.
387pub fn extract_tokens(question: &str) -> TokenSet {
388    let mut keywords: Vec<String> = Vec::new();
389    let mut literals: Vec<String> = Vec::new();
390
391    let mut chars = question.chars().peekable();
392    let mut buf = String::new();
393
394    let flush = |buf: &mut String, keywords: &mut Vec<String>, literals: &mut Vec<String>| {
395        if buf.is_empty() {
396            return;
397        }
398        let word = std::mem::take(buf);
399        classify_token(&word, keywords, literals);
400    };
401
402    while let Some(ch) = chars.next() {
403        if ch.is_alphanumeric() || ch == '_' || ch == '-' {
404            buf.push(ch);
405        } else {
406            flush(&mut buf, &mut keywords, &mut literals);
407            // Skip remaining whitespace / punctuation; loop continues.
408            let _ = ch;
409        }
410        // Look ahead to also flush on EOF.
411        if chars.peek().is_none() {
412            flush(&mut buf, &mut keywords, &mut literals);
413        }
414    }
415    // Final flush in case the loop body didn't see EOF (empty
416    // iterator path).
417    if !buf.is_empty() {
418        classify_token(&buf, &mut keywords, &mut literals);
419    }
420
421    // Dedup keywords + literals while preserving order.
422    let mut seen = HashSet::new();
423    keywords.retain(|tok| seen.insert(tok.clone()));
424    let mut seen_lit = HashSet::new();
425    literals.retain(|tok| seen_lit.insert(tok.clone()));
426
427    TokenSet { keywords, literals }
428}
429
430fn classify_token(word: &str, keywords: &mut Vec<String>, literals: &mut Vec<String>) {
431    // Literal: shape `[A-Z0-9-]{3,}` with at least one digit, OR
432    // pure-digit run of length ≥ 6.
433    let is_upper_id_shape = word.len() >= 3
434        && word
435            .chars()
436            .all(|c| c.is_ascii_digit() || c == '-' || c.is_ascii_uppercase())
437        && word.chars().any(|c| c.is_ascii_digit())
438        && word.chars().any(|c| c.is_ascii_uppercase() || c == '-');
439    let is_long_digit_run = word.len() >= 6 && word.chars().all(|c| c.is_ascii_digit());
440    if is_upper_id_shape || is_long_digit_run {
441        literals.push(word.to_string());
442        return;
443    }
444    // Keyword: starts with a letter, len ≥ 2, drop trailing/leading
445    // hyphens, lowercase.
446    let trimmed = word.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_');
447    if trimmed.len() < 2 {
448        return;
449    }
450    if !trimmed
451        .chars()
452        .next()
453        .map(|c| c.is_ascii_alphabetic())
454        .unwrap_or(false)
455    {
456        return;
457    }
458    if !trimmed
459        .chars()
460        .all(|c| c.is_ascii_alphanumeric() || c == '_')
461    {
462        // Hyphenated word that wasn't a literal — skip rather than
463        // index a fragmented token.
464        return;
465    }
466    let lower = trimmed.to_ascii_lowercase();
467    if STOP_WORDS.binary_search(&lower.as_str()).is_ok() {
468        return;
469    }
470    keywords.push(lower);
471}
472
473/// Sorted ascii lowercase stop-word list. Kept tiny and curated so a
474/// regression in Stage 2 candidate-narrowing surfaces fast.
475const STOP_WORDS: &[&str] = &[
476    "a", "about", "an", "and", "are", "as", "at", "be", "by", "do", "for", "from", "how", "in",
477    "is", "it", "of", "on", "or", "que", "qual", "quais", "sobre", "te", "the", "to", "what",
478    "where", "which", "with",
479];
480
481// ---------------------------------------------------------------------------
482// Stage 2 — schema-vocabulary match.
483// ---------------------------------------------------------------------------
484
485/// For each keyword, probe `SchemaVocabulary` and intersect the
486/// resulting collection set with `scope.visible_collections`. Returns
487/// the deduplicated, sorted candidate list plus a per-collection
488/// column hint set for Stage 4.
489pub fn match_schema(
490    runtime: &RedDBRuntime,
491    scope: &EffectiveScope,
492    tokens: &TokenSet,
493) -> RedDBResult<CandidateCollections> {
494    let visible = match scope.visible_collections() {
495        Some(set) => set.clone(),
496        None => {
497            // No scope wired = embedded mode. Fall back to "every
498            // collection in the DB" so the pipeline still runs;
499            // AuthorizedSearch is the seam that refuses the deny-
500            // default for AI commands.
501            runtime
502                .inner
503                .db
504                .store()
505                .list_collections()
506                .into_iter()
507                .collect()
508        }
509    };
510
511    let mut collections: BTreeSet<String> = BTreeSet::new();
512    let mut columns_by_collection: HashMap<String, BTreeSet<String>> = HashMap::new();
513    for keyword in &tokens.keywords {
514        let hits = runtime.schema_vocabulary_lookup(keyword);
515        for hit in hits {
516            if !visible.contains(&hit.collection) {
517                continue;
518            }
519            collections.insert(hit.collection.clone());
520            if let Some(column) = hit.column {
521                columns_by_collection
522                    .entry(hit.collection)
523                    .or_default()
524                    .insert(column);
525            }
526        }
527    }
528
529    Ok(CandidateCollections {
530        collections: collections.into_iter().collect(),
531        columns_by_collection: columns_by_collection
532            .into_iter()
533            .map(|(k, v)| (k, v.into_iter().collect()))
534            .collect(),
535    })
536}
537
538// ---------------------------------------------------------------------------
539// Stage 3 — vector search scoped to the candidate collections.
540// ---------------------------------------------------------------------------
541
542/// Embed the question (best-effort — skipped if no provider is
543/// configured, see `embed_question`) and run
544/// `AuthorizedSearch::execute_similar` over each candidate
545/// collection. Returns at most `top_k` hits sorted by similarity.
546pub fn vector_search_scoped(
547    runtime: &RedDBRuntime,
548    scope: &EffectiveScope,
549    question: &str,
550    candidates: &CandidateCollections,
551    top_k: usize,
552) -> Vec<VectorHit> {
553    if candidates.collections.is_empty() {
554        return Vec::new();
555    }
556    let Some(embedding) = embed_question(runtime, question) else {
557        return Vec::new();
558    };
559    let per_collection = top_k.max(1);
560    let mut hits: Vec<VectorHit> = Vec::new();
561    for collection in &candidates.collections {
562        match super::authorized_search::AuthorizedSearch::execute_similar(
563            runtime,
564            scope,
565            collection,
566            &embedding,
567            per_collection,
568            0.0,
569        ) {
570            Ok(results) => {
571                for result in results {
572                    hits.push(VectorHit {
573                        collection: collection.clone(),
574                        entity_id: result.entity_id.raw(),
575                        score: result.score,
576                    });
577                }
578            }
579            Err(err) => {
580                debug!(
581                    target: "ask_pipeline",
582                    collection = collection,
583                    err = %err,
584                    "vector_search_scoped: collection skipped"
585                );
586            }
587        }
588    }
589    hits.sort_by(|a, b| {
590        b.score
591            .partial_cmp(&a.score)
592            .unwrap_or(std::cmp::Ordering::Equal)
593            .then_with(|| a.entity_id.cmp(&b.entity_id))
594    });
595    hits.truncate(top_k);
596    hits
597}
598
599/// Best-effort embedding. Returns `None` when no embedding provider
600/// is configured (the unit-test fixture path) — the caller treats
601/// `None` as "Stage 3 yielded zero hits" and continues.
602fn embed_question(runtime: &RedDBRuntime, question: &str) -> Option<Vec<f32>> {
603    let kv_getter = |key: &str| -> RedDBResult<Option<String>> {
604        match runtime.inner.db.get_kv("red_config", key) {
605            Some((Value::Text(value), _)) => Ok(Some(value.to_string())),
606            Some(_) => Ok(None),
607            None => Ok(None),
608        }
609    };
610    let provider = crate::ai::resolve_default_provider(&kv_getter);
611    if !provider.is_openai_compatible() {
612        return None;
613    }
614    let model = crate::ai::resolve_default_model(&provider, &kv_getter);
615    let api_key = crate::ai::resolve_api_key(&provider, None, kv_getter).ok()?;
616    let transport = crate::runtime::ai::transport::AiTransport::from_runtime(runtime);
617    let request = crate::ai::OpenAiEmbeddingRequest {
618        api_key,
619        model,
620        inputs: vec![question.to_string()],
621        dimensions: None,
622        api_base: provider.resolve_api_base(),
623    };
624    let response = crate::runtime::ai::block_on_ai(async move {
625        crate::ai::openai_embeddings_async(&transport, request).await
626    })
627    .and_then(|result| result)
628    .ok()?;
629    response.embeddings.into_iter().next()
630}
631
632// ---------------------------------------------------------------------------
633// Stage 4 — value filter using literal tokens.
634// ---------------------------------------------------------------------------
635
636/// Walk every candidate collection looking for rows whose columns
637/// contain any of the literal tokens. Caller-supplied `row_cap`
638/// bounds the result count; column hints from Stage 2 are visited
639/// first so promising columns find a match before a full scan.
640pub fn filter_values(
641    runtime: &RedDBRuntime,
642    scope: &EffectiveScope,
643    candidates: &CandidateCollections,
644    tokens: &TokenSet,
645    row_cap: usize,
646) -> Vec<FilteredRow> {
647    if tokens.literals.is_empty() || candidates.collections.is_empty() {
648        return Vec::new();
649    }
650    let visible = scope.visible_collections();
651    let store = runtime.inner.db.store();
652    let mut out: Vec<FilteredRow> = Vec::new();
653
654    'collection: for collection in &candidates.collections {
655        // Defence-in-depth: redo the visibility check here so a Stage
656        // 2 regression can't smuggle an out-of-scope collection
657        // through.
658        if let Some(set) = visible {
659            if !set.contains(collection) {
660                continue;
661            }
662        }
663        let Some(manager) = store.get_collection(collection) else {
664            continue;
665        };
666        let hint_columns: &[String] = candidates
667            .columns_by_collection
668            .get(collection)
669            .map(|v| v.as_slice())
670            .unwrap_or(&[]);
671
672        for entity in manager.query_all(|_| true) {
673            if let Some(hit) = literal_match_in_entity(&entity, &tokens.literals, hint_columns) {
674                out.push(FilteredRow {
675                    collection: collection.clone(),
676                    entity,
677                    matched_literal: hit.0,
678                    matched_column: hit.1,
679                });
680                if out.len() >= row_cap {
681                    break 'collection;
682                }
683            }
684        }
685    }
686    out
687}
688
689/// Look for any literal in any column value of `entity`. Hint
690/// columns are checked first; a positive hit short-circuits.
691fn literal_match_in_entity(
692    entity: &UnifiedEntity,
693    literals: &[String],
694    hint_columns: &[String],
695) -> Option<(String, Option<String>)> {
696    let row = match &entity.data {
697        EntityData::Row(row) => row,
698        _ => return None,
699    };
700
701    // Pass 1: hint columns first.
702    for column in hint_columns {
703        if let Some(value) = row.get_field(column) {
704            if let Some(lit) = first_literal_in_value(value, literals) {
705                return Some((lit, Some(column.clone())));
706            }
707        }
708    }
709    // Pass 2: every other column.
710    for (name, value) in row.iter_fields() {
711        if hint_columns.iter().any(|c| c == name) {
712            continue;
713        }
714        if let Some(lit) = first_literal_in_value(value, literals) {
715            return Some((lit, Some(name.to_string())));
716        }
717    }
718    None
719}
720
721fn first_literal_in_value(value: &Value, literals: &[String]) -> Option<String> {
722    let rendered = match value {
723        Value::Text(s) => s.to_string(),
724        Value::Integer(i) => i.to_string(),
725        Value::Float(f) => f.to_string(),
726        Value::Boolean(b) => b.to_string(),
727        Value::Json(j) => String::from_utf8_lossy(j).to_string(),
728        _ => return None,
729    };
730    for lit in literals {
731        // Case-sensitive substring match: literals are id-shaped, so
732        // we want `FDD-12313` to find embedded `FDD-12313` in a free-
733        // form description column too.
734        if rendered.contains(lit) {
735            return Some(lit.clone());
736        }
737    }
738    None
739}
740
741#[cfg(test)]
742mod tests {
743    use super::*;
744
745    // -- Stage 1 ------------------------------------------------------
746
747    #[test]
748    fn extract_tokens_splits_keywords_and_literals() {
749        let tokens = extract_tokens("quais as novidades sobre o passport FDD-12313?");
750        // `quais`, `as`, `sobre`, `o` are stop words; `novidades`,
751        // `passport` survive.
752        assert!(tokens.keywords.contains(&"novidades".to_string()));
753        assert!(tokens.keywords.contains(&"passport".to_string()));
754        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
755        assert!(!tokens.is_empty());
756    }
757
758    #[test]
759    fn extract_tokens_returns_empty_for_punctuation_only() {
760        let tokens = extract_tokens("???   ...");
761        assert!(tokens.is_empty());
762    }
763
764    #[test]
765    fn extract_tokens_long_digit_run_is_a_literal() {
766        let tokens = extract_tokens("show order 987654321 details");
767        assert!(tokens.literals.contains(&"987654321".to_string()));
768        assert!(tokens.keywords.contains(&"order".to_string()));
769        assert!(tokens.keywords.contains(&"details".to_string()));
770        assert!(tokens.keywords.contains(&"show".to_string()));
771    }
772
773    #[test]
774    fn extract_tokens_short_uppercase_word_is_keyword_not_literal() {
775        // "USA" is uppercase but lacks a digit, so it stays a keyword
776        // (lowercased) — Stage 2 still gets to probe it.
777        let tokens = extract_tokens("USA exports report");
778        assert!(tokens.keywords.contains(&"usa".to_string()));
779        assert!(tokens.literals.is_empty());
780    }
781
782    #[test]
783    fn extract_tokens_dedups() {
784        let tokens = extract_tokens("passport passport FDD-1 FDD-1");
785        assert_eq!(
786            tokens.keywords.iter().filter(|k| *k == "passport").count(),
787            1
788        );
789        assert_eq!(tokens.literals.iter().filter(|l| *l == "FDD-1").count(), 1);
790    }
791
792    // -- Stage 4 helper ----------------------------------------------
793
794    #[test]
795    fn first_literal_in_value_substring_match() {
796        let lit = first_literal_in_value(
797            &Value::text("issue FDD-12313 reported by user"),
798            &["FDD-12313".to_string()],
799        );
800        assert_eq!(lit.as_deref(), Some("FDD-12313"));
801    }
802
803    #[test]
804    fn first_literal_in_value_no_match_returns_none() {
805        assert!(
806            first_literal_in_value(&Value::text("nothing here"), &["FDD-12313".to_string()],)
807                .is_none()
808        );
809    }
810
811    // -- Pipeline-wide -----------------------------------------------
812
813    use crate::api::RedDBOptions;
814    use crate::auth::Role;
815    use crate::runtime::statement_frame::EffectiveScope;
816    use crate::runtime::RedDBRuntime;
817    use crate::storage::transaction::snapshot::Snapshot;
818
819    fn make_scope(visible: HashSet<String>) -> EffectiveScope {
820        EffectiveScope {
821            tenant: Some("acme".to_string()),
822            identity: Some(("alice".to_string(), Role::Read)),
823            snapshot: Snapshot {
824                xid: 0,
825                in_progress: HashSet::new(),
826            },
827            visible_collections: Some(visible),
828        }
829    }
830
831    fn fresh_runtime() -> RedDBRuntime {
832        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
833    }
834
835    /// Empty token sets short-circuit with a structured error before
836    /// any LLM round-trip.
837    #[test]
838    fn execute_refuses_empty_token_set() {
839        let rt = fresh_runtime();
840        let scope = make_scope(HashSet::new());
841        let err = AskPipeline::execute(&rt, &scope, "??? ...")
842            .expect_err("empty token set must short-circuit");
843        let msg = format!("{err}");
844        assert!(
845            msg.contains("yielded no usable tokens"),
846            "expected structured empty-token error, got: {msg}"
847        );
848    }
849
850    /// match_schema drops every collection that's outside
851    /// `scope.visible_collections`. Using a synthetic vocab via DDL
852    /// events on the live runtime so the assertion drives real
853    /// `RedDBRuntime::schema_vocabulary_lookup`.
854    #[test]
855    fn match_schema_intersects_with_visible_set() {
856        let rt = fresh_runtime();
857        // Two collections both carry a `passport` column. Caller's
858        // scope only includes `travel`, so the `passport` column hit
859        // on `secrets` must be dropped.
860        rt.schema_vocabulary_apply(
861            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
862                collection: "travel".to_string(),
863                columns: vec!["id".into(), "passport".into()],
864                type_tags: Vec::new(),
865                description: None,
866            },
867        );
868        rt.schema_vocabulary_apply(
869            crate::runtime::schema_vocabulary::DdlEvent::CreateCollection {
870                collection: "secrets".to_string(),
871                columns: vec!["passport".into()],
872                type_tags: Vec::new(),
873                description: None,
874            },
875        );
876        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
877        let scope = make_scope(visible.clone());
878        let tokens = TokenSet {
879            keywords: vec!["passport".to_string()],
880            literals: Vec::new(),
881        };
882        let candidates = match_schema(&rt, &scope, &tokens).expect("ok");
883        assert_eq!(candidates.collections, vec!["travel".to_string()]);
884        assert!(!candidates.collections.contains(&"secrets".to_string()));
885        // Column hint surfaces for the surviving collection.
886        let cols = candidates
887            .columns_by_collection
888            .get("travel")
889            .expect("hint columns");
890        assert!(cols.contains(&"passport".to_string()));
891    }
892
893    // -- Property test (issue #121 acceptance row) -------------------
894    //
895    // For 256 random (question, scope) pairs: every Stage 4 row's
896    // collection MUST be inside `scope.visible_collections`. Drives
897    // `filter_values` directly with synthetic candidate sets so the
898    // invariant is pinned without an embedding API.
899
900    use proptest::prelude::*;
901
902    fn arb_collection() -> impl Strategy<Value = String> {
903        "[a-z]{1,4}"
904    }
905
906    fn arb_visible() -> impl Strategy<Value = HashSet<String>> {
907        prop::collection::hash_set(arb_collection(), 0..6)
908    }
909
910    fn arb_candidates() -> impl Strategy<Value = Vec<String>> {
911        prop::collection::vec(arb_collection(), 0..8)
912    }
913
914    proptest! {
915        #![proptest_config(ProptestConfig::with_cases(256))]
916        #[test]
917        fn stage4_rows_subset_of_visible_collections(
918            visible in arb_visible(),
919            candidate_names in arb_candidates(),
920            literal_count in 0usize..3,
921        ) {
922            // Single runtime shared across cases — `filter_values`
923            // only reads (no mutation), and we need the empty-store
924            // path so the invariant we want to pin is "no row escapes
925            // visible_collections" rather than "any specific row
926            // surfaces".
927            let rt = PROPTEST_RUNTIME.get_or_init(fresh_runtime);
928            let candidates = CandidateCollections {
929                collections: candidate_names,
930                columns_by_collection: HashMap::new(),
931            };
932            let literals: Vec<String> = (0..literal_count)
933                .map(|i| format!("ID-{i}"))
934                .collect();
935            let tokens = TokenSet {
936                keywords: vec!["passport".to_string()],
937                literals,
938            };
939            let scope = make_scope(visible.clone());
940            let rows = filter_values(rt, &scope, &candidates, &tokens, DEFAULT_ROW_CAP);
941            for row in &rows {
942                prop_assert!(
943                    visible.contains(&row.collection),
944                    "Stage 4 leaked row collection={} not in visible={:?}",
945                    row.collection, visible
946                );
947            }
948        }
949    }
950
951    static PROPTEST_RUNTIME: std::sync::OnceLock<RedDBRuntime> = std::sync::OnceLock::new();
952
953    // -- Integration test (issue #121 acceptance row) ----------------
954    //
955    // Drives the four stages end-to-end through `AskPipeline::execute`
956    // with the question the issue calls out:
957    //   "quais as novidades sobre o passport FDD-12313?"
958    //
959    // Stage 1 must extract `passport` + `FDD-12313`. Stage 2 must
960    // narrow to the `passports` collection (visible to the caller).
961    // Stage 3 silently yields zero hits without an embedding provider
962    // — that's expected for the test fixture path. Stage 4 must surface
963    // the row whose `notes` column embeds `FDD-12313`.
964
965    #[test]
966    fn integration_passport_fdd_12313_funnels_through_four_stages() {
967        let rt = fresh_runtime();
968        // The collection name itself + a `passport` column both feed
969        // Stage 2's vocabulary; either one is enough for the question
970        // "passport FDD-12313" to land here.
971        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
972            .expect("CREATE TABLE travel");
973        rt.execute_query(
974            "INSERT INTO travel (id, passport, notes) VALUES \
975             (1, 'BR-001', 'unrelated note'), \
976             (2, 'PT-002', 'incident FDD-12313 escalated'), \
977             (3, 'US-003', 'standard renewal')",
978        )
979        .expect("seed rows");
980        // Out-of-scope collection — must NEVER surface in Stage 4.
981        rt.execute_query("CREATE TABLE secrets (id INT, passport TEXT)")
982            .expect("CREATE TABLE secrets");
983        rt.execute_query("INSERT INTO secrets (id, passport) VALUES (99, 'FDD-12313')")
984            .expect("seed secrets");
985
986        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
987        let scope = make_scope(visible);
988
989        let ctx = AskPipeline::execute(
990            &rt,
991            &scope,
992            "quais as novidades sobre o passport FDD-12313?",
993        )
994        .expect("pipeline runs");
995
996        // Stage 1: passport + FDD-12313 surfaced.
997        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
998        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
999
1000        // Stage 2: candidates narrowed to `travel` (the `passport`
1001        // column on `secrets` is dropped by the visible-set
1002        // intersection).
1003        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1004
1005        // Stage 3: best-effort embedding — without a provider
1006        // configured, Stage 3 silently returns []; the rest of the
1007        // funnel still runs.
1008        let _ = &ctx.vector_hits;
1009
1010        // Stage 4: the row whose `notes` mentions `FDD-12313`
1011        // surfaces; the out-of-scope `secrets` row does NOT.
1012        assert!(
1013            ctx.filtered_rows
1014                .iter()
1015                .any(|r| r.collection == "travel" && r.matched_literal == "FDD-12313"),
1016            "expected travel row with FDD-12313 match, got: {:?}",
1017            ctx.filtered_rows
1018        );
1019        for row in &ctx.filtered_rows {
1020            assert_ne!(
1021                row.collection, "secrets",
1022                "secrets row leaked into Stage 4 output"
1023            );
1024        }
1025
1026        // Per-stage timing recorded.
1027        // (The Instant-based measurements may be 0 on very fast hosts;
1028        // we only check the field exists and was populated.)
1029        let _ = ctx.timings.extract_us
1030            + ctx.timings.schema_us
1031            + ctx.timings.vector_us
1032            + ctx.timings.filter_us;
1033    }
1034
1035    // -- Stage 1 routing (Lane 4/5: LlmNer wiring) -------------------
1036    //
1037    // The routing dispatcher is an `extract_tokens_routed` helper that
1038    // reads `ai.ner.backend` and either passes through to the heuristic
1039    // (default) or routes through `LlmNer::extract`. Today the
1040    // capability gate (`EffectiveScope::has_capability`) is a placeholder
1041    // that always returns `false`, so the LLM path always denies and
1042    // the configured `HeuristicFallback` policy fires. The tests below
1043    // pin every observable: heuristic stays the default, `llm + auth
1044    // denied` honours each fallback mode, and the one-shot info log is
1045    // best-effort (we don't assert it directly to avoid a coupling to
1046    // the global `tracing` subscriber).
1047
1048    fn write_config(rt: &RedDBRuntime, key: &str, value: &str) {
1049        let store = rt.inner.db.store();
1050        store.set_config_tree(key, &crate::serde_json::Value::String(value.to_string()));
1051    }
1052
1053    /// Default backend stays heuristic — even with an open scope, the
1054    /// pipeline returns the same tokens it would without any config.
1055    #[test]
1056    fn routed_default_backend_runs_heuristic() {
1057        let rt = fresh_runtime();
1058        let scope = make_scope(HashSet::new());
1059        let tokens = extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1060            .expect("heuristic path is infallible");
1061        assert!(tokens.keywords.contains(&"passport".to_string()));
1062        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1063    }
1064
1065    /// `backend = llm` with `fallback = use_heuristic`: capability
1066    /// denies (placeholder) → fallback → heuristic tokens surface.
1067    #[tokio::test(flavor = "multi_thread")]
1068    async fn routed_llm_auth_denied_uses_heuristic_fallback() {
1069        let rt = fresh_runtime();
1070        write_config(&rt, "ai.ner.backend", "llm");
1071        write_config(&rt, "ai.ner.fallback", "use_heuristic");
1072        let scope = make_scope(HashSet::new());
1073        let tokens = tokio::task::spawn_blocking(move || {
1074            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1075        })
1076        .await
1077        .unwrap()
1078        .expect("fallback policy keeps the call OK");
1079        assert!(tokens.keywords.contains(&"passport".to_string()));
1080        assert!(tokens.literals.contains(&"FDD-12313".to_string()));
1081    }
1082
1083    /// `backend = llm` with `fallback = empty_on_fail`: auth denies →
1084    /// fallback returns an empty `TokenSet`.
1085    #[tokio::test(flavor = "multi_thread")]
1086    async fn routed_llm_auth_denied_empty_on_fail() {
1087        let rt = fresh_runtime();
1088        write_config(&rt, "ai.ner.backend", "llm");
1089        write_config(&rt, "ai.ner.fallback", "empty_on_fail");
1090        let scope = make_scope(HashSet::new());
1091        let tokens = tokio::task::spawn_blocking(move || {
1092            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1093        })
1094        .await
1095        .unwrap()
1096        .expect("empty_on_fail returns Ok with empty TokenSet");
1097        assert!(tokens.is_empty(), "expected empty TokenSet, got {tokens:?}");
1098    }
1099
1100    /// `backend = llm` with `fallback = propagate`: auth denies →
1101    /// `extract_tokens_routed` surfaces a `RedDBError::Query` so the
1102    /// caller can decide.
1103    #[tokio::test(flavor = "multi_thread")]
1104    async fn routed_llm_auth_denied_propagate_returns_error() {
1105        let rt = fresh_runtime();
1106        write_config(&rt, "ai.ner.backend", "llm");
1107        write_config(&rt, "ai.ner.fallback", "propagate");
1108        let scope = make_scope(HashSet::new());
1109        let err = tokio::task::spawn_blocking(move || {
1110            extract_tokens_routed(&rt, &scope, "passport FDD-12313")
1111        })
1112        .await
1113        .unwrap()
1114        .expect_err("propagate must surface the error");
1115        let msg = format!("{err}");
1116        assert!(
1117            msg.contains("propagate") || msg.contains("ai.ner.backend"),
1118            "expected propagate error message, got: {msg}"
1119        );
1120    }
1121
1122    /// AskPipeline end-to-end with `backend = llm` and the default
1123    /// `use_heuristic` fallback: the pipeline still returns tokens (via
1124    /// fallback), Stage 1 is the only routed stage, and the rest of the
1125    /// funnel runs unchanged.
1126    #[tokio::test(flavor = "multi_thread")]
1127    async fn execute_with_llm_backend_falls_back_and_completes_pipeline() {
1128        let rt = fresh_runtime();
1129        write_config(&rt, "ai.ner.backend", "llm");
1130        // default fallback is use_heuristic — leave it implicit.
1131        rt.execute_query("CREATE TABLE travel (id INT, passport TEXT, notes TEXT)")
1132            .expect("CREATE TABLE travel");
1133        rt.execute_query(
1134            "INSERT INTO travel (id, passport, notes) VALUES \
1135             (2, 'PT-002', 'incident FDD-12313 escalated')",
1136        )
1137        .expect("seed rows");
1138        let visible: HashSet<String> = ["travel".to_string()].into_iter().collect();
1139        let scope = make_scope(visible);
1140        let ctx = tokio::task::spawn_blocking(move || {
1141            AskPipeline::execute(&rt, &scope, "passport FDD-12313")
1142        })
1143        .await
1144        .unwrap()
1145        .expect("pipeline runs");
1146        assert!(ctx.tokens.keywords.contains(&"passport".to_string()));
1147        assert!(ctx.tokens.literals.contains(&"FDD-12313".to_string()));
1148        assert_eq!(ctx.candidates.collections, vec!["travel".to_string()]);
1149        assert!(
1150            ctx.filtered_rows
1151                .iter()
1152                .any(|r| r.matched_literal == "FDD-12313"),
1153            "Stage 4 still runs after Stage 1 fallback"
1154        );
1155    }
1156}