Skip to main content

reddb_server/runtime/
statement_frame.rs

1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7    collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8    has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9    query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10    SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
19/// Coarse privilege classification for a statement, computed once at
20/// frame-build time from the SQL text. Mirrors the three-role auth
21/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
22/// answer "can this identity run this statement?" without re-walking
23/// the parsed `QueryExpr` at every call site.
24///
25/// `None` means the statement does not touch the privilege gate at
26/// all (transaction control, SET, SHOW). Such statements must remain
27/// runnable under any authenticated identity.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub(crate) enum Privilege {
30    /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
31    /// any role from `Role::Read` upward.
32    Read,
33    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
34    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
35    /// at least `Role::Write`.
36    Write,
37    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
38    /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
39    Admin,
40    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
41    /// ROLLBACK, SET, SHOW with no data exposure). Always permitted
42    /// for any authenticated identity.
43    None,
44}
45
46impl Privilege {
47    /// `true` iff `role` is sufficient to execute a statement carrying
48    /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49    /// Admin` containment used by the auth fallback path.
50    pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51        match self {
52            Self::None => true,
53            Self::Read => role.can_read(),
54            Self::Write => role.can_write(),
55            Self::Admin => role.can_admin(),
56        }
57    }
58}
59
60/// Coarse lock intent for a statement, computed once at frame-build
61/// time. Maps onto the storage-layer's `LockMode` matrix downstream
62/// but stays decoupled here so the runtime can answer "does this
63/// statement need the lock manager at all?" without a `use storage::`
64/// at every call site.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub(crate) enum LockIntent {
67    /// No collection-level lock needed (transaction control, SET,
68    /// SHOW, EXPLAIN). The lock-acquisition path can short-circuit.
69    None,
70    /// Reader-style intent: SELECT, joins, graph / queue / search
71    /// reads. Maps to `(IS, IS)` at the storage layer.
72    Shared,
73    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
74    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
75    /// `Exclusive` at this granularity — call sites that need the
76    /// finer distinction still consult `intent_lock_modes_for`.
77    Exclusive,
78}
79
80/// Small, stable Interface that *represents* a read statement's
81/// execution context. Every read caller that needs to know "under
82/// what scope / identity / snapshot am I running, and is there an
83/// AS OF floor in effect?" consults this trait — never the
84/// underlying thread-locals or runtime fields directly.
85///
86/// The deletion test: removing this trait would force the four
87/// concerns it exposes back into ad-hoc lookups at every read
88/// callsite (`current_tenant()`, `current_auth_identity()`,
89/// `capture_current_snapshot()`, AS OF re-parsing). The trait
90/// concentrates them in one place so future changes (per-statement
91/// logging, audit, scope policy) have a single seam to extend.
92pub(crate) trait ReadFrame {
93    /// Effective tenant scope for the statement after WITHIN /
94    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
95    /// "no tenant bound" (RLS deny-default applies).
96    fn effective_scope(&self) -> Option<&str>;
97
98    /// Authenticated identity observed at frame-build time, if any.
99    /// Returns `(username, role)` so callers can render audit lines
100    /// or feed RLS policy lookups without re-reading thread-locals.
101    fn identity(&self) -> Option<(&str, Role)>;
102
103    /// MVCC snapshot the statement reads against. For autocommit
104    /// this is a fresh snapshot; inside an active transaction it
105    /// is the txn's snapshot; under AS OF it is the resolved
106    /// historical xid.
107    fn snapshot(&self) -> &Snapshot;
108
109    /// AS OF xid floor when AS OF was applied for this statement,
110    /// `None` for live reads. Useful for downstream callers that
111    /// want to gate behaviour on historical-read mode without
112    /// re-parsing the query.
113    fn as_of_floor(&self) -> Option<Xid>;
114
115    /// Stable result-cache key for the statement (already mixes
116    /// effective tenant + identity).
117    fn cache_key(&self) -> &str;
118
119    /// Whether the statement is safe to serve from / populate the
120    /// result cache. Combines two underlying signals:
121    ///
122    ///   * the query does not call a volatile builtin (e.g. `NOW()`,
123    ///     `RANDOM()`, `UUID()`), which would change between calls,
124    ///   * the connection is not inside an active transaction with
125    ///     uncommitted writes that other readers shouldn't observe.
126    ///
127    /// SELECT cache callsites (read + write) consult this method
128    /// instead of re-deriving safety from globals or poking the
129    /// frame's private fields. Removing it would force every cache
130    /// callsite to re-run `query_has_volatile_builtin` plus
131    /// `result_cache_safe(conn_id)` inline.
132    fn should_cache_result(&self) -> bool;
133
134    /// Coarse privilege class the statement requires, computed once
135    /// at frame-build time from the SQL prefix. Read/write dispatch
136    /// sites consult this instead of re-classifying the parsed
137    /// `QueryExpr` inline at every callsite.
138    ///
139    /// Removing this method would force every privilege gate to
140    /// recompute the (action, resource) classification from the
141    /// parsed expression and re-check the role hierarchy inline.
142    fn required_privilege(&self) -> Privilege;
143
144    /// Coarse collection-level lock intent the statement implies.
145    /// `None` lets the lock-acquisition path short-circuit without
146    /// touching the lock manager.
147    ///
148    /// Removing this method would force the lock-acquisition path
149    /// to always invoke `intent_lock_modes_for` (which itself walks
150    /// the parsed expression) even for transaction-control / SET /
151    /// SHOW statements that need no collection lock at all.
152    fn lock_intent(&self) -> LockIntent;
153
154    /// Set of collection ids the calling identity is allowed to
155    /// observe under the active `(tenant, role)` scope. Computed once
156    /// at frame-build time via the `AuthStore` visible-collections
157    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
158    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
159    /// before any similarity score is computed (issue #119).
160    ///
161    /// `None` means the frame was built without an auth store wired —
162    /// embedded / single-tenant tests run that way. AI search call
163    /// sites refuse to proceed with `None`, which is the deny-default
164    /// the issue requires; pure SELECT paths fall back to the existing
165    /// per-row RLS gate.
166    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
167}
168
169/// Cheap first-word classification of a SQL statement, used at
170/// frame-build time to derive `Privilege` + `LockIntent` without
171/// re-parsing the query. Matches the keywords that the legacy
172/// inline checks in `RedDBRuntime::check_query_privilege` and
173/// `intent_lock_modes_for` already key on.
174fn statement_kind(query: &str) -> &'static str {
175    let trimmed = query.trim_start();
176    // Skip a leading line / block comment so the classifier doesn't
177    // misread `/* ... */ SELECT ...` as an unknown statement.
178    let trimmed = if let Some(rest) = trimmed.strip_prefix("--") {
179        rest.split_once('\n')
180            .map(|(_, r)| r)
181            .unwrap_or("")
182            .trim_start()
183    } else {
184        trimmed
185    };
186    let mut tokens = trimmed.split(|c: char| c.is_whitespace() || c == '(' || c == ';');
187    let first = tokens.next().unwrap_or("");
188    let second = tokens.next().unwrap_or("");
189    if first.eq_ignore_ascii_case("KV") {
190        if second.eq_ignore_ascii_case("GET")
191            || second.eq_ignore_ascii_case("LIST")
192            || second.eq_ignore_ascii_case("WATCH")
193        {
194            return "read";
195        }
196        return "write";
197    }
198    if first.eq_ignore_ascii_case("VAULT") {
199        if second.eq_ignore_ascii_case("LIST")
200            || second.eq_ignore_ascii_case("WATCH")
201            || second.eq_ignore_ascii_case("HISTORY")
202        {
203            return "read";
204        }
205        return "write";
206    }
207    // ASCII-uppercase compare without allocating: SQL keywords are ASCII.
208    let mut buf = [0u8; 16];
209    let bytes = first.as_bytes();
210    let n = bytes.len().min(buf.len());
211    for i in 0..n {
212        buf[i] = bytes[i].to_ascii_uppercase();
213    }
214    match &buf[..n] {
215        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" | b"RANK"
216        | b"APPROX" | b"APPROXIMATE" | b"ZRANK" | b"ZRANGE" | b"LIST" | b"WATCH" | b"GET"
217        | b"HISTORY" => "read",
218        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
219        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
220        b"GRANT" | b"REVOKE" => "admin",
221        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
222        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
223        b"PUT" | b"INCR" | b"DECR" | b"ADD" | b"ROTATE" | b"PURGE" | b"UNSEAL" | b"INVALIDATE" => {
224            "write"
225        }
226        _ => "unknown",
227    }
228}
229
230fn classify_privilege(query: &str) -> Privilege {
231    match statement_kind(query) {
232        "read" => Privilege::Read,
233        "write" => Privilege::Write,
234        // DDL is gated at `Role::Write` in the legacy fallback (see
235        // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
236        // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
237        // GRANT / REVOKE upgrade to Admin via finer checks at the call
238        // site — the frame surfaces only the coarse class.
239        "ddl" => Privilege::Write,
240        "admin" => Privilege::Admin,
241        _ => Privilege::None,
242    }
243}
244
245fn classify_lock_intent(query: &str) -> LockIntent {
246    match statement_kind(query) {
247        "read" => LockIntent::Shared,
248        "write" | "ddl" => LockIntent::Exclusive,
249        _ => LockIntent::None,
250    }
251}
252
253pub(super) struct StatementExecutionFrame {
254    tx_local_tenant: Option<Option<String>>,
255    snapshot: Snapshot,
256    own_xids: HashSet<Xid>,
257    cache_key: String,
258    is_volatile_query: bool,
259    cache_safe: bool,
260    /// Effective tenant captured at frame-build time after WITHIN /
261    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
262    /// so the `ReadFrame` Interface can return a borrow without
263    /// re-touching the thread-local stack.
264    effective_scope: Option<String>,
265    /// Auth identity captured at frame-build time. `None` for
266    /// embedded / anonymous callers.
267    identity: Option<(String, Role)>,
268    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
269    /// for live reads.
270    as_of_floor: Option<Xid>,
271    /// True when the statement snapshot can require tuple versions that
272    /// current secondary indexes no longer contain.
273    requires_index_fallback: bool,
274    /// Privilege class required by the statement, derived from the
275    /// SQL text at frame-build time. Read/write dispatch sites
276    /// consult this instead of re-classifying the parsed expression.
277    required_privilege: Privilege,
278    /// Collection-level lock intent the statement implies. The
279    /// lock-acquisition path short-circuits when this is `None`.
280    lock_intent: LockIntent,
281    /// Set of collection ids the active `(tenant, role)` scope is
282    /// allowed to observe. Computed at frame-build time via the
283    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
284    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
285    /// scoring (issue #119). `None` when no auth store is wired
286    /// (embedded test mode) — AI search refuses on `None`.
287    visible_collections: Option<HashSet<String>>,
288    /// Per-owner buffer arena for query-result row chunks (#885). Owned
289    /// by the frame because the frame already owns the query lifecycle;
290    /// lent to the row-streaming path (`execute_runtime_table_query_in`)
291    /// so chunk buffers are reused across the statement's chunk-fetches
292    /// instead of allocated fresh per chunk. Reclaimed when the frame
293    /// drops at statement end — no `thread_local!` scratch, which would
294    /// be unsound under tokio's work-stealing runtime.
295    row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
296}
297
298pub(super) struct StatementFrameGuards {
299    _tx_local_guard: TxLocalTenantGuard,
300    _config_snapshot_guard: ConfigSnapshotGuard,
301    _secret_store_guard: SecretStoreGuard,
302    _snapshot_guard: CurrentSnapshotGuard,
303}
304
305pub(super) struct PreparedStatement {
306    pub(super) expr: QueryExpr,
307    pub(super) mode: QueryMode,
308}
309
310impl StatementExecutionFrame {
311    pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
312        let conn_id = current_connection_id();
313        let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
314        let own_xids = runtime.own_transaction_xids(conn_id);
315        let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
316        let requires_index_fallback =
317            as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
318        let cache_key = result_cache_key(query);
319        let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
320        let cache_safe = runtime.result_cache_safe(conn_id);
321        // Capture identity + effective scope under the same
322        // thread-local view that the cache key was built from, so
323        // the Interface and the cache key agree on what "this
324        // statement" means.
325        let effective_scope = current_tenant();
326        let identity = current_auth_identity();
327
328        // Coarse classification of the statement, computed once from
329        // the SQL prefix so downstream callers don't re-derive it
330        // from the parsed `QueryExpr` at every privilege / lock site.
331        let required_privilege = classify_privilege(query);
332        let lock_intent = classify_lock_intent(query);
333
334        // Issue #119: resolve the visible-collections set for the
335        // active (tenant, role) scope. Only meaningful when an auth
336        // store is wired *and* an identity was captured — embedded
337        // anonymous callers fall back to `None`, and AI search call
338        // sites refuse on `None`.
339        let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
340        {
341            (Some(store), Some((principal, role))) => {
342                let collections = runtime.inner.db.store().list_collections();
343                Some(store.visible_collections_for_scope(
344                    effective_scope.as_deref(),
345                    *role,
346                    principal,
347                    &collections,
348                ))
349            }
350            _ => None,
351        };
352
353        Ok(Self {
354            tx_local_tenant,
355            snapshot,
356            own_xids,
357            cache_key,
358            is_volatile_query,
359            cache_safe,
360            effective_scope,
361            identity,
362            as_of_floor,
363            requires_index_fallback,
364            required_privilege,
365            lock_intent,
366            visible_collections,
367            row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
368        })
369    }
370
371    /// Lend the frame's per-owner row-buffer arena (#885) to the
372    /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
373    /// the owner and the arena is reclaimed when the frame drops at
374    /// statement end.
375    pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
376        Rc::clone(&self.row_arena)
377    }
378
379    pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
380        StatementFrameGuards {
381            _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
382            _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
383            _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
384            _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
385                snapshot: self.snapshot.clone(),
386                manager: Arc::clone(&runtime.inner.snapshot_manager),
387                own_xids: self.own_xids.clone(),
388                requires_index_fallback: self.requires_index_fallback,
389            }),
390        }
391    }
392
393    pub(super) fn cache_key(&self) -> &str {
394        &self.cache_key
395    }
396
397    pub(super) fn can_read_result_cache(&self) -> bool {
398        // Delegates to the `ReadFrame` Interface so the volatile +
399        // active-tx safety decision lives in exactly one place.
400        <Self as ReadFrame>::should_cache_result(self)
401    }
402
403    pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
404        // Cache-safety (volatile builtin, active-tx writes) comes from
405        // the Interface; the rest are write-side payload heuristics
406        // (statement shape, result size) that aren't part of the
407        // safety contract.
408        <Self as ReadFrame>::should_cache_result(self)
409            && result.statement_type == "select"
410            && result.engine != "vault"
411            && result.engine != "runtime-rank"
412            // `QUEUE READ` is a stateful read: a delayed message
413            // (issue #722) becomes deliverable over time without a
414            // producer push to invalidate the cache, so a cached empty
415            // result would hide it. Skip caching entirely.
416            && result.statement != "queue_group_read"
417            && result.result.pre_serialized_json.is_none()
418            // Graph-analytics TVF output (issue #802) is deterministic and
419            // expensive to recompute, so it is cached at any row count. The
420            // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
421            && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
422    }
423
424    pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
425        if self.can_read_result_cache() {
426            runtime.get_result_cache_entry(self.cache_key())
427        } else {
428            None
429        }
430    }
431
432    pub(super) fn write_result_cache(
433        &self,
434        runtime: &RedDBRuntime,
435        result: &RuntimeQueryResult,
436        scopes: HashSet<String>,
437    ) {
438        if self.should_write_result_cache(result) {
439            runtime.put_result_cache_entry(
440                self.cache_key(),
441                RuntimeResultCacheEntry {
442                    result: result.clone(),
443                    cached_at: std::time::Instant::now(),
444                    scopes,
445                },
446            );
447        }
448    }
449
450    pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
451        // Detected via cheap prefix check so non-CTE queries skip the
452        // full parse here. CTE-bearing queries bypass the plan cache
453        // and result cache (rare workload — perf optimization is a
454        // follow-up). Inlining substitutes every CTE reference with
455        // its body as a subquery in FROM, after which the existing
456        // subquery-in-FROM machinery handles execution. Recursive
457        // CTEs are rejected explicitly until fixpoint execution wires
458        // through the runtime.
459        if !has_with_prefix(query) {
460            return Ok(None);
461        }
462        let parsed = crate::storage::query::parser::parse(query)
463            .map_err(|err| RedDBError::Query(err.to_string()))?;
464        if parsed.with_clause.is_some() {
465            let rewritten = crate::storage::query::executors::inline_ctes(parsed)
466                .map_err(|err| RedDBError::Query(err.to_string()))?;
467            return Ok(Some(rewritten));
468        }
469        // No WITH after parse (the prefix matched something else like
470        // `WITHIN` that already routed elsewhere) — fall through to
471        // the normal path with the original query.
472        Ok(None)
473    }
474
475    pub(super) fn prepare_statement(
476        &self,
477        runtime: &RedDBRuntime,
478        query: &str,
479    ) -> RedDBResult<PreparedStatement> {
480        let mode = detect_mode(query);
481        if matches!(mode, QueryMode::Unknown) {
482            return Err(RedDBError::Query("unable to detect query mode".to_string()));
483        }
484
485        // ── Plan cache: reuse only exact-query ASTs ──
486        //
487        // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
488        // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
489        // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
490        // Plan cache applies to statements whose shape can be
491        // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
492        // reuses the same plan across thousands of varying literals).
493        // INSERT is still bypassed — its shape changes per column set
494        // and bulk paths don't go through here anyway.
495        let first_word = query
496            .trim()
497            .split_ascii_whitespace()
498            .next()
499            .unwrap_or("")
500            .to_ascii_uppercase();
501        let is_insert = first_word == "INSERT";
502
503        // Fused normalize+extract: one byte-scan produces both the
504        // cache_key AND the literal bindings. Saves a second Lexer
505        // pass over the query text on every cache hit — dominant
506        // cost on tight UPDATE loops that hit the same shape
507        // thousands of times with varying literals.
508        let (cache_key, prescan_binds) = if is_insert {
509            (String::new(), Vec::new())
510        } else {
511            crate::storage::query::planner::cache_key::normalize_and_extract(query)
512        };
513
514        let expr = if is_insert {
515            // Bypass plan cache for INSERT — shape varies per query.
516            parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
517        } else {
518            // ── Hot path: read lock only (no writer serialization on cache hits) ──
519            //
520            // peek() is a non-mutating probe: no LRU promotion, no touch().
521            // This lets concurrent readers proceed without blocking each other.
522            // On hit we bind literals if needed and return immediately.
523            // Only on miss do we drop to a write lock to parse + insert.
524            let hit = {
525                let plan_cache = runtime.inner.query_cache.read();
526                plan_cache.peek(&cache_key).map(|cached| {
527                    let parameter_count = cached.parameter_count;
528                    let optimized = cached.plan.optimized.clone();
529                    let exact_query = cached.exact_query.clone();
530                    (parameter_count, optimized, exact_query)
531                })
532            };
533
534            if let Some((parameter_count, optimized, exact_query)) = hit {
535                if parameter_count > 0 {
536                    // Shape hit: use the binds extracted during normalise.
537                    let shape_binds = prescan_binds.clone();
538                    if let Some(bound) =
539                        crate::storage::query::planner::shape::bind_parameterized_query(
540                            &optimized,
541                            &shape_binds,
542                            parameter_count,
543                        )
544                    {
545                        bound
546                    } else if exact_query.as_deref() == Some(query) {
547                        // Bind failed but exact query matches — use as-is.
548                        optimized
549                    } else {
550                        // Bind failed and literals differ: re-parse fresh.
551                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
552                    }
553                } else {
554                    // No parameters means either there truly are no literals,
555                    // or this statement type does not participate in shape
556                    // parameterization (for example graph/queue commands).
557                    // Reusing a normalized-cache hit across a different exact
558                    // query can therefore leak stale literals into execution.
559                    if exact_query.as_deref() == Some(query) {
560                        optimized
561                    } else {
562                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
563                    }
564                }
565            } else {
566                // Cache miss — parse, parameterize, store.
567                let parsed =
568                    parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
569                let (cached_expr, parameter_count) = if let Some(prepared) =
570                    crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
571                {
572                    (prepared.shape, prepared.parameter_count)
573                } else {
574                    (parsed.clone(), 0)
575                };
576                {
577                    let mut pc = runtime.inner.query_cache.write();
578                    let plan = crate::storage::query::planner::QueryPlan::new(
579                        parsed.clone(),
580                        cached_expr,
581                        Default::default(),
582                    );
583                    pc.insert(
584                        cache_key.clone(),
585                        crate::storage::query::planner::CachedPlan::new(plan)
586                            .with_shape_key(cache_key.clone())
587                            .with_exact_query(query.to_string())
588                            .with_parameter_count(parameter_count),
589                    );
590                }
591                parsed
592            }
593        };
594
595        // Phase 5 PG parity: substitute any registered view name that
596        // appears in the expression with its stored body. Runs after
597        // parse and before dispatch so the SQL entrypoint gets the
598        // same view resolution `execute_query_expr` already does.
599        let expr = runtime.rewrite_view_refs(expr);
600
601        Ok(PreparedStatement { expr, mode })
602    }
603
604    pub(super) fn check_query_privilege(
605        &self,
606        runtime: &RedDBRuntime,
607        expr: &QueryExpr,
608    ) -> RedDBResult<()> {
609        // Frame-level coarse gate. We consult `required_privilege()`
610        // (computed once at frame-build) against the captured identity
611        // before the deep grant engine walks the parsed expression.
612        // The coarse gate cannot ALLOW anything the grant engine would
613        // deny — it only short-circuits the obvious "Role::Read tries
614        // INSERT" case so a downstream caller never has to redo this
615        // check inline. `Privilege::None` (transaction control / SET /
616        // SHOW) flows through unchanged; the grant engine treats those
617        // as bypass too.
618        if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
619            let needed = <Self as ReadFrame>::required_privilege(self);
620            if !needed.is_satisfied_by(role) {
621                // Issue #205 — when the deep grant engine *also*
622                // denies, we treat this as an ordinary permission
623                // failure. But when an Admin-only statement reaches
624                // this gate without an auth_store wired (so the deep
625                // engine can't double-check), the coarse rejection is
626                // the only line of defence — emit an OperatorEvent so
627                // the operator notices an Admin-class statement was
628                // attempted with insufficient role.
629                if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
630                    crate::telemetry::operator_event::OperatorEvent::AuthBypass {
631                        principal: username.to_string(),
632                        resource: format!("statement requiring {needed:?}"),
633                        detail: format!(
634                            "auth_store not wired; coarse gate is sole defence (role={role:?})"
635                        ),
636                    }
637                    .emit_global();
638                }
639                return Err(RedDBError::Query(format!(
640                    "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
641                )));
642            }
643        }
644        runtime
645            .check_query_privilege(expr)
646            .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
647    }
648
649    pub(super) fn prepare_dispatch(
650        &self,
651        runtime: &RedDBRuntime,
652        expr: &QueryExpr,
653    ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
654        runtime.validate_model_operations_before_auth(expr)?;
655        self.check_query_privilege(runtime, expr)?;
656        Ok(self.acquire_intent_locks(runtime, expr))
657    }
658
659    pub(super) fn acquire_intent_locks(
660        &self,
661        runtime: &RedDBRuntime,
662        expr: &QueryExpr,
663    ) -> Option<crate::runtime::locking::LockerGuard> {
664        if !runtime.config_bool("concurrency.locking.enabled", true) {
665            return None;
666        }
667        // Frame-level short-circuit: if the statement carries no lock
668        // intent (transaction control, SET, SHOW), skip the lock
669        // manager entirely instead of letting `intent_lock_modes_for`
670        // walk the parsed expression to reach the same conclusion.
671        if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
672            return None;
673        }
674        intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
675            let mut guard =
676                crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
677            let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
678            for collection in collections_referenced(expr) {
679                let _ = guard.acquire(
680                    crate::runtime::locking::Resource::Collection(collection),
681                    coll_mode,
682                );
683            }
684            guard
685        })
686    }
687}
688
689impl ReadFrame for StatementExecutionFrame {
690    fn effective_scope(&self) -> Option<&str> {
691        self.effective_scope.as_deref()
692    }
693
694    fn identity(&self) -> Option<(&str, Role)> {
695        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
696    }
697
698    fn snapshot(&self) -> &Snapshot {
699        &self.snapshot
700    }
701
702    fn as_of_floor(&self) -> Option<Xid> {
703        self.as_of_floor
704    }
705
706    fn cache_key(&self) -> &str {
707        &self.cache_key
708    }
709
710    fn should_cache_result(&self) -> bool {
711        !self.is_volatile_query && self.cache_safe
712    }
713
714    fn required_privilege(&self) -> Privilege {
715        self.required_privilege
716    }
717
718    fn lock_intent(&self) -> LockIntent {
719        self.lock_intent
720    }
721
722    fn visible_collections(&self) -> Option<&HashSet<String>> {
723        self.visible_collections.as_ref()
724    }
725}
726
727/// Lightweight `ReadFrame` carrier used by AI command entry points
728/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
729///
730/// Issue #119 calls this struct `EffectiveScope`. It bundles the
731/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
732/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
733/// instead of re-reading thread-locals at every call site.
734///
735/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
736/// from the per-statement thread-locals (identical to how
737/// `StatementExecutionFrame::build` derives them) and resolves
738/// `visible_collections` via the `AuthStore` cache.
739pub struct EffectiveScope {
740    pub(crate) tenant: Option<String>,
741    pub(crate) identity: Option<(String, Role)>,
742    pub(crate) snapshot: Snapshot,
743    pub(crate) visible_collections: Option<HashSet<String>>,
744}
745
746impl EffectiveScope {
747    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
748    /// to gate LLM-backed NER calls behind `ai:ner:read`.
749    ///
750    /// Placeholder for now: always returns `false`. The auth engine's
751    /// capability matrix is future work; until it lands, every routed
752    /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
753    /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
754    /// Documented in code so the wire-up is a one-line change once
755    /// the auth engine learns capabilities.
756    pub fn has_capability(&self, _capability: &str) -> bool {
757        false
758    }
759}
760
761impl ReadFrame for EffectiveScope {
762    fn effective_scope(&self) -> Option<&str> {
763        self.tenant.as_deref()
764    }
765    fn identity(&self) -> Option<(&str, Role)> {
766        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
767    }
768    fn snapshot(&self) -> &Snapshot {
769        &self.snapshot
770    }
771    fn as_of_floor(&self) -> Option<Xid> {
772        None
773    }
774    fn cache_key(&self) -> &str {
775        ""
776    }
777    fn should_cache_result(&self) -> bool {
778        false
779    }
780    fn required_privilege(&self) -> Privilege {
781        Privilege::Read
782    }
783    fn lock_intent(&self) -> LockIntent {
784        LockIntent::Shared
785    }
786    fn visible_collections(&self) -> Option<&HashSet<String>> {
787        self.visible_collections.as_ref()
788    }
789}
790
791impl RedDBRuntime {
792    /// Build the AI command `EffectiveScope` from the current
793    /// statement thread-locals + auth store.
794    ///
795    /// Returns `None` for embedded callers (no auth store, no
796    /// identity) — `AuthorizedSearch` treats `None` as deny-default.
797    pub(crate) fn ai_scope(&self) -> EffectiveScope {
798        let tenant = super::impl_core::current_tenant();
799        let identity = super::impl_core::current_auth_identity();
800        let snapshot = self.current_snapshot();
801        let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
802            (Some(store), Some((principal, role))) => {
803                let collections = self.inner.db.store().list_collections();
804                Some(store.visible_collections_for_scope(
805                    tenant.as_deref(),
806                    *role,
807                    principal,
808                    &collections,
809                ))
810            }
811            _ => None,
812        };
813        EffectiveScope {
814            tenant,
815            identity,
816            snapshot,
817            visible_collections,
818        }
819    }
820}
821
822/// Test fixtures for callers that need to drive `ReadFrame` without
823/// booting a runtime. Lives behind `cfg(test)` and `pub(crate)` so it
824/// only leaks across module boundaries inside the crate.
825#[cfg(test)]
826pub(crate) mod test_support {
827    use super::{LockIntent, Privilege, ReadFrame};
828    use crate::auth::Role;
829    use crate::storage::transaction::snapshot::{Snapshot, Xid};
830    use std::collections::HashSet;
831
832    /// A `ReadFrame` impl with hand-set fields. Used by
833    /// `authorized_search` tests to assert the deny-default and
834    /// scope-trim behaviour without going through frame construction.
835    pub(crate) struct FakeReadFrame {
836        pub tenant: Option<String>,
837        pub identity: Option<(String, Role)>,
838        pub snapshot: Snapshot,
839        pub visible: Option<HashSet<String>>,
840    }
841
842    impl FakeReadFrame {
843        pub(crate) fn without_scope() -> Self {
844            Self {
845                tenant: None,
846                identity: None,
847                snapshot: Snapshot {
848                    xid: 0,
849                    in_progress: HashSet::new(),
850                },
851                visible: None,
852            }
853        }
854
855        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
856            Self {
857                tenant: Some("acme".to_string()),
858                identity: Some(("alice".to_string(), Role::Read)),
859                snapshot: Snapshot {
860                    xid: 0,
861                    in_progress: HashSet::new(),
862                },
863                visible: Some(visible),
864            }
865        }
866    }
867
868    impl ReadFrame for FakeReadFrame {
869        fn effective_scope(&self) -> Option<&str> {
870            self.tenant.as_deref()
871        }
872        fn identity(&self) -> Option<(&str, Role)> {
873            self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
874        }
875        fn snapshot(&self) -> &Snapshot {
876            &self.snapshot
877        }
878        fn as_of_floor(&self) -> Option<Xid> {
879            None
880        }
881        fn cache_key(&self) -> &str {
882            ""
883        }
884        fn should_cache_result(&self) -> bool {
885            false
886        }
887        fn required_privilege(&self) -> Privilege {
888            Privilege::Read
889        }
890        fn lock_intent(&self) -> LockIntent {
891            LockIntent::Shared
892        }
893        fn visible_collections(&self) -> Option<&HashSet<String>> {
894            self.visible.as_ref()
895        }
896    }
897}
898
899impl RedDBRuntime {
900    fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
901        let mut set = HashSet::new();
902        if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
903            set.insert(ctx.xid);
904            for (_, sub) in &ctx.savepoints {
905                set.insert(*sub);
906            }
907            for sub in &ctx.released_sub_xids {
908                set.insert(*sub);
909            }
910        }
911        set
912    }
913
914    /// Resolve the snapshot for the current statement, returning
915    /// the snapshot itself and (when AS OF is in effect) the
916    /// resolved xid floor. The floor is the same xid carried inside
917    /// `Snapshot.xid` for AS OF reads — exposing it separately lets
918    /// the `ReadFrame` Interface tell "live read" from "historical
919    /// read" without inferring from `in_progress.is_empty()`.
920    fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
921        match peek_top_level_as_of_with_table(query) {
922            Some((spec, Some(table))) => {
923                if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
924                    return Err(RedDBError::InvalidConfig(format!(
925                        "AS OF requires a versioned collection — \
926                         `{table}` has not opted in. \
927                         Call vcs.set_versioned(\"{table}\", true) first."
928                    )));
929                }
930                let xid = self.vcs_resolve_as_of(spec)?;
931                Ok((
932                    Snapshot {
933                        xid,
934                        in_progress: HashSet::new(),
935                    },
936                    Some(xid),
937                ))
938            }
939            Some((spec, None)) => {
940                let xid = self.vcs_resolve_as_of(spec)?;
941                Ok((
942                    Snapshot {
943                        xid,
944                        in_progress: HashSet::new(),
945                    },
946                    Some(xid),
947                ))
948            }
949            None => Ok((self.current_snapshot(), None)),
950        }
951    }
952
953    fn result_cache_safe(&self, conn_id: u64) -> bool {
954        let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
955        let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
956        !has_active_xids && !in_own_tx
957    }
958}
959
960/// Whether a result's `engine` tag is one of the graph-analytics TVF
961/// executors (issue #802). Graph-collection (`louvain(g)`) and inline
962/// (`louvain(nodes => …, edges => …)`) forms both produce deterministic
963/// algorithm output that is cached regardless of row count.
964fn is_graph_tvf_engine(engine: &str) -> bool {
965    matches!(engine, "runtime-graph-tvf" | "runtime-graph-tvf-inline")
966}
967
968fn result_cache_key(query: &str) -> String {
969    let tenant = current_tenant().unwrap_or_default();
970    let auth = current_auth_identity()
971        .map(|(user, role)| format!("{}|{:?}", user, role))
972        .unwrap_or_default();
973    if tenant.is_empty() && auth.is_empty() {
974        query.to_string()
975    } else {
976        format!("{query}\u{001e}{tenant}\u{001e}{auth}")
977    }
978}
979
980#[cfg(test)]
981mod tests {
982    use super::*;
983    use crate::api::RedDBOptions;
984    use crate::runtime::impl_core::{
985        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
986        set_current_tenant,
987    };
988    use crate::runtime::RedDBRuntime;
989
990    fn fresh_runtime() -> RedDBRuntime {
991        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
992    }
993
994    /// Ensure thread-local state from a prior test can't leak into
995    /// the next one — tests in the same binary share the thread.
996    fn reset_thread_locals() {
997        clear_current_tenant();
998        clear_current_auth_identity();
999    }
1000
1001    #[test]
1002    fn autocommit_select_takes_live_snapshot() {
1003        reset_thread_locals();
1004        let rt = fresh_runtime();
1005        let frame =
1006            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
1007
1008        // Live reads: no AS OF floor, snapshot bounded by the
1009        // manager's `peek_next_xid` so committed tuples are visible.
1010        let f: &dyn ReadFrame = &frame;
1011        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
1012        assert!(
1013            f.snapshot().xid >= 1,
1014            "autocommit snapshot xid is bounded by peek_next_xid"
1015        );
1016    }
1017
1018    #[test]
1019    fn frame_captures_identity_and_scope() {
1020        reset_thread_locals();
1021        set_current_tenant("acme".to_string());
1022        set_current_auth_identity("alice".to_string(), Role::Write);
1023
1024        let rt = fresh_runtime();
1025        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
1026        let f: &dyn ReadFrame = &frame;
1027
1028        assert_eq!(f.effective_scope(), Some("acme"));
1029        let id = f.identity().expect("identity captured");
1030        assert_eq!(id.0, "alice");
1031        assert!(matches!(id.1, Role::Write));
1032
1033        // Cache key mixes scope + identity so two callers under
1034        // different tenants never share a cache slot.
1035        assert!(
1036            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
1037            "cache key folds in scope + identity, got {:?}",
1038            f.cache_key()
1039        );
1040
1041        reset_thread_locals();
1042    }
1043
1044    #[test]
1045    fn as_of_rejects_non_versioned_user_collection() {
1046        reset_thread_locals();
1047        let rt = fresh_runtime();
1048
1049        // `not_versioned` is a plain user collection — the frame
1050        // builder must reject AS OF until the caller opts in via
1051        // `vcs.set_versioned`.
1052        let err = match StatementExecutionFrame::build(
1053            &rt,
1054            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
1055        ) {
1056            Err(e) => e,
1057            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
1058        };
1059
1060        let msg = format!("{err}");
1061        assert!(
1062            msg.contains("AS OF requires a versioned collection"),
1063            "expected AS OF rejection, got: {msg}"
1064        );
1065    }
1066
1067    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1068    ///
1069    /// Sets a tenant + identity via the public thread-local API the
1070    /// runtime uses for ambient scope, drives a real `SELECT` through
1071    /// `execute_query`, then inspects the result cache that the SELECT
1072    /// path populates via `frame.cache_key()`. The key only carries
1073    /// the tenant + identity *because* it was built through the frame —
1074    /// reverting the wiring to inline `current_tenant()` /
1075    /// `current_auth_identity()` reads would still pass this test, but
1076    /// dropping the frame entirely (so the SELECT path stopped touching
1077    /// `cache_key`) would break it.
1078    #[test]
1079    fn select_path_routes_through_frame_cache_key() {
1080        reset_thread_locals();
1081        set_current_tenant("acme".to_string());
1082        set_current_auth_identity("alice".to_string(), Role::Read);
1083
1084        let rt = fresh_runtime();
1085        let result = rt
1086            .execute_query("SELECT 1")
1087            .expect("SELECT 1 executes under tenant=acme/identity=alice");
1088        assert_eq!(result.statement_type, "select");
1089
1090        // The textual SELECT path builds a frame and
1091        // writes its result through `frame.cache_key()`. That key folds
1092        // tenant + identity in via `result_cache_key`, so finding "acme"
1093        // and "alice" inside any cached key proves the frame was the
1094        // seam used.
1095        let cache = rt.inner.result_cache.read();
1096        let any_keyed_with_scope = cache
1097            .0
1098            .keys()
1099            .any(|k| k.contains("acme") && k.contains("alice"));
1100        assert!(
1101            any_keyed_with_scope,
1102            "expected at least one result-cache key carrying tenant+identity, \
1103             got keys: {:?}",
1104            cache.0.keys().collect::<Vec<_>>()
1105        );
1106
1107        reset_thread_locals();
1108    }
1109
1110    /// A SELECT that calls a volatile builtin (here:
1111    /// `pg_advisory_unlock`, the volatile token the runtime currently
1112    /// recognises in `query_has_volatile_builtin`) must NOT populate
1113    /// the result cache. Any caller hitting the cache after this would
1114    /// see a stale answer for an inherently-volatile query, so the
1115    /// SELECT path gates writes through `frame.should_cache_result()`.
1116    ///
1117    /// Deletion test: removing `ReadFrame::should_cache_result`, or
1118    /// reverting the SELECT path to skip its safety gate, would let
1119    /// the result cache silently absorb this statement and break the
1120    /// assertion below.
1121    #[test]
1122    fn volatile_select_does_not_populate_result_cache() {
1123        reset_thread_locals();
1124        let rt = fresh_runtime();
1125
1126        // Frame-level invariant: the volatile-builtin signal collapses
1127        // `should_cache_result` to false even for an autocommit /
1128        // out-of-tx connection.
1129        let frame =
1130            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1131        let f: &dyn ReadFrame = &frame;
1132        assert!(
1133            !f.should_cache_result(),
1134            "volatile builtin must disable result-cache safety"
1135        );
1136
1137        // End-to-end: drive the volatile SELECT through `execute_query`
1138        // and confirm no entry was stamped under its cache key. Other
1139        // entries from prior tests sharing the binary may exist, so we
1140        // assert specifically on this query's key.
1141        let _ = rt
1142            .execute_query("SELECT pg_advisory_unlock(1)")
1143            .expect("volatile SELECT executes");
1144        let cache = rt.inner.result_cache.read();
1145        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1146        assert!(
1147            !cache.0.contains_key(&key),
1148            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1149            cache.0.keys().collect::<Vec<_>>()
1150        );
1151
1152        reset_thread_locals();
1153    }
1154
1155    #[test]
1156    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1157        reset_thread_locals();
1158        let rt = fresh_runtime();
1159        rt.inner
1160            .db
1161            .store()
1162            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1163
1164        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1165        assert_eq!(result.statement_type, "select");
1166
1167        let key = result_cache_key("SELECT 1");
1168        assert!(
1169            rt.inner
1170                .result_blob_cache
1171                .get("runtime.result_cache", &key)
1172                .is_some(),
1173            "blob backend should stamp the Blob Cache path"
1174        );
1175        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1176        assert!(
1177            !rt.inner.result_cache.read().0.contains_key(&key),
1178            "blob backend should not write the legacy map"
1179        );
1180    }
1181
1182    #[test]
1183    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1184        reset_thread_locals();
1185        let rt = fresh_runtime();
1186        rt.inner
1187            .db
1188            .store()
1189            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1190
1191        let _ = rt
1192            .execute_query("SELECT pg_advisory_unlock(1)")
1193            .expect("volatile SELECT executes");
1194        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1195        assert!(
1196            rt.inner
1197                .result_blob_cache
1198                .get("runtime.result_cache", &key)
1199                .is_none(),
1200            "volatile SELECT must not populate blob result cache"
1201        );
1202        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1203    }
1204
1205    #[test]
1206    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1207        reset_thread_locals();
1208        let rt = fresh_runtime();
1209        rt.inner
1210            .db
1211            .store()
1212            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1213
1214        let first = rt.execute_query("SELECT 1").expect("first SELECT");
1215        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1216        assert_eq!(first.result.len(), second.result.len());
1217
1218        let key = result_cache_key("SELECT 1");
1219        assert!(rt.inner.result_cache.read().0.contains_key(&key));
1220        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1221        assert_eq!(rt.result_cache_shadow_divergences(), 0);
1222        assert_eq!(
1223            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1224            "cache_shadow_divergence_total"
1225        );
1226    }
1227
1228    #[test]
1229    fn as_of_on_red_collection_records_floor() {
1230        reset_thread_locals();
1231        let rt = fresh_runtime();
1232
1233        // `red_*` collections always allow AS OF. The frame should
1234        // resolve to a concrete xid and surface it via the Interface.
1235        let frame =
1236            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1237                .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1238
1239        let f: &dyn ReadFrame = &frame;
1240        assert_eq!(
1241            f.as_of_floor(),
1242            Some(1),
1243            "AS OF SNAPSHOT 1 records xid=1 as the floor"
1244        );
1245        assert_eq!(f.snapshot().xid, 1);
1246        assert!(
1247            f.snapshot().in_progress.is_empty(),
1248            "historical reads have no in-progress set"
1249        );
1250    }
1251
1252    /// The frame classifies common SQL prefixes into the coarse
1253    /// `Privilege` / `LockIntent` buckets at build time. This test
1254    /// pins the mapping so a regression that silently re-routes
1255    /// (e.g. INSERT classified as Read) surfaces here, not at a
1256    /// downstream privilege gate.
1257    #[test]
1258    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1259        reset_thread_locals();
1260        let rt = fresh_runtime();
1261
1262        let cases = [
1263            ("SELECT 1", Privilege::Read, LockIntent::Shared),
1264            ("LIST KV settings", Privilege::Read, LockIntent::Shared),
1265            (
1266                "KV GET settings.feature",
1267                Privilege::Read,
1268                LockIntent::Shared,
1269            ),
1270            ("VAULT LIST secrets", Privilege::Read, LockIntent::Shared),
1271            (
1272                "INSERT INTO t (id) VALUES (1)",
1273                Privilege::Write,
1274                LockIntent::Exclusive,
1275            ),
1276            (
1277                "KV PUT settings.feature = 'on'",
1278                Privilege::Write,
1279                LockIntent::Exclusive,
1280            ),
1281            (
1282                "VAULT PUT secrets.api = 'x'",
1283                Privilege::Write,
1284                LockIntent::Exclusive,
1285            ),
1286            (
1287                "UPDATE t SET x = 1 WHERE id = 1",
1288                Privilege::Write,
1289                LockIntent::Exclusive,
1290            ),
1291            (
1292                "DELETE FROM t WHERE id = 1",
1293                Privilege::Write,
1294                LockIntent::Exclusive,
1295            ),
1296            (
1297                "CREATE TABLE foo (id INT)",
1298                Privilege::Write,
1299                LockIntent::Exclusive,
1300            ),
1301            ("BEGIN", Privilege::None, LockIntent::None),
1302            ("COMMIT", Privilege::None, LockIntent::None),
1303            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1304        ];
1305
1306        for (q, want_priv, want_lock) in cases {
1307            let frame = StatementExecutionFrame::build(&rt, q)
1308                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1309            let f: &dyn ReadFrame = &frame;
1310            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1311            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1312        }
1313    }
1314
1315    /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1316    /// driven through `execute_query` under an identity whose role
1317    /// doesn't satisfy the frame's coarse `Read` privilege gets
1318    /// denied with the frame's signal.
1319    ///
1320    /// We test the gate by classifying an INSERT (which the frame
1321    /// reports as `Privilege::Write`) under `Role::Read` — the only
1322    /// pair the legacy fallback would also reject, but here the
1323    /// rejection comes through `frame.check_query_privilege` BEFORE
1324    /// the parsed-expression walker runs. Removing
1325    /// `required_privilege` (or the `is_satisfied_by` consult inside
1326    /// `check_query_privilege`) would force the deny path back to the
1327    /// inline `RedDBRuntime::check_query_privilege` walker — but the
1328    /// auth_store gate up there is bypassed when no auth_store is
1329    /// wired (embedded test mode), so this test would FLIP from
1330    /// denied to permitted and break the assertion below.
1331    #[test]
1332    fn insert_under_read_role_denied_via_frame_privilege() {
1333        reset_thread_locals();
1334        set_current_auth_identity("alice".to_string(), Role::Read);
1335
1336        let rt = fresh_runtime();
1337        // Bypass parser by reaching into the frame directly: the
1338        // frame derives privilege from the SQL prefix without
1339        // needing an auth_store wired up. Driving end-to-end via
1340        // `execute_query` would also reject (no table `t`), but for
1341        // a different reason — we want to pin the privilege seam.
1342        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1343            .expect("frame builds for INSERT");
1344        let f: &dyn ReadFrame = &frame;
1345        assert_eq!(
1346            f.required_privilege(),
1347            Privilege::Write,
1348            "INSERT classified as Write"
1349        );
1350        let id = f.identity().expect("identity captured");
1351        assert!(
1352            !f.required_privilege().is_satisfied_by(id.1),
1353            "Role::Read does not satisfy Privilege::Write — frame must deny"
1354        );
1355
1356        // End-to-end: the frame's `check_query_privilege` sees the
1357        // (Read role, Write privilege) mismatch and denies before
1358        // dispatch. We drive a synthetic `QueryExpr::Table` because
1359        // the SELECT/INSERT parser would happen to also fail, and we
1360        // want the failure to come from the privilege seam.
1361        use crate::storage::query::ast::{QueryExpr, TableQuery};
1362        let expr = QueryExpr::Table(TableQuery::new("t"));
1363        let err = frame
1364            .check_query_privilege(&rt, &expr)
1365            .expect_err("denied via frame's coarse privilege gate");
1366        let msg = format!("{err}");
1367        assert!(
1368            msg.contains("permission denied") && msg.contains("Write"),
1369            "expected frame-level Write deny, got: {msg}"
1370        );
1371
1372        reset_thread_locals();
1373    }
1374
1375    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
1376    /// wired into the SELECT path and produces observable results
1377    /// byte-identical to the per-request-allocation baseline.
1378    ///
1379    /// A table with more rows than the streaming high-water mark
1380    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
1381    /// path to assemble many chunks, each leasing/recycling the frame
1382    /// arena's single chunk buffer. Driving it through `execute_query`
1383    /// (which builds a `StatementExecutionFrame` and lends its arena)
1384    /// must return every inserted row, in order — exactly what the
1385    /// allocate-per-chunk path returned. A bug in the arena wiring
1386    /// (dropped rows, bled rows, mis-ordering) would surface here.
1387    #[test]
1388    fn large_select_through_frame_arena_returns_all_rows_in_order() {
1389        reset_thread_locals();
1390        let rt = fresh_runtime();
1391        rt.execute_query("CREATE TABLE big (id INT)")
1392            .expect("create table");
1393
1394        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
1395        // spans multiple chunks and the arena buffer is reused.
1396        const N: usize = 2_500;
1397        for start in (0..N).step_by(250) {
1398            let end = (start + 250).min(N);
1399            let values = (start..end)
1400                .map(|i| format!("({i})"))
1401                .collect::<Vec<_>>()
1402                .join(", ");
1403            rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
1404                .unwrap_or_else(|err| panic!("insert rows {start}..{end}: {err:?}"));
1405        }
1406
1407        let result = rt
1408            .execute_query("SELECT id FROM big ORDER BY id")
1409            .expect("large SELECT executes through the frame arena path");
1410        assert_eq!(result.statement_type, "select");
1411        assert_eq!(
1412            result.result.records.len(),
1413            N,
1414            "every inserted row streams back through the arena-backed channel"
1415        );
1416        for (i, record) in result.result.records.iter().enumerate() {
1417            assert_eq!(
1418                record.get("id"),
1419                Some(&crate::storage::schema::Value::Integer(i as i64)),
1420                "row {i} is byte-identical to the per-request-allocation baseline"
1421            );
1422        }
1423
1424        reset_thread_locals();
1425    }
1426
1427    /// Transport adapters may decode their wire-specific parameter value
1428    /// shapes, but SQL parsing/binding must stay behind the runtime's
1429    /// statement entrypoint. This pins the deeper seam introduced for
1430    /// parameterized query execution: HTTP, JSON-RPC, RedWire, PG wire,
1431    /// and gRPC all call `RedDBRuntime::execute_query_with_params`, which
1432    /// installs a real `StatementExecutionFrame` before dispatch.
1433    #[test]
1434    fn parameterized_transport_adapters_delegate_binding_to_runtime() {
1435        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
1436        let adapters = [
1437            "src/server/handlers_query.rs",
1438            "src/rpc_stdio.rs",
1439            "src/wire/redwire/session.rs",
1440            "src/wire/postgres/server.rs",
1441            "src/grpc.rs",
1442        ];
1443
1444        for relative in adapters {
1445            let path = manifest_dir.join(relative);
1446            let text = std::fs::read_to_string(&path)
1447                .unwrap_or_else(|err| panic!("read {}: {err}", path.display()));
1448            assert!(
1449                text.contains("execute_query_with_params"),
1450                "{relative} should delegate parameterized query execution to the runtime"
1451            );
1452            assert!(
1453                !text.contains("user_params::bind"),
1454                "{relative} must not bind SQL params in the transport adapter"
1455            );
1456        }
1457    }
1458
1459    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1460    /// control statement carries `LockIntent::None` and the
1461    /// `acquire_intent_locks` path returns `None` without consulting
1462    /// `intent_lock_modes_for`. Removing the method (or its consult
1463    /// site in `acquire_intent_locks`) would force the lock-mode
1464    /// helper to walk a fabricated parsed expression to reach the
1465    /// same conclusion — but the assertion that no guard is allocated
1466    /// for a `BEGIN` frame would still hold, so we additionally pin
1467    /// the classifier mapping above to make the deletion observable.
1468    #[test]
1469    fn control_statement_skips_intent_locks_via_frame() {
1470        reset_thread_locals();
1471        let rt = fresh_runtime();
1472
1473        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1474        let f: &dyn ReadFrame = &frame;
1475        assert_eq!(f.lock_intent(), LockIntent::None);
1476
1477        // Drive `acquire_intent_locks` against a fabricated SELECT
1478        // expression that WOULD normally yield `(IS, IS)`; the frame's
1479        // `lock_intent() == None` short-circuit must still suppress
1480        // the guard.
1481        use crate::storage::query::ast::{QueryExpr, TableQuery};
1482        let expr = QueryExpr::Table(TableQuery::new("t"));
1483        let guard = frame.acquire_intent_locks(&rt, &expr);
1484        assert!(
1485            guard.is_none(),
1486            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1487        );
1488    }
1489}