Skip to main content

reddb_server/runtime/
statement_frame.rs

1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7    collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8    has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9    query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10    SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
19/// Coarse privilege classification for a statement, computed once at
20/// frame-build time from the SQL text. Mirrors the three-role auth
21/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
22/// answer "can this identity run this statement?" without re-walking
23/// the parsed `QueryExpr` at every call site.
24///
25/// `None` means the statement does not touch the privilege gate at
26/// all (transaction control, SET, SHOW). Such statements must remain
27/// runnable under any authenticated identity.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub(crate) enum Privilege {
30    /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
31    /// any role from `Role::Read` upward.
32    Read,
33    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
34    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
35    /// at least `Role::Write`.
36    Write,
37    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
38    /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
39    Admin,
40    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
41    /// ROLLBACK, SET, SHOW with no data exposure). Always permitted
42    /// for any authenticated identity.
43    None,
44}
45
46impl Privilege {
47    /// `true` iff `role` is sufficient to execute a statement carrying
48    /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49    /// Admin` containment used by the auth fallback path.
50    pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51        match self {
52            Self::None => true,
53            Self::Read => role.can_read(),
54            Self::Write => role.can_write(),
55            Self::Admin => role.can_admin(),
56        }
57    }
58}
59
60/// Coarse lock intent for a statement, computed once at frame-build
61/// time. Maps onto the storage-layer's `LockMode` matrix downstream
62/// but stays decoupled here so the runtime can answer "does this
63/// statement need the lock manager at all?" without a `use storage::`
64/// at every call site.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub(crate) enum LockIntent {
67    /// No collection-level lock needed (transaction control, SET,
68    /// SHOW, EXPLAIN). The lock-acquisition path can short-circuit.
69    None,
70    /// Reader-style intent: SELECT, joins, graph / queue / search
71    /// reads. Maps to `(IS, IS)` at the storage layer.
72    Shared,
73    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
74    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
75    /// `Exclusive` at this granularity — call sites that need the
76    /// finer distinction still consult `intent_lock_modes_for`.
77    Exclusive,
78}
79
80/// Small, stable Interface that *represents* a read statement's
81/// execution context. Every read caller that needs to know "under
82/// what scope / identity / snapshot am I running, and is there an
83/// AS OF floor in effect?" consults this trait — never the
84/// underlying thread-locals or runtime fields directly.
85///
86/// The deletion test: removing this trait would force the four
87/// concerns it exposes back into ad-hoc lookups at every read
88/// callsite (`current_tenant()`, `current_auth_identity()`,
89/// `capture_current_snapshot()`, AS OF re-parsing). The trait
90/// concentrates them in one place so future changes (per-statement
91/// logging, audit, scope policy) have a single seam to extend.
92pub(crate) trait ReadFrame {
93    /// Effective tenant scope for the statement after WITHIN /
94    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
95    /// "no tenant bound" (RLS deny-default applies).
96    fn effective_scope(&self) -> Option<&str>;
97
98    /// Authenticated identity observed at frame-build time, if any.
99    /// Returns `(username, role)` so callers can render audit lines
100    /// or feed RLS policy lookups without re-reading thread-locals.
101    fn identity(&self) -> Option<(&str, Role)>;
102
103    /// MVCC snapshot the statement reads against. For autocommit
104    /// this is a fresh snapshot; inside an active transaction it
105    /// is the txn's snapshot; under AS OF it is the resolved
106    /// historical xid.
107    fn snapshot(&self) -> &Snapshot;
108
109    /// AS OF xid floor when AS OF was applied for this statement,
110    /// `None` for live reads. Useful for downstream callers that
111    /// want to gate behaviour on historical-read mode without
112    /// re-parsing the query.
113    fn as_of_floor(&self) -> Option<Xid>;
114
115    /// Stable result-cache key for the statement (already mixes
116    /// effective tenant + identity).
117    fn cache_key(&self) -> &str;
118
119    /// Whether the statement is safe to serve from / populate the
120    /// result cache. Combines two underlying signals:
121    ///
122    ///   * the query does not call a volatile builtin (e.g. `NOW()`,
123    ///     `RANDOM()`, `UUID()`), which would change between calls,
124    ///   * the connection is not inside an active transaction with
125    ///     uncommitted writes that other readers shouldn't observe.
126    ///
127    /// SELECT cache callsites (read + write) consult this method
128    /// instead of re-deriving safety from globals or poking the
129    /// frame's private fields. Removing it would force every cache
130    /// callsite to re-run `query_has_volatile_builtin` plus
131    /// `result_cache_safe(conn_id)` inline.
132    fn should_cache_result(&self) -> bool;
133
134    /// Coarse privilege class the statement requires, computed once
135    /// at frame-build time from the SQL prefix. Read/write dispatch
136    /// sites consult this instead of re-classifying the parsed
137    /// `QueryExpr` inline at every callsite.
138    ///
139    /// Removing this method would force every privilege gate to
140    /// recompute the (action, resource) classification from the
141    /// parsed expression and re-check the role hierarchy inline.
142    fn required_privilege(&self) -> Privilege;
143
144    /// Coarse collection-level lock intent the statement implies.
145    /// `None` lets the lock-acquisition path short-circuit without
146    /// touching the lock manager.
147    ///
148    /// Removing this method would force the lock-acquisition path
149    /// to always invoke `intent_lock_modes_for` (which itself walks
150    /// the parsed expression) even for transaction-control / SET /
151    /// SHOW statements that need no collection lock at all.
152    fn lock_intent(&self) -> LockIntent;
153
154    /// Set of collection ids the calling identity is allowed to
155    /// observe under the active `(tenant, role)` scope. Computed once
156    /// at frame-build time via the `AuthStore` visible-collections
157    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
158    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
159    /// before any similarity score is computed (issue #119).
160    ///
161    /// `None` means the frame was built without an auth store wired —
162    /// embedded / single-tenant tests run that way. AI search call
163    /// sites refuse to proceed with `None`, which is the deny-default
164    /// the issue requires; pure SELECT paths fall back to the existing
165    /// per-row RLS gate.
166    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
167}
168
169/// Cheap first-word classification of a SQL statement, used at
170/// frame-build time to derive `Privilege` + `LockIntent` without
171/// re-parsing the query. Matches the keywords that the legacy
172/// inline checks in `RedDBRuntime::check_query_privilege` and
173/// `intent_lock_modes_for` already key on.
174fn statement_kind(query: &str) -> &'static str {
175    let trimmed = query.trim_start();
176    // Skip a leading line / block comment so the classifier doesn't
177    // misread `/* ... */ SELECT ...` as an unknown statement.
178    let trimmed = if let Some(rest) = trimmed.strip_prefix("--") {
179        rest.split_once('\n')
180            .map(|(_, r)| r)
181            .unwrap_or("")
182            .trim_start()
183    } else {
184        trimmed
185    };
186    let first = trimmed
187        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
188        .next()
189        .unwrap_or("");
190    // ASCII-uppercase compare without allocating: SQL keywords are ASCII.
191    let mut buf = [0u8; 16];
192    let bytes = first.as_bytes();
193    let n = bytes.len().min(buf.len());
194    for i in 0..n {
195        buf[i] = bytes[i].to_ascii_uppercase();
196    }
197    match &buf[..n] {
198        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" | b"RANK"
199        | b"APPROX" | b"APPROXIMATE" | b"ZRANK" | b"ZRANGE" => "read",
200        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
201        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
202        b"GRANT" | b"REVOKE" => "admin",
203        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
204        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
205        _ => "unknown",
206    }
207}
208
209fn classify_privilege(query: &str) -> Privilege {
210    match statement_kind(query) {
211        "read" => Privilege::Read,
212        "write" => Privilege::Write,
213        // DDL is gated at `Role::Write` in the legacy fallback (see
214        // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
215        // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
216        // GRANT / REVOKE upgrade to Admin via finer checks at the call
217        // site — the frame surfaces only the coarse class.
218        "ddl" => Privilege::Write,
219        "admin" => Privilege::Admin,
220        _ => Privilege::None,
221    }
222}
223
224fn classify_lock_intent(query: &str) -> LockIntent {
225    match statement_kind(query) {
226        "read" => LockIntent::Shared,
227        "write" | "ddl" => LockIntent::Exclusive,
228        _ => LockIntent::None,
229    }
230}
231
232pub(super) struct StatementExecutionFrame {
233    tx_local_tenant: Option<Option<String>>,
234    snapshot: Snapshot,
235    own_xids: HashSet<Xid>,
236    cache_key: String,
237    is_volatile_query: bool,
238    cache_safe: bool,
239    /// Effective tenant captured at frame-build time after WITHIN /
240    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
241    /// so the `ReadFrame` Interface can return a borrow without
242    /// re-touching the thread-local stack.
243    effective_scope: Option<String>,
244    /// Auth identity captured at frame-build time. `None` for
245    /// embedded / anonymous callers.
246    identity: Option<(String, Role)>,
247    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
248    /// for live reads.
249    as_of_floor: Option<Xid>,
250    /// True when the statement snapshot can require tuple versions that
251    /// current secondary indexes no longer contain.
252    requires_index_fallback: bool,
253    /// Privilege class required by the statement, derived from the
254    /// SQL text at frame-build time. Read/write dispatch sites
255    /// consult this instead of re-classifying the parsed expression.
256    required_privilege: Privilege,
257    /// Collection-level lock intent the statement implies. The
258    /// lock-acquisition path short-circuits when this is `None`.
259    lock_intent: LockIntent,
260    /// Set of collection ids the active `(tenant, role)` scope is
261    /// allowed to observe. Computed at frame-build time via the
262    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
263    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
264    /// scoring (issue #119). `None` when no auth store is wired
265    /// (embedded test mode) — AI search refuses on `None`.
266    visible_collections: Option<HashSet<String>>,
267    /// Per-owner buffer arena for query-result row chunks (#885). Owned
268    /// by the frame because the frame already owns the query lifecycle;
269    /// lent to the row-streaming path (`execute_runtime_table_query_in`)
270    /// so chunk buffers are reused across the statement's chunk-fetches
271    /// instead of allocated fresh per chunk. Reclaimed when the frame
272    /// drops at statement end — no `thread_local!` scratch, which would
273    /// be unsound under tokio's work-stealing runtime.
274    row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
275}
276
277pub(super) struct StatementFrameGuards {
278    _tx_local_guard: TxLocalTenantGuard,
279    _config_snapshot_guard: ConfigSnapshotGuard,
280    _secret_store_guard: SecretStoreGuard,
281    _snapshot_guard: CurrentSnapshotGuard,
282}
283
284pub(super) struct PreparedStatement {
285    pub(super) expr: QueryExpr,
286    pub(super) mode: QueryMode,
287}
288
289impl StatementExecutionFrame {
290    pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
291        let conn_id = current_connection_id();
292        let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
293        let own_xids = runtime.own_transaction_xids(conn_id);
294        let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
295        let requires_index_fallback =
296            as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
297        let cache_key = result_cache_key(query);
298        let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
299        let cache_safe = runtime.result_cache_safe(conn_id);
300        // Capture identity + effective scope under the same
301        // thread-local view that the cache key was built from, so
302        // the Interface and the cache key agree on what "this
303        // statement" means.
304        let effective_scope = current_tenant();
305        let identity = current_auth_identity();
306
307        // Coarse classification of the statement, computed once from
308        // the SQL prefix so downstream callers don't re-derive it
309        // from the parsed `QueryExpr` at every privilege / lock site.
310        let required_privilege = classify_privilege(query);
311        let lock_intent = classify_lock_intent(query);
312
313        // Issue #119: resolve the visible-collections set for the
314        // active (tenant, role) scope. Only meaningful when an auth
315        // store is wired *and* an identity was captured — embedded
316        // anonymous callers fall back to `None`, and AI search call
317        // sites refuse on `None`.
318        let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
319        {
320            (Some(store), Some((principal, role))) => {
321                let collections = runtime.inner.db.store().list_collections();
322                Some(store.visible_collections_for_scope(
323                    effective_scope.as_deref(),
324                    *role,
325                    principal,
326                    &collections,
327                ))
328            }
329            _ => None,
330        };
331
332        Ok(Self {
333            tx_local_tenant,
334            snapshot,
335            own_xids,
336            cache_key,
337            is_volatile_query,
338            cache_safe,
339            effective_scope,
340            identity,
341            as_of_floor,
342            requires_index_fallback,
343            required_privilege,
344            lock_intent,
345            visible_collections,
346            row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
347        })
348    }
349
350    /// Lend the frame's per-owner row-buffer arena (#885) to the
351    /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
352    /// the owner and the arena is reclaimed when the frame drops at
353    /// statement end.
354    pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
355        Rc::clone(&self.row_arena)
356    }
357
358    pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
359        StatementFrameGuards {
360            _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
361            _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
362            _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
363            _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
364                snapshot: self.snapshot.clone(),
365                manager: Arc::clone(&runtime.inner.snapshot_manager),
366                own_xids: self.own_xids.clone(),
367                requires_index_fallback: self.requires_index_fallback,
368            }),
369        }
370    }
371
372    pub(super) fn cache_key(&self) -> &str {
373        &self.cache_key
374    }
375
376    pub(super) fn can_read_result_cache(&self) -> bool {
377        // Delegates to the `ReadFrame` Interface so the volatile +
378        // active-tx safety decision lives in exactly one place.
379        <Self as ReadFrame>::should_cache_result(self)
380    }
381
382    pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
383        // Cache-safety (volatile builtin, active-tx writes) comes from
384        // the Interface; the rest are write-side payload heuristics
385        // (statement shape, result size) that aren't part of the
386        // safety contract.
387        <Self as ReadFrame>::should_cache_result(self)
388            && result.statement_type == "select"
389            && result.engine != "vault"
390            && result.engine != "runtime-rank"
391            // `QUEUE READ` is a stateful read: a delayed message
392            // (issue #722) becomes deliverable over time without a
393            // producer push to invalidate the cache, so a cached empty
394            // result would hide it. Skip caching entirely.
395            && result.statement != "queue_group_read"
396            && result.result.pre_serialized_json.is_none()
397            // Graph-analytics TVF output (issue #802) is deterministic and
398            // expensive to recompute, so it is cached at any row count. The
399            // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
400            && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
401    }
402
403    pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
404        if self.can_read_result_cache() {
405            runtime.get_result_cache_entry(self.cache_key())
406        } else {
407            None
408        }
409    }
410
411    pub(super) fn write_result_cache(
412        &self,
413        runtime: &RedDBRuntime,
414        result: &RuntimeQueryResult,
415        scopes: HashSet<String>,
416    ) {
417        if self.should_write_result_cache(result) {
418            runtime.put_result_cache_entry(
419                self.cache_key(),
420                RuntimeResultCacheEntry {
421                    result: result.clone(),
422                    cached_at: std::time::Instant::now(),
423                    scopes,
424                },
425            );
426        }
427    }
428
429    pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
430        // Detected via cheap prefix check so non-CTE queries skip the
431        // full parse here. CTE-bearing queries bypass the plan cache
432        // and result cache (rare workload — perf optimization is a
433        // follow-up). Inlining substitutes every CTE reference with
434        // its body as a subquery in FROM, after which the existing
435        // subquery-in-FROM machinery handles execution. Recursive
436        // CTEs are rejected explicitly until fixpoint execution wires
437        // through the runtime.
438        if !has_with_prefix(query) {
439            return Ok(None);
440        }
441        let parsed = crate::storage::query::parser::parse(query)
442            .map_err(|err| RedDBError::Query(err.to_string()))?;
443        if parsed.with_clause.is_some() {
444            let rewritten = crate::storage::query::executors::inline_ctes(parsed)
445                .map_err(|err| RedDBError::Query(err.to_string()))?;
446            return Ok(Some(rewritten));
447        }
448        // No WITH after parse (the prefix matched something else like
449        // `WITHIN` that already routed elsewhere) — fall through to
450        // the normal path with the original query.
451        Ok(None)
452    }
453
454    pub(super) fn prepare_statement(
455        &self,
456        runtime: &RedDBRuntime,
457        query: &str,
458    ) -> RedDBResult<PreparedStatement> {
459        let mode = detect_mode(query);
460        if matches!(mode, QueryMode::Unknown) {
461            return Err(RedDBError::Query("unable to detect query mode".to_string()));
462        }
463
464        // ── Plan cache: reuse only exact-query ASTs ──
465        //
466        // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
467        // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
468        // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
469        // Plan cache applies to statements whose shape can be
470        // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
471        // reuses the same plan across thousands of varying literals).
472        // INSERT is still bypassed — its shape changes per column set
473        // and bulk paths don't go through here anyway.
474        let first_word = query
475            .trim()
476            .split_ascii_whitespace()
477            .next()
478            .unwrap_or("")
479            .to_ascii_uppercase();
480        let is_insert = first_word == "INSERT";
481
482        // Fused normalize+extract: one byte-scan produces both the
483        // cache_key AND the literal bindings. Saves a second Lexer
484        // pass over the query text on every cache hit — dominant
485        // cost on tight UPDATE loops that hit the same shape
486        // thousands of times with varying literals.
487        let (cache_key, prescan_binds) = if is_insert {
488            (String::new(), Vec::new())
489        } else {
490            crate::storage::query::planner::cache_key::normalize_and_extract(query)
491        };
492
493        let expr = if is_insert {
494            // Bypass plan cache for INSERT — shape varies per query.
495            parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
496        } else {
497            // ── Hot path: read lock only (no writer serialization on cache hits) ──
498            //
499            // peek() is a non-mutating probe: no LRU promotion, no touch().
500            // This lets concurrent readers proceed without blocking each other.
501            // On hit we bind literals if needed and return immediately.
502            // Only on miss do we drop to a write lock to parse + insert.
503            let hit = {
504                let plan_cache = runtime.inner.query_cache.read();
505                plan_cache.peek(&cache_key).map(|cached| {
506                    let parameter_count = cached.parameter_count;
507                    let optimized = cached.plan.optimized.clone();
508                    let exact_query = cached.exact_query.clone();
509                    (parameter_count, optimized, exact_query)
510                })
511            };
512
513            if let Some((parameter_count, optimized, exact_query)) = hit {
514                if parameter_count > 0 {
515                    // Shape hit: use the binds extracted during normalise.
516                    let shape_binds = prescan_binds.clone();
517                    if let Some(bound) =
518                        crate::storage::query::planner::shape::bind_parameterized_query(
519                            &optimized,
520                            &shape_binds,
521                            parameter_count,
522                        )
523                    {
524                        bound
525                    } else if exact_query.as_deref() == Some(query) {
526                        // Bind failed but exact query matches — use as-is.
527                        optimized
528                    } else {
529                        // Bind failed and literals differ: re-parse fresh.
530                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
531                    }
532                } else {
533                    // No parameters means either there truly are no literals,
534                    // or this statement type does not participate in shape
535                    // parameterization (for example graph/queue commands).
536                    // Reusing a normalized-cache hit across a different exact
537                    // query can therefore leak stale literals into execution.
538                    if exact_query.as_deref() == Some(query) {
539                        optimized
540                    } else {
541                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
542                    }
543                }
544            } else {
545                // Cache miss — parse, parameterize, store.
546                let parsed =
547                    parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
548                let (cached_expr, parameter_count) = if let Some(prepared) =
549                    crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
550                {
551                    (prepared.shape, prepared.parameter_count)
552                } else {
553                    (parsed.clone(), 0)
554                };
555                {
556                    let mut pc = runtime.inner.query_cache.write();
557                    let plan = crate::storage::query::planner::QueryPlan::new(
558                        parsed.clone(),
559                        cached_expr,
560                        Default::default(),
561                    );
562                    pc.insert(
563                        cache_key.clone(),
564                        crate::storage::query::planner::CachedPlan::new(plan)
565                            .with_shape_key(cache_key.clone())
566                            .with_exact_query(query.to_string())
567                            .with_parameter_count(parameter_count),
568                    );
569                }
570                parsed
571            }
572        };
573
574        // Phase 5 PG parity: substitute any registered view name that
575        // appears in the expression with its stored body. Runs after
576        // parse and before dispatch so the SQL entrypoint gets the
577        // same view resolution `execute_query_expr` already does.
578        let expr = runtime.rewrite_view_refs(expr);
579
580        Ok(PreparedStatement { expr, mode })
581    }
582
583    pub(super) fn check_query_privilege(
584        &self,
585        runtime: &RedDBRuntime,
586        expr: &QueryExpr,
587    ) -> RedDBResult<()> {
588        // Frame-level coarse gate. We consult `required_privilege()`
589        // (computed once at frame-build) against the captured identity
590        // before the deep grant engine walks the parsed expression.
591        // The coarse gate cannot ALLOW anything the grant engine would
592        // deny — it only short-circuits the obvious "Role::Read tries
593        // INSERT" case so a downstream caller never has to redo this
594        // check inline. `Privilege::None` (transaction control / SET /
595        // SHOW) flows through unchanged; the grant engine treats those
596        // as bypass too.
597        if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
598            let needed = <Self as ReadFrame>::required_privilege(self);
599            if !needed.is_satisfied_by(role) {
600                // Issue #205 — when the deep grant engine *also*
601                // denies, we treat this as an ordinary permission
602                // failure. But when an Admin-only statement reaches
603                // this gate without an auth_store wired (so the deep
604                // engine can't double-check), the coarse rejection is
605                // the only line of defence — emit an OperatorEvent so
606                // the operator notices an Admin-class statement was
607                // attempted with insufficient role.
608                if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
609                    crate::telemetry::operator_event::OperatorEvent::AuthBypass {
610                        principal: username.to_string(),
611                        resource: format!("statement requiring {needed:?}"),
612                        detail: format!(
613                            "auth_store not wired; coarse gate is sole defence (role={role:?})"
614                        ),
615                    }
616                    .emit_global();
617                }
618                return Err(RedDBError::Query(format!(
619                    "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
620                )));
621            }
622        }
623        runtime
624            .check_query_privilege(expr)
625            .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
626    }
627
628    pub(super) fn prepare_dispatch(
629        &self,
630        runtime: &RedDBRuntime,
631        expr: &QueryExpr,
632    ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
633        runtime.validate_model_operations_before_auth(expr)?;
634        self.check_query_privilege(runtime, expr)?;
635        Ok(self.acquire_intent_locks(runtime, expr))
636    }
637
638    pub(super) fn acquire_intent_locks(
639        &self,
640        runtime: &RedDBRuntime,
641        expr: &QueryExpr,
642    ) -> Option<crate::runtime::locking::LockerGuard> {
643        if !runtime.config_bool("concurrency.locking.enabled", true) {
644            return None;
645        }
646        // Frame-level short-circuit: if the statement carries no lock
647        // intent (transaction control, SET, SHOW), skip the lock
648        // manager entirely instead of letting `intent_lock_modes_for`
649        // walk the parsed expression to reach the same conclusion.
650        if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
651            return None;
652        }
653        intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
654            let mut guard =
655                crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
656            let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
657            for collection in collections_referenced(expr) {
658                let _ = guard.acquire(
659                    crate::runtime::locking::Resource::Collection(collection),
660                    coll_mode,
661                );
662            }
663            guard
664        })
665    }
666}
667
668impl ReadFrame for StatementExecutionFrame {
669    fn effective_scope(&self) -> Option<&str> {
670        self.effective_scope.as_deref()
671    }
672
673    fn identity(&self) -> Option<(&str, Role)> {
674        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
675    }
676
677    fn snapshot(&self) -> &Snapshot {
678        &self.snapshot
679    }
680
681    fn as_of_floor(&self) -> Option<Xid> {
682        self.as_of_floor
683    }
684
685    fn cache_key(&self) -> &str {
686        &self.cache_key
687    }
688
689    fn should_cache_result(&self) -> bool {
690        !self.is_volatile_query && self.cache_safe
691    }
692
693    fn required_privilege(&self) -> Privilege {
694        self.required_privilege
695    }
696
697    fn lock_intent(&self) -> LockIntent {
698        self.lock_intent
699    }
700
701    fn visible_collections(&self) -> Option<&HashSet<String>> {
702        self.visible_collections.as_ref()
703    }
704}
705
706/// Lightweight `ReadFrame` carrier used by AI command entry points
707/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
708///
709/// Issue #119 calls this struct `EffectiveScope`. It bundles the
710/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
711/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
712/// instead of re-reading thread-locals at every call site.
713///
714/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
715/// from the per-statement thread-locals (identical to how
716/// `StatementExecutionFrame::build` derives them) and resolves
717/// `visible_collections` via the `AuthStore` cache.
718pub struct EffectiveScope {
719    pub(crate) tenant: Option<String>,
720    pub(crate) identity: Option<(String, Role)>,
721    pub(crate) snapshot: Snapshot,
722    pub(crate) visible_collections: Option<HashSet<String>>,
723}
724
725impl EffectiveScope {
726    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
727    /// to gate LLM-backed NER calls behind `ai:ner:read`.
728    ///
729    /// Placeholder for now: always returns `false`. The auth engine's
730    /// capability matrix is future work; until it lands, every routed
731    /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
732    /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
733    /// Documented in code so the wire-up is a one-line change once
734    /// the auth engine learns capabilities.
735    pub fn has_capability(&self, _capability: &str) -> bool {
736        false
737    }
738}
739
740impl ReadFrame for EffectiveScope {
741    fn effective_scope(&self) -> Option<&str> {
742        self.tenant.as_deref()
743    }
744    fn identity(&self) -> Option<(&str, Role)> {
745        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
746    }
747    fn snapshot(&self) -> &Snapshot {
748        &self.snapshot
749    }
750    fn as_of_floor(&self) -> Option<Xid> {
751        None
752    }
753    fn cache_key(&self) -> &str {
754        ""
755    }
756    fn should_cache_result(&self) -> bool {
757        false
758    }
759    fn required_privilege(&self) -> Privilege {
760        Privilege::Read
761    }
762    fn lock_intent(&self) -> LockIntent {
763        LockIntent::Shared
764    }
765    fn visible_collections(&self) -> Option<&HashSet<String>> {
766        self.visible_collections.as_ref()
767    }
768}
769
770impl RedDBRuntime {
771    /// Build the AI command `EffectiveScope` from the current
772    /// statement thread-locals + auth store.
773    ///
774    /// Returns `None` for embedded callers (no auth store, no
775    /// identity) — `AuthorizedSearch` treats `None` as deny-default.
776    pub(crate) fn ai_scope(&self) -> EffectiveScope {
777        let tenant = super::impl_core::current_tenant();
778        let identity = super::impl_core::current_auth_identity();
779        let snapshot = self.current_snapshot();
780        let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
781            (Some(store), Some((principal, role))) => {
782                let collections = self.inner.db.store().list_collections();
783                Some(store.visible_collections_for_scope(
784                    tenant.as_deref(),
785                    *role,
786                    principal,
787                    &collections,
788                ))
789            }
790            _ => None,
791        };
792        EffectiveScope {
793            tenant,
794            identity,
795            snapshot,
796            visible_collections,
797        }
798    }
799}
800
801/// Test fixtures for callers that need to drive `ReadFrame` without
802/// booting a runtime. Lives behind `cfg(test)` and `pub(crate)` so it
803/// only leaks across module boundaries inside the crate.
804#[cfg(test)]
805pub(crate) mod test_support {
806    use super::{LockIntent, Privilege, ReadFrame};
807    use crate::auth::Role;
808    use crate::storage::transaction::snapshot::{Snapshot, Xid};
809    use std::collections::HashSet;
810
811    /// A `ReadFrame` impl with hand-set fields. Used by
812    /// `authorized_search` tests to assert the deny-default and
813    /// scope-trim behaviour without going through frame construction.
814    pub(crate) struct FakeReadFrame {
815        pub tenant: Option<String>,
816        pub identity: Option<(String, Role)>,
817        pub snapshot: Snapshot,
818        pub visible: Option<HashSet<String>>,
819    }
820
821    impl FakeReadFrame {
822        pub(crate) fn without_scope() -> Self {
823            Self {
824                tenant: None,
825                identity: None,
826                snapshot: Snapshot {
827                    xid: 0,
828                    in_progress: HashSet::new(),
829                },
830                visible: None,
831            }
832        }
833
834        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
835            Self {
836                tenant: Some("acme".to_string()),
837                identity: Some(("alice".to_string(), Role::Read)),
838                snapshot: Snapshot {
839                    xid: 0,
840                    in_progress: HashSet::new(),
841                },
842                visible: Some(visible),
843            }
844        }
845    }
846
847    impl ReadFrame for FakeReadFrame {
848        fn effective_scope(&self) -> Option<&str> {
849            self.tenant.as_deref()
850        }
851        fn identity(&self) -> Option<(&str, Role)> {
852            self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
853        }
854        fn snapshot(&self) -> &Snapshot {
855            &self.snapshot
856        }
857        fn as_of_floor(&self) -> Option<Xid> {
858            None
859        }
860        fn cache_key(&self) -> &str {
861            ""
862        }
863        fn should_cache_result(&self) -> bool {
864            false
865        }
866        fn required_privilege(&self) -> Privilege {
867            Privilege::Read
868        }
869        fn lock_intent(&self) -> LockIntent {
870            LockIntent::Shared
871        }
872        fn visible_collections(&self) -> Option<&HashSet<String>> {
873            self.visible.as_ref()
874        }
875    }
876}
877
878impl RedDBRuntime {
879    fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
880        let mut set = HashSet::new();
881        if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
882            set.insert(ctx.xid);
883            for (_, sub) in &ctx.savepoints {
884                set.insert(*sub);
885            }
886            for sub in &ctx.released_sub_xids {
887                set.insert(*sub);
888            }
889        }
890        set
891    }
892
893    /// Resolve the snapshot for the current statement, returning
894    /// the snapshot itself and (when AS OF is in effect) the
895    /// resolved xid floor. The floor is the same xid carried inside
896    /// `Snapshot.xid` for AS OF reads — exposing it separately lets
897    /// the `ReadFrame` Interface tell "live read" from "historical
898    /// read" without inferring from `in_progress.is_empty()`.
899    fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
900        match peek_top_level_as_of_with_table(query) {
901            Some((spec, Some(table))) => {
902                if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
903                    return Err(RedDBError::InvalidConfig(format!(
904                        "AS OF requires a versioned collection — \
905                         `{table}` has not opted in. \
906                         Call vcs.set_versioned(\"{table}\", true) first."
907                    )));
908                }
909                let xid = self.vcs_resolve_as_of(spec)?;
910                Ok((
911                    Snapshot {
912                        xid,
913                        in_progress: HashSet::new(),
914                    },
915                    Some(xid),
916                ))
917            }
918            Some((spec, None)) => {
919                let xid = self.vcs_resolve_as_of(spec)?;
920                Ok((
921                    Snapshot {
922                        xid,
923                        in_progress: HashSet::new(),
924                    },
925                    Some(xid),
926                ))
927            }
928            None => Ok((self.current_snapshot(), None)),
929        }
930    }
931
932    fn result_cache_safe(&self, conn_id: u64) -> bool {
933        let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
934        let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
935        !has_active_xids && !in_own_tx
936    }
937}
938
939/// Whether a result's `engine` tag is one of the graph-analytics TVF
940/// executors (issue #802). Graph-collection (`louvain(g)`) and inline
941/// (`louvain(nodes => …, edges => …)`) forms both produce deterministic
942/// algorithm output that is cached regardless of row count.
943fn is_graph_tvf_engine(engine: &str) -> bool {
944    matches!(engine, "runtime-graph-tvf" | "runtime-graph-tvf-inline")
945}
946
947fn result_cache_key(query: &str) -> String {
948    let tenant = current_tenant().unwrap_or_default();
949    let auth = current_auth_identity()
950        .map(|(user, role)| format!("{}|{:?}", user, role))
951        .unwrap_or_default();
952    if tenant.is_empty() && auth.is_empty() {
953        query.to_string()
954    } else {
955        format!("{query}\u{001e}{tenant}\u{001e}{auth}")
956    }
957}
958
959#[cfg(test)]
960mod tests {
961    use super::*;
962    use crate::api::RedDBOptions;
963    use crate::runtime::impl_core::{
964        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
965        set_current_tenant,
966    };
967    use crate::runtime::RedDBRuntime;
968
969    fn fresh_runtime() -> RedDBRuntime {
970        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
971    }
972
973    /// Ensure thread-local state from a prior test can't leak into
974    /// the next one — tests in the same binary share the thread.
975    fn reset_thread_locals() {
976        clear_current_tenant();
977        clear_current_auth_identity();
978    }
979
980    #[test]
981    fn autocommit_select_takes_live_snapshot() {
982        reset_thread_locals();
983        let rt = fresh_runtime();
984        let frame =
985            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
986
987        // Live reads: no AS OF floor, snapshot bounded by the
988        // manager's `peek_next_xid` so committed tuples are visible.
989        let f: &dyn ReadFrame = &frame;
990        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
991        assert!(
992            f.snapshot().xid >= 1,
993            "autocommit snapshot xid is bounded by peek_next_xid"
994        );
995    }
996
997    #[test]
998    fn frame_captures_identity_and_scope() {
999        reset_thread_locals();
1000        set_current_tenant("acme".to_string());
1001        set_current_auth_identity("alice".to_string(), Role::Write);
1002
1003        let rt = fresh_runtime();
1004        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
1005        let f: &dyn ReadFrame = &frame;
1006
1007        assert_eq!(f.effective_scope(), Some("acme"));
1008        let id = f.identity().expect("identity captured");
1009        assert_eq!(id.0, "alice");
1010        assert!(matches!(id.1, Role::Write));
1011
1012        // Cache key mixes scope + identity so two callers under
1013        // different tenants never share a cache slot.
1014        assert!(
1015            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
1016            "cache key folds in scope + identity, got {:?}",
1017            f.cache_key()
1018        );
1019
1020        reset_thread_locals();
1021    }
1022
1023    #[test]
1024    fn as_of_rejects_non_versioned_user_collection() {
1025        reset_thread_locals();
1026        let rt = fresh_runtime();
1027
1028        // `not_versioned` is a plain user collection — the frame
1029        // builder must reject AS OF until the caller opts in via
1030        // `vcs.set_versioned`.
1031        let err = match StatementExecutionFrame::build(
1032            &rt,
1033            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
1034        ) {
1035            Err(e) => e,
1036            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
1037        };
1038
1039        let msg = format!("{err}");
1040        assert!(
1041            msg.contains("AS OF requires a versioned collection"),
1042            "expected AS OF rejection, got: {msg}"
1043        );
1044    }
1045
1046    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1047    ///
1048    /// Sets a tenant + identity via the public thread-local API the
1049    /// runtime uses for ambient scope, drives a real `SELECT` through
1050    /// `execute_query`, then inspects the result cache that the SELECT
1051    /// path populates via `frame.cache_key()`. The key only carries
1052    /// the tenant + identity *because* it was built through the frame —
1053    /// reverting the wiring to inline `current_tenant()` /
1054    /// `current_auth_identity()` reads would still pass this test, but
1055    /// dropping the frame entirely (so the SELECT path stopped touching
1056    /// `cache_key`) would break it.
1057    #[test]
1058    fn select_path_routes_through_frame_cache_key() {
1059        reset_thread_locals();
1060        set_current_tenant("acme".to_string());
1061        set_current_auth_identity("alice".to_string(), Role::Read);
1062
1063        let rt = fresh_runtime();
1064        let result = rt
1065            .execute_query("SELECT 1")
1066            .expect("SELECT 1 executes under tenant=acme/identity=alice");
1067        assert_eq!(result.statement_type, "select");
1068
1069        // The SELECT path (in `execute_query_expr`) builds a frame and
1070        // writes its result through `frame.cache_key()`. That key folds
1071        // tenant + identity in via `result_cache_key`, so finding "acme"
1072        // and "alice" inside any cached key proves the frame was the
1073        // seam used.
1074        let cache = rt.inner.result_cache.read();
1075        let any_keyed_with_scope = cache
1076            .0
1077            .keys()
1078            .any(|k| k.contains("acme") && k.contains("alice"));
1079        assert!(
1080            any_keyed_with_scope,
1081            "expected at least one result-cache key carrying tenant+identity, \
1082             got keys: {:?}",
1083            cache.0.keys().collect::<Vec<_>>()
1084        );
1085
1086        reset_thread_locals();
1087    }
1088
1089    /// A SELECT that calls a volatile builtin (here:
1090    /// `pg_advisory_unlock`, the volatile token the runtime currently
1091    /// recognises in `query_has_volatile_builtin`) must NOT populate
1092    /// the result cache. Any caller hitting the cache after this would
1093    /// see a stale answer for an inherently-volatile query, so the
1094    /// SELECT path gates writes through `frame.should_cache_result()`.
1095    ///
1096    /// Deletion test: removing `ReadFrame::should_cache_result`, or
1097    /// reverting the SELECT path to skip its safety gate, would let
1098    /// the result cache silently absorb this statement and break the
1099    /// assertion below.
1100    #[test]
1101    fn volatile_select_does_not_populate_result_cache() {
1102        reset_thread_locals();
1103        let rt = fresh_runtime();
1104
1105        // Frame-level invariant: the volatile-builtin signal collapses
1106        // `should_cache_result` to false even for an autocommit /
1107        // out-of-tx connection.
1108        let frame =
1109            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1110        let f: &dyn ReadFrame = &frame;
1111        assert!(
1112            !f.should_cache_result(),
1113            "volatile builtin must disable result-cache safety"
1114        );
1115
1116        // End-to-end: drive the volatile SELECT through `execute_query`
1117        // and confirm no entry was stamped under its cache key. Other
1118        // entries from prior tests sharing the binary may exist, so we
1119        // assert specifically on this query's key.
1120        let _ = rt
1121            .execute_query("SELECT pg_advisory_unlock(1)")
1122            .expect("volatile SELECT executes");
1123        let cache = rt.inner.result_cache.read();
1124        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1125        assert!(
1126            !cache.0.contains_key(&key),
1127            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1128            cache.0.keys().collect::<Vec<_>>()
1129        );
1130
1131        reset_thread_locals();
1132    }
1133
1134    #[test]
1135    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1136        reset_thread_locals();
1137        let rt = fresh_runtime();
1138        rt.inner
1139            .db
1140            .store()
1141            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1142
1143        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1144        assert_eq!(result.statement_type, "select");
1145
1146        let key = result_cache_key("SELECT 1");
1147        assert!(
1148            rt.inner
1149                .result_blob_cache
1150                .get("runtime.result_cache", &key)
1151                .is_some(),
1152            "blob backend should stamp the Blob Cache path"
1153        );
1154        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1155        assert!(
1156            !rt.inner.result_cache.read().0.contains_key(&key),
1157            "blob backend should not write the legacy map"
1158        );
1159    }
1160
1161    #[test]
1162    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1163        reset_thread_locals();
1164        let rt = fresh_runtime();
1165        rt.inner
1166            .db
1167            .store()
1168            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1169
1170        let _ = rt
1171            .execute_query("SELECT pg_advisory_unlock(1)")
1172            .expect("volatile SELECT executes");
1173        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1174        assert!(
1175            rt.inner
1176                .result_blob_cache
1177                .get("runtime.result_cache", &key)
1178                .is_none(),
1179            "volatile SELECT must not populate blob result cache"
1180        );
1181        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1182    }
1183
1184    #[test]
1185    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1186        reset_thread_locals();
1187        let rt = fresh_runtime();
1188        rt.inner
1189            .db
1190            .store()
1191            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1192
1193        let first = rt.execute_query("SELECT 1").expect("first SELECT");
1194        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1195        assert_eq!(first.result.len(), second.result.len());
1196
1197        let key = result_cache_key("SELECT 1");
1198        assert!(rt.inner.result_cache.read().0.contains_key(&key));
1199        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1200        assert_eq!(rt.result_cache_shadow_divergences(), 0);
1201        assert_eq!(
1202            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1203            "cache_shadow_divergence_total"
1204        );
1205    }
1206
1207    #[test]
1208    fn as_of_on_red_collection_records_floor() {
1209        reset_thread_locals();
1210        let rt = fresh_runtime();
1211
1212        // `red_*` collections always allow AS OF. The frame should
1213        // resolve to a concrete xid and surface it via the Interface.
1214        let frame =
1215            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1216                .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1217
1218        let f: &dyn ReadFrame = &frame;
1219        assert_eq!(
1220            f.as_of_floor(),
1221            Some(1),
1222            "AS OF SNAPSHOT 1 records xid=1 as the floor"
1223        );
1224        assert_eq!(f.snapshot().xid, 1);
1225        assert!(
1226            f.snapshot().in_progress.is_empty(),
1227            "historical reads have no in-progress set"
1228        );
1229    }
1230
1231    /// The frame classifies common SQL prefixes into the coarse
1232    /// `Privilege` / `LockIntent` buckets at build time. This test
1233    /// pins the mapping so a regression that silently re-routes
1234    /// (e.g. INSERT classified as Read) surfaces here, not at a
1235    /// downstream privilege gate.
1236    #[test]
1237    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1238        reset_thread_locals();
1239        let rt = fresh_runtime();
1240
1241        let cases = [
1242            ("SELECT 1", Privilege::Read, LockIntent::Shared),
1243            (
1244                "INSERT INTO t (id) VALUES (1)",
1245                Privilege::Write,
1246                LockIntent::Exclusive,
1247            ),
1248            (
1249                "UPDATE t SET x = 1 WHERE id = 1",
1250                Privilege::Write,
1251                LockIntent::Exclusive,
1252            ),
1253            (
1254                "DELETE FROM t WHERE id = 1",
1255                Privilege::Write,
1256                LockIntent::Exclusive,
1257            ),
1258            (
1259                "CREATE TABLE foo (id INT)",
1260                Privilege::Write,
1261                LockIntent::Exclusive,
1262            ),
1263            ("BEGIN", Privilege::None, LockIntent::None),
1264            ("COMMIT", Privilege::None, LockIntent::None),
1265            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1266        ];
1267
1268        for (q, want_priv, want_lock) in cases {
1269            let frame = StatementExecutionFrame::build(&rt, q)
1270                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1271            let f: &dyn ReadFrame = &frame;
1272            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1273            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1274        }
1275    }
1276
1277    /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1278    /// driven through `execute_query` under an identity whose role
1279    /// doesn't satisfy the frame's coarse `Read` privilege gets
1280    /// denied with the frame's signal.
1281    ///
1282    /// We test the gate by classifying an INSERT (which the frame
1283    /// reports as `Privilege::Write`) under `Role::Read` — the only
1284    /// pair the legacy fallback would also reject, but here the
1285    /// rejection comes through `frame.check_query_privilege` BEFORE
1286    /// the parsed-expression walker runs. Removing
1287    /// `required_privilege` (or the `is_satisfied_by` consult inside
1288    /// `check_query_privilege`) would force the deny path back to the
1289    /// inline `RedDBRuntime::check_query_privilege` walker — but the
1290    /// auth_store gate up there is bypassed when no auth_store is
1291    /// wired (embedded test mode), so this test would FLIP from
1292    /// denied to permitted and break the assertion below.
1293    #[test]
1294    fn insert_under_read_role_denied_via_frame_privilege() {
1295        reset_thread_locals();
1296        set_current_auth_identity("alice".to_string(), Role::Read);
1297
1298        let rt = fresh_runtime();
1299        // Bypass parser by reaching into the frame directly: the
1300        // frame derives privilege from the SQL prefix without
1301        // needing an auth_store wired up. Driving end-to-end via
1302        // `execute_query` would also reject (no table `t`), but for
1303        // a different reason — we want to pin the privilege seam.
1304        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1305            .expect("frame builds for INSERT");
1306        let f: &dyn ReadFrame = &frame;
1307        assert_eq!(
1308            f.required_privilege(),
1309            Privilege::Write,
1310            "INSERT classified as Write"
1311        );
1312        let id = f.identity().expect("identity captured");
1313        assert!(
1314            !f.required_privilege().is_satisfied_by(id.1),
1315            "Role::Read does not satisfy Privilege::Write — frame must deny"
1316        );
1317
1318        // End-to-end: the frame's `check_query_privilege` sees the
1319        // (Read role, Write privilege) mismatch and denies before
1320        // dispatch. We drive a synthetic `QueryExpr::Table` because
1321        // the SELECT/INSERT parser would happen to also fail, and we
1322        // want the failure to come from the privilege seam.
1323        use crate::storage::query::ast::{QueryExpr, TableQuery};
1324        let expr = QueryExpr::Table(TableQuery::new("t"));
1325        let err = frame
1326            .check_query_privilege(&rt, &expr)
1327            .expect_err("denied via frame's coarse privilege gate");
1328        let msg = format!("{err}");
1329        assert!(
1330            msg.contains("permission denied") && msg.contains("Write"),
1331            "expected frame-level Write deny, got: {msg}"
1332        );
1333
1334        reset_thread_locals();
1335    }
1336
1337    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
1338    /// wired into the SELECT path and produces observable results
1339    /// byte-identical to the per-request-allocation baseline.
1340    ///
1341    /// A table with more rows than the streaming high-water mark
1342    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
1343    /// path to assemble many chunks, each leasing/recycling the frame
1344    /// arena's single chunk buffer. Driving it through `execute_query`
1345    /// (which builds a `StatementExecutionFrame` and lends its arena)
1346    /// must return every inserted row, in order — exactly what the
1347    /// allocate-per-chunk path returned. A bug in the arena wiring
1348    /// (dropped rows, bled rows, mis-ordering) would surface here.
1349    #[test]
1350    fn large_select_through_frame_arena_returns_all_rows_in_order() {
1351        reset_thread_locals();
1352        let rt = fresh_runtime();
1353        rt.execute_query("CREATE TABLE big (id INT)")
1354            .expect("create table");
1355
1356        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
1357        // spans multiple chunks and the arena buffer is reused.
1358        const N: usize = 2_500;
1359        let values = (0..N)
1360            .map(|i| format!("({i})"))
1361            .collect::<Vec<_>>()
1362            .join(", ");
1363        rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
1364            .expect("insert rows");
1365
1366        let result = rt
1367            .execute_query("SELECT id FROM big ORDER BY id")
1368            .expect("large SELECT executes through the frame arena path");
1369        assert_eq!(result.statement_type, "select");
1370        assert_eq!(
1371            result.result.records.len(),
1372            N,
1373            "every inserted row streams back through the arena-backed channel"
1374        );
1375        for (i, record) in result.result.records.iter().enumerate() {
1376            assert_eq!(
1377                record.get("id"),
1378                Some(&crate::storage::schema::Value::Integer(i as i64)),
1379                "row {i} is byte-identical to the per-request-allocation baseline"
1380            );
1381        }
1382
1383        reset_thread_locals();
1384    }
1385
1386    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1387    /// control statement carries `LockIntent::None` and the
1388    /// `acquire_intent_locks` path returns `None` without consulting
1389    /// `intent_lock_modes_for`. Removing the method (or its consult
1390    /// site in `acquire_intent_locks`) would force the lock-mode
1391    /// helper to walk a fabricated parsed expression to reach the
1392    /// same conclusion — but the assertion that no guard is allocated
1393    /// for a `BEGIN` frame would still hold, so we additionally pin
1394    /// the classifier mapping above to make the deletion observable.
1395    #[test]
1396    fn control_statement_skips_intent_locks_via_frame() {
1397        reset_thread_locals();
1398        let rt = fresh_runtime();
1399
1400        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1401        let f: &dyn ReadFrame = &frame;
1402        assert_eq!(f.lock_intent(), LockIntent::None);
1403
1404        // Drive `acquire_intent_locks` against a fabricated SELECT
1405        // expression that WOULD normally yield `(IS, IS)`; the frame's
1406        // `lock_intent() == None` short-circuit must still suppress
1407        // the guard.
1408        use crate::storage::query::ast::{QueryExpr, TableQuery};
1409        let expr = QueryExpr::Table(TableQuery::new("t"));
1410        let guard = frame.acquire_intent_locks(&rt, &expr);
1411        assert!(
1412            guard.is_none(),
1413            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1414        );
1415    }
1416}