Skip to main content

reddb_server/runtime/
statement_frame.rs

1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7    collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8    has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9    query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10    SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
19/// Coarse privilege classification for a statement, computed once at
20/// frame-build time from the SQL text. Mirrors the three-role auth
21/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
22/// answer "can this identity run this statement?" without re-walking
23/// the parsed `QueryExpr` at every call site.
24///
25/// `None` means the statement does not touch the privilege gate at
26/// all (transaction control, SET, SHOW). Such statements must remain
27/// runnable under any authenticated identity.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub(crate) enum Privilege {
30    /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
31    /// any role from `Role::Read` upward.
32    Read,
33    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
34    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
35    /// at least `Role::Write`.
36    Write,
37    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
38    /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
39    Admin,
40    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
41    /// ROLLBACK, SET, SHOW with no data exposure). Always permitted
42    /// for any authenticated identity.
43    None,
44}
45
46impl Privilege {
47    /// `true` iff `role` is sufficient to execute a statement carrying
48    /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49    /// Admin` containment used by the auth fallback path.
50    pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51        match self {
52            Self::None => true,
53            Self::Read => role.can_read(),
54            Self::Write => role.can_write(),
55            Self::Admin => role.can_admin(),
56        }
57    }
58}
59
60/// Coarse lock intent for a statement, computed once at frame-build
61/// time. Maps onto the storage-layer's `LockMode` matrix downstream
62/// but stays decoupled here so the runtime can answer "does this
63/// statement need the lock manager at all?" without a `use storage::`
64/// at every call site.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub(crate) enum LockIntent {
67    /// No collection-level lock needed (transaction control, SET,
68    /// SHOW, EXPLAIN). The lock-acquisition path can short-circuit.
69    None,
70    /// Reader-style intent: SELECT, joins, graph / queue / search
71    /// reads. Maps to `(IS, IS)` at the storage layer.
72    Shared,
73    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
74    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
75    /// `Exclusive` at this granularity — call sites that need the
76    /// finer distinction still consult `intent_lock_modes_for`.
77    Exclusive,
78}
79
80/// Small, stable Interface that *represents* a read statement's
81/// execution context. Every read caller that needs to know "under
82/// what scope / identity / snapshot am I running, and is there an
83/// AS OF floor in effect?" consults this trait — never the
84/// underlying thread-locals or runtime fields directly.
85///
86/// The deletion test: removing this trait would force the four
87/// concerns it exposes back into ad-hoc lookups at every read
88/// callsite (`current_tenant()`, `current_auth_identity()`,
89/// `capture_current_snapshot()`, AS OF re-parsing). The trait
90/// concentrates them in one place so future changes (per-statement
91/// logging, audit, scope policy) have a single seam to extend.
92pub(crate) trait ReadFrame {
93    /// Effective tenant scope for the statement after WITHIN /
94    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
95    /// "no tenant bound" (RLS deny-default applies).
96    fn effective_scope(&self) -> Option<&str>;
97
98    /// Authenticated identity observed at frame-build time, if any.
99    /// Returns `(username, role)` so callers can render audit lines
100    /// or feed RLS policy lookups without re-reading thread-locals.
101    fn identity(&self) -> Option<(&str, Role)>;
102
103    /// MVCC snapshot the statement reads against. For autocommit
104    /// this is a fresh snapshot; inside an active transaction it
105    /// is the txn's snapshot; under AS OF it is the resolved
106    /// historical xid.
107    fn snapshot(&self) -> &Snapshot;
108
109    /// AS OF xid floor when AS OF was applied for this statement,
110    /// `None` for live reads. Useful for downstream callers that
111    /// want to gate behaviour on historical-read mode without
112    /// re-parsing the query.
113    fn as_of_floor(&self) -> Option<Xid>;
114
115    /// Stable result-cache key for the statement (already mixes
116    /// effective tenant + identity).
117    fn cache_key(&self) -> &str;
118
119    /// Whether the statement is safe to serve from / populate the
120    /// result cache. Combines two underlying signals:
121    ///
122    ///   * the query does not call a volatile builtin (e.g. `NOW()`,
123    ///     `RANDOM()`, `UUID()`), which would change between calls,
124    ///   * the connection is not inside an active transaction with
125    ///     uncommitted writes that other readers shouldn't observe.
126    ///
127    /// SELECT cache callsites (read + write) consult this method
128    /// instead of re-deriving safety from globals or poking the
129    /// frame's private fields. Removing it would force every cache
130    /// callsite to re-run `query_has_volatile_builtin` plus
131    /// `result_cache_safe(conn_id)` inline.
132    fn should_cache_result(&self) -> bool;
133
134    /// Coarse privilege class the statement requires, computed once
135    /// at frame-build time from the SQL prefix. Read/write dispatch
136    /// sites consult this instead of re-classifying the parsed
137    /// `QueryExpr` inline at every callsite.
138    ///
139    /// Removing this method would force every privilege gate to
140    /// recompute the (action, resource) classification from the
141    /// parsed expression and re-check the role hierarchy inline.
142    fn required_privilege(&self) -> Privilege;
143
144    /// Coarse collection-level lock intent the statement implies.
145    /// `None` lets the lock-acquisition path short-circuit without
146    /// touching the lock manager.
147    ///
148    /// Removing this method would force the lock-acquisition path
149    /// to always invoke `intent_lock_modes_for` (which itself walks
150    /// the parsed expression) even for transaction-control / SET /
151    /// SHOW statements that need no collection lock at all.
152    fn lock_intent(&self) -> LockIntent;
153
154    /// Set of collection ids the calling identity is allowed to
155    /// observe under the active `(tenant, role)` scope. Computed once
156    /// at frame-build time via the `AuthStore` visible-collections
157    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
158    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
159    /// before any similarity score is computed (issue #119).
160    ///
161    /// `None` means the frame was built without an auth store wired —
162    /// embedded / single-tenant tests run that way. AI search call
163    /// sites refuse to proceed with `None`, which is the deny-default
164    /// the issue requires; pure SELECT paths fall back to the existing
165    /// per-row RLS gate.
166    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
167}
168
169/// Cheap first-word classification of a SQL statement, used at
170/// frame-build time to derive `Privilege` + `LockIntent` without
171/// re-parsing the query. Matches the keywords that the legacy
172/// inline checks in `RedDBRuntime::check_query_privilege` and
173/// `intent_lock_modes_for` already key on.
174fn statement_kind(query: &str) -> &'static str {
175    let trimmed = query.trim_start();
176    // Skip a leading line / block comment so the classifier doesn't
177    // misread `/* ... */ SELECT ...` as an unknown statement.
178    let trimmed = if let Some(rest) = trimmed.strip_prefix("--") {
179        rest.split_once('\n')
180            .map(|(_, r)| r)
181            .unwrap_or("")
182            .trim_start()
183    } else {
184        trimmed
185    };
186    let first = trimmed
187        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
188        .next()
189        .unwrap_or("");
190    // ASCII-uppercase compare without allocating: SQL keywords are ASCII.
191    let mut buf = [0u8; 16];
192    let bytes = first.as_bytes();
193    let n = bytes.len().min(buf.len());
194    for i in 0..n {
195        buf[i] = bytes[i].to_ascii_uppercase();
196    }
197    match &buf[..n] {
198        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" => "read",
199        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
200        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
201        b"GRANT" | b"REVOKE" => "admin",
202        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
203        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
204        _ => "unknown",
205    }
206}
207
208fn classify_privilege(query: &str) -> Privilege {
209    match statement_kind(query) {
210        "read" => Privilege::Read,
211        "write" => Privilege::Write,
212        // DDL is gated at `Role::Write` in the legacy fallback (see
213        // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
214        // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
215        // GRANT / REVOKE upgrade to Admin via finer checks at the call
216        // site — the frame surfaces only the coarse class.
217        "ddl" => Privilege::Write,
218        "admin" => Privilege::Admin,
219        _ => Privilege::None,
220    }
221}
222
223fn classify_lock_intent(query: &str) -> LockIntent {
224    match statement_kind(query) {
225        "read" => LockIntent::Shared,
226        "write" | "ddl" => LockIntent::Exclusive,
227        _ => LockIntent::None,
228    }
229}
230
231pub(super) struct StatementExecutionFrame {
232    tx_local_tenant: Option<Option<String>>,
233    snapshot: Snapshot,
234    own_xids: HashSet<Xid>,
235    cache_key: String,
236    is_volatile_query: bool,
237    cache_safe: bool,
238    /// Effective tenant captured at frame-build time after WITHIN /
239    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
240    /// so the `ReadFrame` Interface can return a borrow without
241    /// re-touching the thread-local stack.
242    effective_scope: Option<String>,
243    /// Auth identity captured at frame-build time. `None` for
244    /// embedded / anonymous callers.
245    identity: Option<(String, Role)>,
246    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
247    /// for live reads.
248    as_of_floor: Option<Xid>,
249    /// True when the statement snapshot can require tuple versions that
250    /// current secondary indexes no longer contain.
251    requires_index_fallback: bool,
252    /// Privilege class required by the statement, derived from the
253    /// SQL text at frame-build time. Read/write dispatch sites
254    /// consult this instead of re-classifying the parsed expression.
255    required_privilege: Privilege,
256    /// Collection-level lock intent the statement implies. The
257    /// lock-acquisition path short-circuits when this is `None`.
258    lock_intent: LockIntent,
259    /// Set of collection ids the active `(tenant, role)` scope is
260    /// allowed to observe. Computed at frame-build time via the
261    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
262    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
263    /// scoring (issue #119). `None` when no auth store is wired
264    /// (embedded test mode) — AI search refuses on `None`.
265    visible_collections: Option<HashSet<String>>,
266    /// Per-owner buffer arena for query-result row chunks (#885). Owned
267    /// by the frame because the frame already owns the query lifecycle;
268    /// lent to the row-streaming path (`execute_runtime_table_query_in`)
269    /// so chunk buffers are reused across the statement's chunk-fetches
270    /// instead of allocated fresh per chunk. Reclaimed when the frame
271    /// drops at statement end — no `thread_local!` scratch, which would
272    /// be unsound under tokio's work-stealing runtime.
273    row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
274}
275
276pub(super) struct StatementFrameGuards {
277    _tx_local_guard: TxLocalTenantGuard,
278    _config_snapshot_guard: ConfigSnapshotGuard,
279    _secret_store_guard: SecretStoreGuard,
280    _snapshot_guard: CurrentSnapshotGuard,
281}
282
283pub(super) struct PreparedStatement {
284    pub(super) expr: QueryExpr,
285    pub(super) mode: QueryMode,
286}
287
288impl StatementExecutionFrame {
289    pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
290        let conn_id = current_connection_id();
291        let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
292        let own_xids = runtime.own_transaction_xids(conn_id);
293        let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
294        let requires_index_fallback =
295            as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
296        let cache_key = result_cache_key(query);
297        let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
298        let cache_safe = runtime.result_cache_safe(conn_id);
299        // Capture identity + effective scope under the same
300        // thread-local view that the cache key was built from, so
301        // the Interface and the cache key agree on what "this
302        // statement" means.
303        let effective_scope = current_tenant();
304        let identity = current_auth_identity();
305
306        // Coarse classification of the statement, computed once from
307        // the SQL prefix so downstream callers don't re-derive it
308        // from the parsed `QueryExpr` at every privilege / lock site.
309        let required_privilege = classify_privilege(query);
310        let lock_intent = classify_lock_intent(query);
311
312        // Issue #119: resolve the visible-collections set for the
313        // active (tenant, role) scope. Only meaningful when an auth
314        // store is wired *and* an identity was captured — embedded
315        // anonymous callers fall back to `None`, and AI search call
316        // sites refuse on `None`.
317        let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
318        {
319            (Some(store), Some((principal, role))) => {
320                let collections = runtime.inner.db.store().list_collections();
321                Some(store.visible_collections_for_scope(
322                    effective_scope.as_deref(),
323                    *role,
324                    principal,
325                    &collections,
326                ))
327            }
328            _ => None,
329        };
330
331        Ok(Self {
332            tx_local_tenant,
333            snapshot,
334            own_xids,
335            cache_key,
336            is_volatile_query,
337            cache_safe,
338            effective_scope,
339            identity,
340            as_of_floor,
341            requires_index_fallback,
342            required_privilege,
343            lock_intent,
344            visible_collections,
345            row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
346        })
347    }
348
349    /// Lend the frame's per-owner row-buffer arena (#885) to the
350    /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
351    /// the owner and the arena is reclaimed when the frame drops at
352    /// statement end.
353    pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
354        Rc::clone(&self.row_arena)
355    }
356
357    pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
358        StatementFrameGuards {
359            _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
360            _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
361            _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
362            _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
363                snapshot: self.snapshot.clone(),
364                manager: Arc::clone(&runtime.inner.snapshot_manager),
365                own_xids: self.own_xids.clone(),
366                requires_index_fallback: self.requires_index_fallback,
367            }),
368        }
369    }
370
371    pub(super) fn cache_key(&self) -> &str {
372        &self.cache_key
373    }
374
375    pub(super) fn can_read_result_cache(&self) -> bool {
376        // Delegates to the `ReadFrame` Interface so the volatile +
377        // active-tx safety decision lives in exactly one place.
378        <Self as ReadFrame>::should_cache_result(self)
379    }
380
381    pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
382        // Cache-safety (volatile builtin, active-tx writes) comes from
383        // the Interface; the rest are write-side payload heuristics
384        // (statement shape, result size) that aren't part of the
385        // safety contract.
386        <Self as ReadFrame>::should_cache_result(self)
387            && result.statement_type == "select"
388            && result.engine != "vault"
389            // `QUEUE READ` is a stateful read: a delayed message
390            // (issue #722) becomes deliverable over time without a
391            // producer push to invalidate the cache, so a cached empty
392            // result would hide it. Skip caching entirely.
393            && result.statement != "queue_group_read"
394            && result.result.pre_serialized_json.is_none()
395            // Graph-analytics TVF output (issue #802) is deterministic and
396            // expensive to recompute, so it is cached at any row count. The
397            // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
398            && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
399    }
400
401    pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
402        if self.can_read_result_cache() {
403            runtime.get_result_cache_entry(self.cache_key())
404        } else {
405            None
406        }
407    }
408
409    pub(super) fn write_result_cache(
410        &self,
411        runtime: &RedDBRuntime,
412        result: &RuntimeQueryResult,
413        scopes: HashSet<String>,
414    ) {
415        if self.should_write_result_cache(result) {
416            runtime.put_result_cache_entry(
417                self.cache_key(),
418                RuntimeResultCacheEntry {
419                    result: result.clone(),
420                    cached_at: std::time::Instant::now(),
421                    scopes,
422                },
423            );
424        }
425    }
426
427    pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
428        // Detected via cheap prefix check so non-CTE queries skip the
429        // full parse here. CTE-bearing queries bypass the plan cache
430        // and result cache (rare workload — perf optimization is a
431        // follow-up). Inlining substitutes every CTE reference with
432        // its body as a subquery in FROM, after which the existing
433        // subquery-in-FROM machinery handles execution. Recursive
434        // CTEs are rejected explicitly until fixpoint execution wires
435        // through the runtime.
436        if !has_with_prefix(query) {
437            return Ok(None);
438        }
439        let parsed = crate::storage::query::parser::parse(query)
440            .map_err(|err| RedDBError::Query(err.to_string()))?;
441        if parsed.with_clause.is_some() {
442            let rewritten = crate::storage::query::executors::inline_ctes(parsed)
443                .map_err(|err| RedDBError::Query(err.to_string()))?;
444            return Ok(Some(rewritten));
445        }
446        // No WITH after parse (the prefix matched something else like
447        // `WITHIN` that already routed elsewhere) — fall through to
448        // the normal path with the original query.
449        Ok(None)
450    }
451
452    pub(super) fn prepare_statement(
453        &self,
454        runtime: &RedDBRuntime,
455        query: &str,
456    ) -> RedDBResult<PreparedStatement> {
457        let mode = detect_mode(query);
458        if matches!(mode, QueryMode::Unknown) {
459            return Err(RedDBError::Query("unable to detect query mode".to_string()));
460        }
461
462        // ── Plan cache: reuse only exact-query ASTs ──
463        //
464        // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
465        // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
466        // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
467        // Plan cache applies to statements whose shape can be
468        // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
469        // reuses the same plan across thousands of varying literals).
470        // INSERT is still bypassed — its shape changes per column set
471        // and bulk paths don't go through here anyway.
472        let first_word = query
473            .trim()
474            .split_ascii_whitespace()
475            .next()
476            .unwrap_or("")
477            .to_ascii_uppercase();
478        let is_insert = first_word == "INSERT";
479
480        // Fused normalize+extract: one byte-scan produces both the
481        // cache_key AND the literal bindings. Saves a second Lexer
482        // pass over the query text on every cache hit — dominant
483        // cost on tight UPDATE loops that hit the same shape
484        // thousands of times with varying literals.
485        let (cache_key, prescan_binds) = if is_insert {
486            (String::new(), Vec::new())
487        } else {
488            crate::storage::query::planner::cache_key::normalize_and_extract(query)
489        };
490
491        let expr = if is_insert {
492            // Bypass plan cache for INSERT — shape varies per query.
493            parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
494        } else {
495            // ── Hot path: read lock only (no writer serialization on cache hits) ──
496            //
497            // peek() is a non-mutating probe: no LRU promotion, no touch().
498            // This lets concurrent readers proceed without blocking each other.
499            // On hit we bind literals if needed and return immediately.
500            // Only on miss do we drop to a write lock to parse + insert.
501            let hit = {
502                let plan_cache = runtime.inner.query_cache.read();
503                plan_cache.peek(&cache_key).map(|cached| {
504                    let parameter_count = cached.parameter_count;
505                    let optimized = cached.plan.optimized.clone();
506                    let exact_query = cached.exact_query.clone();
507                    (parameter_count, optimized, exact_query)
508                })
509            };
510
511            if let Some((parameter_count, optimized, exact_query)) = hit {
512                if parameter_count > 0 {
513                    // Shape hit: use the binds extracted during normalise.
514                    let shape_binds = prescan_binds.clone();
515                    if let Some(bound) =
516                        crate::storage::query::planner::shape::bind_parameterized_query(
517                            &optimized,
518                            &shape_binds,
519                            parameter_count,
520                        )
521                    {
522                        bound
523                    } else if exact_query.as_deref() == Some(query) {
524                        // Bind failed but exact query matches — use as-is.
525                        optimized
526                    } else {
527                        // Bind failed and literals differ: re-parse fresh.
528                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
529                    }
530                } else {
531                    // No parameters means either there truly are no literals,
532                    // or this statement type does not participate in shape
533                    // parameterization (for example graph/queue commands).
534                    // Reusing a normalized-cache hit across a different exact
535                    // query can therefore leak stale literals into execution.
536                    if exact_query.as_deref() == Some(query) {
537                        optimized
538                    } else {
539                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
540                    }
541                }
542            } else {
543                // Cache miss — parse, parameterize, store.
544                let parsed =
545                    parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
546                let (cached_expr, parameter_count) = if let Some(prepared) =
547                    crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
548                {
549                    (prepared.shape, prepared.parameter_count)
550                } else {
551                    (parsed.clone(), 0)
552                };
553                {
554                    let mut pc = runtime.inner.query_cache.write();
555                    let plan = crate::storage::query::planner::QueryPlan::new(
556                        parsed.clone(),
557                        cached_expr,
558                        Default::default(),
559                    );
560                    pc.insert(
561                        cache_key.clone(),
562                        crate::storage::query::planner::CachedPlan::new(plan)
563                            .with_shape_key(cache_key.clone())
564                            .with_exact_query(query.to_string())
565                            .with_parameter_count(parameter_count),
566                    );
567                }
568                parsed
569            }
570        };
571
572        // Phase 5 PG parity: substitute any registered view name that
573        // appears in the expression with its stored body. Runs after
574        // parse and before dispatch so the SQL entrypoint gets the
575        // same view resolution `execute_query_expr` already does.
576        let expr = runtime.rewrite_view_refs(expr);
577
578        Ok(PreparedStatement { expr, mode })
579    }
580
581    pub(super) fn check_query_privilege(
582        &self,
583        runtime: &RedDBRuntime,
584        expr: &QueryExpr,
585    ) -> RedDBResult<()> {
586        // Frame-level coarse gate. We consult `required_privilege()`
587        // (computed once at frame-build) against the captured identity
588        // before the deep grant engine walks the parsed expression.
589        // The coarse gate cannot ALLOW anything the grant engine would
590        // deny — it only short-circuits the obvious "Role::Read tries
591        // INSERT" case so a downstream caller never has to redo this
592        // check inline. `Privilege::None` (transaction control / SET /
593        // SHOW) flows through unchanged; the grant engine treats those
594        // as bypass too.
595        if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
596            let needed = <Self as ReadFrame>::required_privilege(self);
597            if !needed.is_satisfied_by(role) {
598                // Issue #205 — when the deep grant engine *also*
599                // denies, we treat this as an ordinary permission
600                // failure. But when an Admin-only statement reaches
601                // this gate without an auth_store wired (so the deep
602                // engine can't double-check), the coarse rejection is
603                // the only line of defence — emit an OperatorEvent so
604                // the operator notices an Admin-class statement was
605                // attempted with insufficient role.
606                if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
607                    crate::telemetry::operator_event::OperatorEvent::AuthBypass {
608                        principal: username.to_string(),
609                        resource: format!("statement requiring {needed:?}"),
610                        detail: format!(
611                            "auth_store not wired; coarse gate is sole defence (role={role:?})"
612                        ),
613                    }
614                    .emit_global();
615                }
616                return Err(RedDBError::Query(format!(
617                    "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
618                )));
619            }
620        }
621        runtime
622            .check_query_privilege(expr)
623            .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
624    }
625
626    pub(super) fn prepare_dispatch(
627        &self,
628        runtime: &RedDBRuntime,
629        expr: &QueryExpr,
630    ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
631        runtime.validate_model_operations_before_auth(expr)?;
632        self.check_query_privilege(runtime, expr)?;
633        Ok(self.acquire_intent_locks(runtime, expr))
634    }
635
636    pub(super) fn acquire_intent_locks(
637        &self,
638        runtime: &RedDBRuntime,
639        expr: &QueryExpr,
640    ) -> Option<crate::runtime::locking::LockerGuard> {
641        if !runtime.config_bool("concurrency.locking.enabled", true) {
642            return None;
643        }
644        // Frame-level short-circuit: if the statement carries no lock
645        // intent (transaction control, SET, SHOW), skip the lock
646        // manager entirely instead of letting `intent_lock_modes_for`
647        // walk the parsed expression to reach the same conclusion.
648        if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
649            return None;
650        }
651        intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
652            let mut guard =
653                crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
654            let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
655            for collection in collections_referenced(expr) {
656                let _ = guard.acquire(
657                    crate::runtime::locking::Resource::Collection(collection),
658                    coll_mode,
659                );
660            }
661            guard
662        })
663    }
664}
665
666impl ReadFrame for StatementExecutionFrame {
667    fn effective_scope(&self) -> Option<&str> {
668        self.effective_scope.as_deref()
669    }
670
671    fn identity(&self) -> Option<(&str, Role)> {
672        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
673    }
674
675    fn snapshot(&self) -> &Snapshot {
676        &self.snapshot
677    }
678
679    fn as_of_floor(&self) -> Option<Xid> {
680        self.as_of_floor
681    }
682
683    fn cache_key(&self) -> &str {
684        &self.cache_key
685    }
686
687    fn should_cache_result(&self) -> bool {
688        !self.is_volatile_query && self.cache_safe
689    }
690
691    fn required_privilege(&self) -> Privilege {
692        self.required_privilege
693    }
694
695    fn lock_intent(&self) -> LockIntent {
696        self.lock_intent
697    }
698
699    fn visible_collections(&self) -> Option<&HashSet<String>> {
700        self.visible_collections.as_ref()
701    }
702}
703
704/// Lightweight `ReadFrame` carrier used by AI command entry points
705/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
706///
707/// Issue #119 calls this struct `EffectiveScope`. It bundles the
708/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
709/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
710/// instead of re-reading thread-locals at every call site.
711///
712/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
713/// from the per-statement thread-locals (identical to how
714/// `StatementExecutionFrame::build` derives them) and resolves
715/// `visible_collections` via the `AuthStore` cache.
716pub struct EffectiveScope {
717    pub(crate) tenant: Option<String>,
718    pub(crate) identity: Option<(String, Role)>,
719    pub(crate) snapshot: Snapshot,
720    pub(crate) visible_collections: Option<HashSet<String>>,
721}
722
723impl EffectiveScope {
724    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
725    /// to gate LLM-backed NER calls behind `ai:ner:read`.
726    ///
727    /// Placeholder for now: always returns `false`. The auth engine's
728    /// capability matrix is future work; until it lands, every routed
729    /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
730    /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
731    /// Documented in code so the wire-up is a one-line change once
732    /// the auth engine learns capabilities.
733    pub fn has_capability(&self, _capability: &str) -> bool {
734        false
735    }
736}
737
738impl ReadFrame for EffectiveScope {
739    fn effective_scope(&self) -> Option<&str> {
740        self.tenant.as_deref()
741    }
742    fn identity(&self) -> Option<(&str, Role)> {
743        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
744    }
745    fn snapshot(&self) -> &Snapshot {
746        &self.snapshot
747    }
748    fn as_of_floor(&self) -> Option<Xid> {
749        None
750    }
751    fn cache_key(&self) -> &str {
752        ""
753    }
754    fn should_cache_result(&self) -> bool {
755        false
756    }
757    fn required_privilege(&self) -> Privilege {
758        Privilege::Read
759    }
760    fn lock_intent(&self) -> LockIntent {
761        LockIntent::Shared
762    }
763    fn visible_collections(&self) -> Option<&HashSet<String>> {
764        self.visible_collections.as_ref()
765    }
766}
767
768impl RedDBRuntime {
769    /// Build the AI command `EffectiveScope` from the current
770    /// statement thread-locals + auth store.
771    ///
772    /// Returns `None` for embedded callers (no auth store, no
773    /// identity) — `AuthorizedSearch` treats `None` as deny-default.
774    pub(crate) fn ai_scope(&self) -> EffectiveScope {
775        let tenant = super::impl_core::current_tenant();
776        let identity = super::impl_core::current_auth_identity();
777        let snapshot = self.current_snapshot();
778        let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
779            (Some(store), Some((principal, role))) => {
780                let collections = self.inner.db.store().list_collections();
781                Some(store.visible_collections_for_scope(
782                    tenant.as_deref(),
783                    *role,
784                    principal,
785                    &collections,
786                ))
787            }
788            _ => None,
789        };
790        EffectiveScope {
791            tenant,
792            identity,
793            snapshot,
794            visible_collections,
795        }
796    }
797}
798
799/// Test fixtures for callers that need to drive `ReadFrame` without
800/// booting a runtime. Lives behind `cfg(test)` and `pub(crate)` so it
801/// only leaks across module boundaries inside the crate.
802#[cfg(test)]
803pub(crate) mod test_support {
804    use super::{LockIntent, Privilege, ReadFrame};
805    use crate::auth::Role;
806    use crate::storage::transaction::snapshot::{Snapshot, Xid};
807    use std::collections::HashSet;
808
809    /// A `ReadFrame` impl with hand-set fields. Used by
810    /// `authorized_search` tests to assert the deny-default and
811    /// scope-trim behaviour without going through frame construction.
812    pub(crate) struct FakeReadFrame {
813        pub tenant: Option<String>,
814        pub identity: Option<(String, Role)>,
815        pub snapshot: Snapshot,
816        pub visible: Option<HashSet<String>>,
817    }
818
819    impl FakeReadFrame {
820        pub(crate) fn without_scope() -> Self {
821            Self {
822                tenant: None,
823                identity: None,
824                snapshot: Snapshot {
825                    xid: 0,
826                    in_progress: HashSet::new(),
827                },
828                visible: None,
829            }
830        }
831
832        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
833            Self {
834                tenant: Some("acme".to_string()),
835                identity: Some(("alice".to_string(), Role::Read)),
836                snapshot: Snapshot {
837                    xid: 0,
838                    in_progress: HashSet::new(),
839                },
840                visible: Some(visible),
841            }
842        }
843    }
844
845    impl ReadFrame for FakeReadFrame {
846        fn effective_scope(&self) -> Option<&str> {
847            self.tenant.as_deref()
848        }
849        fn identity(&self) -> Option<(&str, Role)> {
850            self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
851        }
852        fn snapshot(&self) -> &Snapshot {
853            &self.snapshot
854        }
855        fn as_of_floor(&self) -> Option<Xid> {
856            None
857        }
858        fn cache_key(&self) -> &str {
859            ""
860        }
861        fn should_cache_result(&self) -> bool {
862            false
863        }
864        fn required_privilege(&self) -> Privilege {
865            Privilege::Read
866        }
867        fn lock_intent(&self) -> LockIntent {
868            LockIntent::Shared
869        }
870        fn visible_collections(&self) -> Option<&HashSet<String>> {
871            self.visible.as_ref()
872        }
873    }
874}
875
876impl RedDBRuntime {
877    fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
878        let mut set = HashSet::new();
879        if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
880            set.insert(ctx.xid);
881            for (_, sub) in &ctx.savepoints {
882                set.insert(*sub);
883            }
884            for sub in &ctx.released_sub_xids {
885                set.insert(*sub);
886            }
887        }
888        set
889    }
890
891    /// Resolve the snapshot for the current statement, returning
892    /// the snapshot itself and (when AS OF is in effect) the
893    /// resolved xid floor. The floor is the same xid carried inside
894    /// `Snapshot.xid` for AS OF reads — exposing it separately lets
895    /// the `ReadFrame` Interface tell "live read" from "historical
896    /// read" without inferring from `in_progress.is_empty()`.
897    fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
898        match peek_top_level_as_of_with_table(query) {
899            Some((spec, Some(table))) => {
900                if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
901                    return Err(RedDBError::InvalidConfig(format!(
902                        "AS OF requires a versioned collection — \
903                         `{table}` has not opted in. \
904                         Call vcs.set_versioned(\"{table}\", true) first."
905                    )));
906                }
907                let xid = self.vcs_resolve_as_of(spec)?;
908                Ok((
909                    Snapshot {
910                        xid,
911                        in_progress: HashSet::new(),
912                    },
913                    Some(xid),
914                ))
915            }
916            Some((spec, None)) => {
917                let xid = self.vcs_resolve_as_of(spec)?;
918                Ok((
919                    Snapshot {
920                        xid,
921                        in_progress: HashSet::new(),
922                    },
923                    Some(xid),
924                ))
925            }
926            None => Ok((self.current_snapshot(), None)),
927        }
928    }
929
930    fn result_cache_safe(&self, conn_id: u64) -> bool {
931        let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
932        let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
933        !has_active_xids && !in_own_tx
934    }
935}
936
937/// Whether a result's `engine` tag is one of the graph-analytics TVF
938/// executors (issue #802). Graph-collection (`louvain(g)`) and inline
939/// (`louvain(nodes => …, edges => …)`) forms both produce deterministic
940/// algorithm output that is cached regardless of row count.
941fn is_graph_tvf_engine(engine: &str) -> bool {
942    matches!(engine, "runtime-graph-tvf" | "runtime-graph-tvf-inline")
943}
944
945fn result_cache_key(query: &str) -> String {
946    let tenant = current_tenant().unwrap_or_default();
947    let auth = current_auth_identity()
948        .map(|(user, role)| format!("{}|{:?}", user, role))
949        .unwrap_or_default();
950    if tenant.is_empty() && auth.is_empty() {
951        query.to_string()
952    } else {
953        format!("{query}\u{001e}{tenant}\u{001e}{auth}")
954    }
955}
956
957#[cfg(test)]
958mod tests {
959    use super::*;
960    use crate::api::RedDBOptions;
961    use crate::runtime::impl_core::{
962        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
963        set_current_tenant,
964    };
965    use crate::runtime::RedDBRuntime;
966
967    fn fresh_runtime() -> RedDBRuntime {
968        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
969    }
970
971    /// Ensure thread-local state from a prior test can't leak into
972    /// the next one — tests in the same binary share the thread.
973    fn reset_thread_locals() {
974        clear_current_tenant();
975        clear_current_auth_identity();
976    }
977
978    #[test]
979    fn autocommit_select_takes_live_snapshot() {
980        reset_thread_locals();
981        let rt = fresh_runtime();
982        let frame =
983            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
984
985        // Live reads: no AS OF floor, snapshot bounded by the
986        // manager's `peek_next_xid` so committed tuples are visible.
987        let f: &dyn ReadFrame = &frame;
988        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
989        assert!(
990            f.snapshot().xid >= 1,
991            "autocommit snapshot xid is bounded by peek_next_xid"
992        );
993    }
994
995    #[test]
996    fn frame_captures_identity_and_scope() {
997        reset_thread_locals();
998        set_current_tenant("acme".to_string());
999        set_current_auth_identity("alice".to_string(), Role::Write);
1000
1001        let rt = fresh_runtime();
1002        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
1003        let f: &dyn ReadFrame = &frame;
1004
1005        assert_eq!(f.effective_scope(), Some("acme"));
1006        let id = f.identity().expect("identity captured");
1007        assert_eq!(id.0, "alice");
1008        assert!(matches!(id.1, Role::Write));
1009
1010        // Cache key mixes scope + identity so two callers under
1011        // different tenants never share a cache slot.
1012        assert!(
1013            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
1014            "cache key folds in scope + identity, got {:?}",
1015            f.cache_key()
1016        );
1017
1018        reset_thread_locals();
1019    }
1020
1021    #[test]
1022    fn as_of_rejects_non_versioned_user_collection() {
1023        reset_thread_locals();
1024        let rt = fresh_runtime();
1025
1026        // `not_versioned` is a plain user collection — the frame
1027        // builder must reject AS OF until the caller opts in via
1028        // `vcs.set_versioned`.
1029        let err = match StatementExecutionFrame::build(
1030            &rt,
1031            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
1032        ) {
1033            Err(e) => e,
1034            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
1035        };
1036
1037        let msg = format!("{err}");
1038        assert!(
1039            msg.contains("AS OF requires a versioned collection"),
1040            "expected AS OF rejection, got: {msg}"
1041        );
1042    }
1043
1044    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1045    ///
1046    /// Sets a tenant + identity via the public thread-local API the
1047    /// runtime uses for ambient scope, drives a real `SELECT` through
1048    /// `execute_query`, then inspects the result cache that the SELECT
1049    /// path populates via `frame.cache_key()`. The key only carries
1050    /// the tenant + identity *because* it was built through the frame —
1051    /// reverting the wiring to inline `current_tenant()` /
1052    /// `current_auth_identity()` reads would still pass this test, but
1053    /// dropping the frame entirely (so the SELECT path stopped touching
1054    /// `cache_key`) would break it.
1055    #[test]
1056    fn select_path_routes_through_frame_cache_key() {
1057        reset_thread_locals();
1058        set_current_tenant("acme".to_string());
1059        set_current_auth_identity("alice".to_string(), Role::Read);
1060
1061        let rt = fresh_runtime();
1062        let result = rt
1063            .execute_query("SELECT 1")
1064            .expect("SELECT 1 executes under tenant=acme/identity=alice");
1065        assert_eq!(result.statement_type, "select");
1066
1067        // The SELECT path (in `execute_query_expr`) builds a frame and
1068        // writes its result through `frame.cache_key()`. That key folds
1069        // tenant + identity in via `result_cache_key`, so finding "acme"
1070        // and "alice" inside any cached key proves the frame was the
1071        // seam used.
1072        let cache = rt.inner.result_cache.read();
1073        let any_keyed_with_scope = cache
1074            .0
1075            .keys()
1076            .any(|k| k.contains("acme") && k.contains("alice"));
1077        assert!(
1078            any_keyed_with_scope,
1079            "expected at least one result-cache key carrying tenant+identity, \
1080             got keys: {:?}",
1081            cache.0.keys().collect::<Vec<_>>()
1082        );
1083
1084        reset_thread_locals();
1085    }
1086
1087    /// A SELECT that calls a volatile builtin (here:
1088    /// `pg_advisory_unlock`, the volatile token the runtime currently
1089    /// recognises in `query_has_volatile_builtin`) must NOT populate
1090    /// the result cache. Any caller hitting the cache after this would
1091    /// see a stale answer for an inherently-volatile query, so the
1092    /// SELECT path gates writes through `frame.should_cache_result()`.
1093    ///
1094    /// Deletion test: removing `ReadFrame::should_cache_result`, or
1095    /// reverting the SELECT path to skip its safety gate, would let
1096    /// the result cache silently absorb this statement and break the
1097    /// assertion below.
1098    #[test]
1099    fn volatile_select_does_not_populate_result_cache() {
1100        reset_thread_locals();
1101        let rt = fresh_runtime();
1102
1103        // Frame-level invariant: the volatile-builtin signal collapses
1104        // `should_cache_result` to false even for an autocommit /
1105        // out-of-tx connection.
1106        let frame =
1107            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1108        let f: &dyn ReadFrame = &frame;
1109        assert!(
1110            !f.should_cache_result(),
1111            "volatile builtin must disable result-cache safety"
1112        );
1113
1114        // End-to-end: drive the volatile SELECT through `execute_query`
1115        // and confirm no entry was stamped under its cache key. Other
1116        // entries from prior tests sharing the binary may exist, so we
1117        // assert specifically on this query's key.
1118        let _ = rt
1119            .execute_query("SELECT pg_advisory_unlock(1)")
1120            .expect("volatile SELECT executes");
1121        let cache = rt.inner.result_cache.read();
1122        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1123        assert!(
1124            !cache.0.contains_key(&key),
1125            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1126            cache.0.keys().collect::<Vec<_>>()
1127        );
1128
1129        reset_thread_locals();
1130    }
1131
1132    #[test]
1133    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1134        reset_thread_locals();
1135        let rt = fresh_runtime();
1136        rt.inner
1137            .db
1138            .store()
1139            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1140
1141        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1142        assert_eq!(result.statement_type, "select");
1143
1144        let key = result_cache_key("SELECT 1");
1145        assert!(
1146            rt.inner
1147                .result_blob_cache
1148                .get("runtime.result_cache", &key)
1149                .is_some(),
1150            "blob backend should stamp the Blob Cache path"
1151        );
1152        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1153        assert!(
1154            !rt.inner.result_cache.read().0.contains_key(&key),
1155            "blob backend should not write the legacy map"
1156        );
1157    }
1158
1159    #[test]
1160    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1161        reset_thread_locals();
1162        let rt = fresh_runtime();
1163        rt.inner
1164            .db
1165            .store()
1166            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1167
1168        let _ = rt
1169            .execute_query("SELECT pg_advisory_unlock(1)")
1170            .expect("volatile SELECT executes");
1171        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1172        assert!(
1173            rt.inner
1174                .result_blob_cache
1175                .get("runtime.result_cache", &key)
1176                .is_none(),
1177            "volatile SELECT must not populate blob result cache"
1178        );
1179        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1180    }
1181
1182    #[test]
1183    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1184        reset_thread_locals();
1185        let rt = fresh_runtime();
1186        rt.inner
1187            .db
1188            .store()
1189            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1190
1191        let first = rt.execute_query("SELECT 1").expect("first SELECT");
1192        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1193        assert_eq!(first.result.len(), second.result.len());
1194
1195        let key = result_cache_key("SELECT 1");
1196        assert!(rt.inner.result_cache.read().0.contains_key(&key));
1197        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1198        assert_eq!(rt.result_cache_shadow_divergences(), 0);
1199        assert_eq!(
1200            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1201            "cache_shadow_divergence_total"
1202        );
1203    }
1204
1205    #[test]
1206    fn as_of_on_red_collection_records_floor() {
1207        reset_thread_locals();
1208        let rt = fresh_runtime();
1209
1210        // `red_*` collections always allow AS OF. The frame should
1211        // resolve to a concrete xid and surface it via the Interface.
1212        let frame =
1213            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1214                .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1215
1216        let f: &dyn ReadFrame = &frame;
1217        assert_eq!(
1218            f.as_of_floor(),
1219            Some(1),
1220            "AS OF SNAPSHOT 1 records xid=1 as the floor"
1221        );
1222        assert_eq!(f.snapshot().xid, 1);
1223        assert!(
1224            f.snapshot().in_progress.is_empty(),
1225            "historical reads have no in-progress set"
1226        );
1227    }
1228
1229    /// The frame classifies common SQL prefixes into the coarse
1230    /// `Privilege` / `LockIntent` buckets at build time. This test
1231    /// pins the mapping so a regression that silently re-routes
1232    /// (e.g. INSERT classified as Read) surfaces here, not at a
1233    /// downstream privilege gate.
1234    #[test]
1235    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1236        reset_thread_locals();
1237        let rt = fresh_runtime();
1238
1239        let cases = [
1240            ("SELECT 1", Privilege::Read, LockIntent::Shared),
1241            (
1242                "INSERT INTO t (id) VALUES (1)",
1243                Privilege::Write,
1244                LockIntent::Exclusive,
1245            ),
1246            (
1247                "UPDATE t SET x = 1 WHERE id = 1",
1248                Privilege::Write,
1249                LockIntent::Exclusive,
1250            ),
1251            (
1252                "DELETE FROM t WHERE id = 1",
1253                Privilege::Write,
1254                LockIntent::Exclusive,
1255            ),
1256            (
1257                "CREATE TABLE foo (id INT)",
1258                Privilege::Write,
1259                LockIntent::Exclusive,
1260            ),
1261            ("BEGIN", Privilege::None, LockIntent::None),
1262            ("COMMIT", Privilege::None, LockIntent::None),
1263            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1264        ];
1265
1266        for (q, want_priv, want_lock) in cases {
1267            let frame = StatementExecutionFrame::build(&rt, q)
1268                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1269            let f: &dyn ReadFrame = &frame;
1270            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1271            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1272        }
1273    }
1274
1275    /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1276    /// driven through `execute_query` under an identity whose role
1277    /// doesn't satisfy the frame's coarse `Read` privilege gets
1278    /// denied with the frame's signal.
1279    ///
1280    /// We test the gate by classifying an INSERT (which the frame
1281    /// reports as `Privilege::Write`) under `Role::Read` — the only
1282    /// pair the legacy fallback would also reject, but here the
1283    /// rejection comes through `frame.check_query_privilege` BEFORE
1284    /// the parsed-expression walker runs. Removing
1285    /// `required_privilege` (or the `is_satisfied_by` consult inside
1286    /// `check_query_privilege`) would force the deny path back to the
1287    /// inline `RedDBRuntime::check_query_privilege` walker — but the
1288    /// auth_store gate up there is bypassed when no auth_store is
1289    /// wired (embedded test mode), so this test would FLIP from
1290    /// denied to permitted and break the assertion below.
1291    #[test]
1292    fn insert_under_read_role_denied_via_frame_privilege() {
1293        reset_thread_locals();
1294        set_current_auth_identity("alice".to_string(), Role::Read);
1295
1296        let rt = fresh_runtime();
1297        // Bypass parser by reaching into the frame directly: the
1298        // frame derives privilege from the SQL prefix without
1299        // needing an auth_store wired up. Driving end-to-end via
1300        // `execute_query` would also reject (no table `t`), but for
1301        // a different reason — we want to pin the privilege seam.
1302        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1303            .expect("frame builds for INSERT");
1304        let f: &dyn ReadFrame = &frame;
1305        assert_eq!(
1306            f.required_privilege(),
1307            Privilege::Write,
1308            "INSERT classified as Write"
1309        );
1310        let id = f.identity().expect("identity captured");
1311        assert!(
1312            !f.required_privilege().is_satisfied_by(id.1),
1313            "Role::Read does not satisfy Privilege::Write — frame must deny"
1314        );
1315
1316        // End-to-end: the frame's `check_query_privilege` sees the
1317        // (Read role, Write privilege) mismatch and denies before
1318        // dispatch. We drive a synthetic `QueryExpr::Table` because
1319        // the SELECT/INSERT parser would happen to also fail, and we
1320        // want the failure to come from the privilege seam.
1321        use crate::storage::query::ast::{QueryExpr, TableQuery};
1322        let expr = QueryExpr::Table(TableQuery::new("t"));
1323        let err = frame
1324            .check_query_privilege(&rt, &expr)
1325            .expect_err("denied via frame's coarse privilege gate");
1326        let msg = format!("{err}");
1327        assert!(
1328            msg.contains("permission denied") && msg.contains("Write"),
1329            "expected frame-level Write deny, got: {msg}"
1330        );
1331
1332        reset_thread_locals();
1333    }
1334
1335    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
1336    /// wired into the SELECT path and produces observable results
1337    /// byte-identical to the per-request-allocation baseline.
1338    ///
1339    /// A table with more rows than the streaming high-water mark
1340    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
1341    /// path to assemble many chunks, each leasing/recycling the frame
1342    /// arena's single chunk buffer. Driving it through `execute_query`
1343    /// (which builds a `StatementExecutionFrame` and lends its arena)
1344    /// must return every inserted row, in order — exactly what the
1345    /// allocate-per-chunk path returned. A bug in the arena wiring
1346    /// (dropped rows, bled rows, mis-ordering) would surface here.
1347    #[test]
1348    fn large_select_through_frame_arena_returns_all_rows_in_order() {
1349        reset_thread_locals();
1350        let rt = fresh_runtime();
1351        rt.execute_query("CREATE TABLE big (id INT)")
1352            .expect("create table");
1353
1354        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
1355        // spans multiple chunks and the arena buffer is reused.
1356        const N: usize = 2_500;
1357        let values = (0..N)
1358            .map(|i| format!("({i})"))
1359            .collect::<Vec<_>>()
1360            .join(", ");
1361        rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
1362            .expect("insert rows");
1363
1364        let result = rt
1365            .execute_query("SELECT id FROM big ORDER BY id")
1366            .expect("large SELECT executes through the frame arena path");
1367        assert_eq!(result.statement_type, "select");
1368        assert_eq!(
1369            result.result.records.len(),
1370            N,
1371            "every inserted row streams back through the arena-backed channel"
1372        );
1373        for (i, record) in result.result.records.iter().enumerate() {
1374            assert_eq!(
1375                record.get("id"),
1376                Some(&crate::storage::schema::Value::Integer(i as i64)),
1377                "row {i} is byte-identical to the per-request-allocation baseline"
1378            );
1379        }
1380
1381        reset_thread_locals();
1382    }
1383
1384    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1385    /// control statement carries `LockIntent::None` and the
1386    /// `acquire_intent_locks` path returns `None` without consulting
1387    /// `intent_lock_modes_for`. Removing the method (or its consult
1388    /// site in `acquire_intent_locks`) would force the lock-mode
1389    /// helper to walk a fabricated parsed expression to reach the
1390    /// same conclusion — but the assertion that no guard is allocated
1391    /// for a `BEGIN` frame would still hold, so we additionally pin
1392    /// the classifier mapping above to make the deletion observable.
1393    #[test]
1394    fn control_statement_skips_intent_locks_via_frame() {
1395        reset_thread_locals();
1396        let rt = fresh_runtime();
1397
1398        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1399        let f: &dyn ReadFrame = &frame;
1400        assert_eq!(f.lock_intent(), LockIntent::None);
1401
1402        // Drive `acquire_intent_locks` against a fabricated SELECT
1403        // expression that WOULD normally yield `(IS, IS)`; the frame's
1404        // `lock_intent() == None` short-circuit must still suppress
1405        // the guard.
1406        use crate::storage::query::ast::{QueryExpr, TableQuery};
1407        let expr = QueryExpr::Table(TableQuery::new("t"));
1408        let guard = frame.acquire_intent_locks(&rt, &expr);
1409        assert!(
1410            guard.is_none(),
1411            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1412        );
1413    }
1414}