Skip to main content

reddb_server/runtime/
statement_frame.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use super::impl_core::{
5    collections_referenced, current_auth_identity, current_connection_id, current_tenant,
6    has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
7    query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
8    SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
9};
10use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
11use crate::api::{RedDBError, RedDBResult};
12use crate::auth::Role;
13use crate::storage::query::ast::QueryExpr;
14use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
15use crate::storage::transaction::snapshot::{Snapshot, Xid};
16
/// Coarse privilege classification for a statement, computed once at
/// frame-build time from the SQL text. Mirrors the three-role auth
/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
/// answer "can this identity run this statement?" without re-walking
/// the parsed `QueryExpr` at every call site.
///
/// `None` means the statement does not touch the coarse privilege gate
/// at all (transaction control, SET). Such statements must remain
/// runnable under any authenticated identity.
///
/// NOTE(review): the statement lists on the variants below describe
/// the intended classes. The variant actually attached to a frame is
/// decided by the first-word classifier in this file
/// (`statement_kind` / `classify_privilege`): it maps SHOW, EXPLAIN
/// and DESCRIBE to `Read`, only GRANT / REVOKE to `Admin`, and every
/// first keyword it does not recognise to `None`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Privilege {
    /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
    /// any role from `Role::Read` upward.
    Read,
    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
    /// at least `Role::Write`.
    Write,
    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
    /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
    /// The first-word classifier only reaches this variant for
    /// GRANT / REVOKE; the remaining statements are expected to be
    /// upgraded by finer checks at the call site.
    Admin,
    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
    /// ROLLBACK, SET). Always permitted for any authenticated
    /// identity. SHOW is intended to land here only when it exposes
    /// no data; the first-word classifier currently reports every
    /// SHOW as `Read`.
    None,
}
43
44impl Privilege {
45    /// `true` iff `role` is sufficient to execute a statement carrying
46    /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
47    /// Admin` containment used by the auth fallback path.
48    pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
49        match self {
50            Self::None => true,
51            Self::Read => role.can_read(),
52            Self::Write => role.can_write(),
53            Self::Admin => role.can_admin(),
54        }
55    }
56}
57
/// Coarse lock intent for a statement, computed once at frame-build
/// time. Maps onto the storage-layer's `LockMode` matrix downstream
/// but stays decoupled here so the runtime can answer "does this
/// statement need the lock manager at all?" without a `use storage::`
/// at every call site.
///
/// The variant is derived from the statement's first keyword by
/// `classify_lock_intent`; the precise lock modes still come from
/// `intent_lock_modes_for` on the parsed expression.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LockIntent {
    /// No collection-level lock needed (transaction control, SET).
    /// The lock-acquisition path can short-circuit.
    ///
    /// NOTE(review): SHOW and EXPLAIN are conceptually lock-free too,
    /// but the first-word classifier in this file reports them as
    /// read statements, so they surface as `Shared`.
    None,
    /// Reader-style intent: SELECT, joins, graph / queue / search
    /// reads. Maps to `(IS, IS)` at the storage layer.
    Shared,
    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
    /// `Exclusive` at this granularity — call sites that need the
    /// finer distinction still consult `intent_lock_modes_for`.
    Exclusive,
}
77
/// Small, stable interface that *represents* a read statement's
/// execution context. Every read caller that needs to know "under
/// what scope / identity / snapshot am I running, and is there an
/// AS OF floor in effect?" consults this trait — never the
/// underlying thread-locals or runtime fields directly.
///
/// The deletion test: removing this trait would force the four
/// concerns it exposes back into ad-hoc lookups at every read
/// callsite (`current_tenant()`, `current_auth_identity()`,
/// `capture_current_snapshot()`, AS OF re-parsing). The trait
/// concentrates them in one place so future changes (per-statement
/// logging, audit, scope policy) have a single seam to extend.
pub(crate) trait ReadFrame {
    /// Effective tenant scope for the statement after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
    /// "no tenant bound" (RLS deny-default applies).
    fn effective_scope(&self) -> Option<&str>;

    /// Authenticated identity observed at frame-build time, if any.
    /// Returns `(username, role)` so callers can render audit lines
    /// or feed RLS policy lookups without re-reading thread-locals.
    fn identity(&self) -> Option<(&str, Role)>;

    /// MVCC snapshot the statement reads against. For autocommit
    /// this is a fresh snapshot; inside an active transaction it
    /// is the txn's snapshot; under AS OF it is the resolved
    /// historical xid.
    fn snapshot(&self) -> &Snapshot;

    /// AS OF xid floor when AS OF was applied for this statement,
    /// `None` for live reads. Useful for downstream callers that
    /// want to gate behaviour on historical-read mode without
    /// re-parsing the query.
    fn as_of_floor(&self) -> Option<Xid>;

    /// Stable result-cache key for the statement (already mixes
    /// effective tenant + identity).
    fn cache_key(&self) -> &str;

    /// Whether the statement is safe to serve from / populate the
    /// result cache. Combines two underlying signals:
    ///
    ///   * the query does not call a volatile builtin (e.g. `NOW()`,
    ///     `RANDOM()`, `UUID()`), which would change between calls,
    ///   * the connection is not inside an active transaction with
    ///     uncommitted writes that other readers shouldn't observe.
    ///
    /// `StatementExecutionFrame` additionally treats ASK statements
    /// as volatile, so they are never cached either.
    ///
    /// SELECT cache callsites (read + write) consult this method
    /// instead of re-deriving safety from globals or poking the
    /// frame's private fields. Removing it would force every cache
    /// callsite to re-run `query_has_volatile_builtin` plus
    /// `result_cache_safe(conn_id)` inline.
    fn should_cache_result(&self) -> bool;

    /// Coarse privilege class the statement requires, computed once
    /// at frame-build time from the SQL prefix. Read/write dispatch
    /// sites consult this instead of re-classifying the parsed
    /// `QueryExpr` inline at every callsite.
    ///
    /// Removing this method would force every privilege gate to
    /// recompute the (action, resource) classification from the
    /// parsed expression and re-check the role hierarchy inline.
    fn required_privilege(&self) -> Privilege;

    /// Coarse collection-level lock intent the statement implies.
    /// `LockIntent::None` lets the lock-acquisition path
    /// short-circuit without touching the lock manager.
    ///
    /// Removing this method would force the lock-acquisition path
    /// to always invoke `intent_lock_modes_for` (which itself walks
    /// the parsed expression) even for transaction-control / SET
    /// statements that need no collection lock at all.
    fn lock_intent(&self) -> LockIntent;

    /// Set of collection ids the calling identity is allowed to
    /// observe under the active `(tenant, role)` scope. Computed once
    /// at frame-build time via the `AuthStore` visible-collections
    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
    /// before any similarity score is computed (issue #119).
    ///
    /// `None` means the frame was built without an auth store wired
    /// (or without a captured identity) — embedded / single-tenant
    /// tests run that way. AI search call sites refuse to proceed
    /// with `None`, which is the deny-default the issue requires;
    /// pure SELECT paths fall back to the existing per-row RLS gate.
    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
}
166
/// Cheap first-word classification of a SQL statement, used at
/// frame-build time to derive `Privilege` + `LockIntent` without
/// re-parsing the query. Matches the keywords that the legacy
/// inline checks in `RedDBRuntime::check_query_privilege` and
/// `intent_lock_modes_for` already key on.
///
/// Any run of leading whitespace, `--` line comments and `/* ... */`
/// block comments is skipped before the first token is read. The
/// token ends at the first whitespace, `(` or `;`, and is compared
/// case-insensitively against the keyword tables.
///
/// Returns one of `"read"`, `"write"`, `"ddl"`, `"admin"`,
/// `"control"` or `"unknown"`. A statement that consists only of
/// comments (including an unterminated block comment) or whose first
/// token is not a recognised keyword yields `"unknown"`.
fn statement_kind(query: &str) -> &'static str {
    // Peel leading comments one at a time so stacked comments such as
    // `-- a\n/* b */ SELECT ...` still expose the real first keyword.
    let mut rest = query.trim_start();
    loop {
        if let Some(after) = rest.strip_prefix("--") {
            // A line comment without a trailing newline swallows the
            // remainder of the input.
            rest = after
                .split_once('\n')
                .map_or("", |(_, tail)| tail)
                .trim_start();
        } else if let Some(after) = rest.strip_prefix("/*") {
            // Block comments are treated as non-nesting; an
            // unterminated one swallows the remainder of the input.
            rest = after
                .split_once("*/")
                .map_or("", |(_, tail)| tail)
                .trim_start();
        } else {
            break;
        }
    }

    let first = rest
        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
        .next()
        .unwrap_or("");

    // ASCII-uppercase compare without allocating: SQL keywords are
    // ASCII and all shorter than the buffer, so a longer (truncated)
    // token can never match any arm below.
    let mut buf = [0u8; 16];
    let bytes = first.as_bytes();
    let n = bytes.len().min(buf.len());
    buf[..n].copy_from_slice(&bytes[..n]);
    buf[..n].make_ascii_uppercase();

    match &buf[..n] {
        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" => "read",
        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
        b"GRANT" | b"REVOKE" => "admin",
        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
        _ => "unknown",
    }
}
205
206fn classify_privilege(query: &str) -> Privilege {
207    match statement_kind(query) {
208        "read" => Privilege::Read,
209        "write" => Privilege::Write,
210        // DDL is gated at `Role::Write` in the legacy fallback (see
211        // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
212        // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
213        // GRANT / REVOKE upgrade to Admin via finer checks at the call
214        // site — the frame surfaces only the coarse class.
215        "ddl" => Privilege::Write,
216        "admin" => Privilege::Admin,
217        _ => Privilege::None,
218    }
219}
220
221fn classify_lock_intent(query: &str) -> LockIntent {
222    match statement_kind(query) {
223        "read" => LockIntent::Shared,
224        "write" | "ddl" => LockIntent::Exclusive,
225        _ => LockIntent::None,
226    }
227}
228
/// Per-statement execution context captured once by
/// `StatementExecutionFrame::build`. It bundles the snapshot, cache
/// metadata, tenant / identity view and coarse statement
/// classification so later stages read them from the frame instead of
/// re-querying the runtime or thread-locals.
pub(super) struct StatementExecutionFrame {
    /// Entry for the current connection in the runtime's
    /// `tx_local_tenants` map, cloned at frame-build time; the outer
    /// `None` means the connection has no entry. Handed to
    /// `TxLocalTenantGuard::install` by `install`.
    tx_local_tenant: Option<Option<String>>,
    /// Snapshot returned by `runtime.statement_snapshot(query)`.
    snapshot: Snapshot,
    /// Result of `runtime.own_transaction_xids(conn_id)` at build
    /// time. NOTE(review): presumably the xids written by this
    /// connection's open transaction — confirm against the runtime.
    own_xids: HashSet<Xid>,
    /// Result-cache key produced by `result_cache_key(query)`.
    cache_key: String,
    /// True when the query calls a volatile builtin or is an ASK
    /// statement; such statements are never served from or written
    /// to the result cache.
    is_volatile_query: bool,
    /// Value of `runtime.result_cache_safe(conn_id)` at build time.
    cache_safe: bool,
    /// Effective tenant captured at frame-build time after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
    /// so the `ReadFrame` interface can return a borrow without
    /// re-touching the thread-local stack.
    effective_scope: Option<String>,
    /// Auth identity captured at frame-build time. `None` for
    /// embedded / anonymous callers.
    identity: Option<(String, Role)>,
    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
    /// for live reads.
    as_of_floor: Option<Xid>,
    /// True when the statement snapshot can require tuple versions that
    /// current secondary indexes no longer contain. Set when an AS OF
    /// floor is present or the connection has an entry in
    /// `tx_contexts`.
    requires_index_fallback: bool,
    /// Privilege class required by the statement, derived from the
    /// SQL text at frame-build time. Read/write dispatch sites
    /// consult this instead of re-classifying the parsed expression.
    required_privilege: Privilege,
    /// Collection-level lock intent the statement implies. The
    /// lock-acquisition path short-circuits when this is `None`.
    lock_intent: LockIntent,
    /// Set of collection ids the active `(tenant, role)` scope is
    /// allowed to observe. Computed at frame-build time via the
    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
    /// scoring (issue #119). `None` when no auth store is wired
    /// (embedded test mode) — AI search refuses on `None`.
    visible_collections: Option<HashSet<String>>,
}
265
/// Bundle of the four guards created by
/// `StatementExecutionFrame::install`. The fields are never read
/// (hence the underscore prefixes); the struct exists to keep the
/// guards alive for as long as the caller holds it.
///
/// NOTE(review): each guard presumably restores the previous
/// thread-local state when dropped — confirm in `impl_core`.
pub(super) struct StatementFrameGuards {
    // Installed from the frame's `tx_local_tenant`.
    _tx_local_guard: TxLocalTenantGuard,
    // Installed from a clone of the runtime's `db` handle.
    _config_snapshot_guard: ConfigSnapshotGuard,
    // Installed from a clone of the runtime's current `auth_store`.
    _secret_store_guard: SecretStoreGuard,
    // Installed from the frame's snapshot, own xids and index-fallback flag.
    _snapshot_guard: CurrentSnapshotGuard,
}
272
/// Output of `StatementExecutionFrame::prepare_statement`: the
/// expression to execute plus the query mode detected for the text.
pub(super) struct PreparedStatement {
    /// Parsed (or plan-cache-rebound) expression, after view
    /// references have been rewritten.
    pub(super) expr: QueryExpr,
    /// Mode reported by `detect_mode` for the statement text; never
    /// `QueryMode::Unknown`.
    pub(super) mode: QueryMode,
}
277
278impl StatementExecutionFrame {
279    pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
280        let conn_id = current_connection_id();
281        let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
282        let own_xids = runtime.own_transaction_xids(conn_id);
283        let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
284        let requires_index_fallback =
285            as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
286        let cache_key = result_cache_key(query);
287        let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
288        let cache_safe = runtime.result_cache_safe(conn_id);
289        // Capture identity + effective scope under the same
290        // thread-local view that the cache key was built from, so
291        // the Interface and the cache key agree on what "this
292        // statement" means.
293        let effective_scope = current_tenant();
294        let identity = current_auth_identity();
295
296        // Coarse classification of the statement, computed once from
297        // the SQL prefix so downstream callers don't re-derive it
298        // from the parsed `QueryExpr` at every privilege / lock site.
299        let required_privilege = classify_privilege(query);
300        let lock_intent = classify_lock_intent(query);
301
302        // Issue #119: resolve the visible-collections set for the
303        // active (tenant, role) scope. Only meaningful when an auth
304        // store is wired *and* an identity was captured — embedded
305        // anonymous callers fall back to `None`, and AI search call
306        // sites refuse on `None`.
307        let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
308        {
309            (Some(store), Some((principal, role))) => {
310                let collections = runtime.inner.db.store().list_collections();
311                Some(store.visible_collections_for_scope(
312                    effective_scope.as_deref(),
313                    *role,
314                    principal,
315                    &collections,
316                ))
317            }
318            _ => None,
319        };
320
321        Ok(Self {
322            tx_local_tenant,
323            snapshot,
324            own_xids,
325            cache_key,
326            is_volatile_query,
327            cache_safe,
328            effective_scope,
329            identity,
330            as_of_floor,
331            requires_index_fallback,
332            required_privilege,
333            lock_intent,
334            visible_collections,
335        })
336    }
337
338    pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
339        StatementFrameGuards {
340            _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
341            _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
342            _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
343            _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
344                snapshot: self.snapshot.clone(),
345                manager: Arc::clone(&runtime.inner.snapshot_manager),
346                own_xids: self.own_xids.clone(),
347                requires_index_fallback: self.requires_index_fallback,
348            }),
349        }
350    }
351
352    pub(super) fn cache_key(&self) -> &str {
353        &self.cache_key
354    }
355
356    pub(super) fn can_read_result_cache(&self) -> bool {
357        // Delegates to the `ReadFrame` Interface so the volatile +
358        // active-tx safety decision lives in exactly one place.
359        <Self as ReadFrame>::should_cache_result(self)
360    }
361
362    pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
363        // Cache-safety (volatile builtin, active-tx writes) comes from
364        // the Interface; the rest are write-side payload heuristics
365        // (statement shape, result size) that aren't part of the
366        // safety contract.
367        <Self as ReadFrame>::should_cache_result(self)
368            && result.statement_type == "select"
369            && result.engine != "vault"
370            && result.result.pre_serialized_json.is_none()
371            && result.result.records.len() <= 5
372    }
373
374    pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
375        if self.can_read_result_cache() {
376            runtime.get_result_cache_entry(self.cache_key())
377        } else {
378            None
379        }
380    }
381
382    pub(super) fn write_result_cache(
383        &self,
384        runtime: &RedDBRuntime,
385        result: &RuntimeQueryResult,
386        scopes: HashSet<String>,
387    ) {
388        if self.should_write_result_cache(result) {
389            runtime.put_result_cache_entry(
390                self.cache_key(),
391                RuntimeResultCacheEntry {
392                    result: result.clone(),
393                    cached_at: std::time::Instant::now(),
394                    scopes,
395                },
396            );
397        }
398    }
399
400    pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
401        // Detected via cheap prefix check so non-CTE queries skip the
402        // full parse here. CTE-bearing queries bypass the plan cache
403        // and result cache (rare workload — perf optimization is a
404        // follow-up). Inlining substitutes every CTE reference with
405        // its body as a subquery in FROM, after which the existing
406        // subquery-in-FROM machinery handles execution. Recursive
407        // CTEs are rejected explicitly until fixpoint execution wires
408        // through the runtime.
409        if !has_with_prefix(query) {
410            return Ok(None);
411        }
412        let parsed = crate::storage::query::parser::parse(query)
413            .map_err(|err| RedDBError::Query(err.to_string()))?;
414        if parsed.with_clause.is_some() {
415            let rewritten = crate::storage::query::executors::inline_ctes(parsed)
416                .map_err(|err| RedDBError::Query(err.to_string()))?;
417            return Ok(Some(rewritten));
418        }
419        // No WITH after parse (the prefix matched something else like
420        // `WITHIN` that already routed elsewhere) — fall through to
421        // the normal path with the original query.
422        Ok(None)
423    }
424
    /// Turns the statement text into a `PreparedStatement`, reusing a
    /// cached plan when one is available.
    ///
    /// Steps: detect the query mode, obtain the expression (fresh
    /// parse for INSERT, plan-cache lookup with literal rebinding for
    /// everything else, parse + cache insert on a miss), then rewrite
    /// registered view references.
    ///
    /// # Errors
    ///
    /// Returns `RedDBError::Query` when the query mode cannot be
    /// detected or when parsing fails.
    pub(super) fn prepare_statement(
        &self,
        runtime: &RedDBRuntime,
        query: &str,
    ) -> RedDBResult<PreparedStatement> {
        let mode = detect_mode(query);
        if matches!(mode, QueryMode::Unknown) {
            return Err(RedDBError::Query("unable to detect query mode".to_string()));
        }

        // ── Plan cache: keyed by normalised statement shape ──
        //
        // The plan cache applies to statements whose shape can be
        // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
        // reuses the same plan across thousands of varying literals).
        // INSERT bypasses the cache entirely — its shape changes per
        // column set, so caching it would only burn CPU on eviction
        // bookkeeping with no hits, and bulk paths don't go through
        // here anyway.
        let first_word = query
            .trim()
            .split_ascii_whitespace()
            .next()
            .unwrap_or("")
            .to_ascii_uppercase();
        let is_insert = first_word == "INSERT";

        // Fused normalize+extract: one byte-scan produces both the
        // cache_key AND the literal bindings. Saves a second Lexer
        // pass over the query text on every cache hit — dominant
        // cost on tight UPDATE loops that hit the same shape
        // thousands of times with varying literals.
        // INSERT never consults the cache, so it gets empty placeholders.
        let (cache_key, prescan_binds) = if is_insert {
            (String::new(), Vec::new())
        } else {
            crate::storage::query::planner::cache_key::normalize_and_extract(query)
        };

        let expr = if is_insert {
            // Bypass plan cache for INSERT — shape varies per query.
            parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
        } else {
            // ── Hot path: read lock only (no writer serialization on cache hits) ──
            //
            // peek() is a non-mutating probe: no LRU promotion, no touch().
            // This lets concurrent readers proceed without blocking each other.
            // On hit we bind literals if needed and return immediately.
            // Only on miss do we drop to a write lock to parse + insert.
            //
            // The block scope releases the read lock before any binding
            // or parsing happens below.
            let hit = {
                let plan_cache = runtime.inner.query_cache.read();
                plan_cache.peek(&cache_key).map(|cached| {
                    let parameter_count = cached.parameter_count;
                    let optimized = cached.plan.optimized.clone();
                    let exact_query = cached.exact_query.clone();
                    (parameter_count, optimized, exact_query)
                })
            };

            if let Some((parameter_count, optimized, exact_query)) = hit {
                if parameter_count > 0 {
                    // Shape hit: use the binds extracted during normalise.
                    let shape_binds = prescan_binds.clone();
                    if let Some(bound) =
                        crate::storage::query::planner::shape::bind_parameterized_query(
                            &optimized,
                            &shape_binds,
                            parameter_count,
                        )
                    {
                        bound
                    } else if exact_query.as_deref() == Some(query) {
                        // Bind failed but exact query matches — use as-is.
                        optimized
                    } else {
                        // Bind failed and literals differ: re-parse fresh.
                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
                    }
                } else {
                    // No parameters means either there truly are no literals,
                    // or this statement type does not participate in shape
                    // parameterization (for example graph/queue commands).
                    // Reusing a normalized-cache hit across a different exact
                    // query can therefore leak stale literals into execution.
                    if exact_query.as_deref() == Some(query) {
                        optimized
                    } else {
                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
                    }
                }
            } else {
                // Cache miss — parse, parameterize, store.
                let parsed =
                    parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
                // When the expression cannot be parameterized, the
                // parsed form itself is cached with zero parameters.
                let (cached_expr, parameter_count) = if let Some(prepared) =
                    crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
                {
                    (prepared.shape, prepared.parameter_count)
                } else {
                    (parsed.clone(), 0)
                };
                // Write lock is scoped to the insert only.
                {
                    let mut pc = runtime.inner.query_cache.write();
                    let plan = crate::storage::query::planner::QueryPlan::new(
                        parsed.clone(),
                        cached_expr,
                        Default::default(),
                    );
                    pc.insert(
                        cache_key.clone(),
                        crate::storage::query::planner::CachedPlan::new(plan)
                            .with_shape_key(cache_key.clone())
                            .with_exact_query(query.to_string())
                            .with_parameter_count(parameter_count),
                    );
                }
                parsed
            }
        };

        // Phase 5 PG parity: substitute any registered view name that
        // appears in the expression with its stored body. Runs after
        // parse and before dispatch so the SQL entrypoint gets the
        // same view resolution `execute_query_expr` already does.
        let expr = runtime.rewrite_view_refs(expr);

        Ok(PreparedStatement { expr, mode })
    }
553
554    pub(super) fn check_query_privilege(
555        &self,
556        runtime: &RedDBRuntime,
557        expr: &QueryExpr,
558    ) -> RedDBResult<()> {
559        // Frame-level coarse gate. We consult `required_privilege()`
560        // (computed once at frame-build) against the captured identity
561        // before the deep grant engine walks the parsed expression.
562        // The coarse gate cannot ALLOW anything the grant engine would
563        // deny — it only short-circuits the obvious "Role::Read tries
564        // INSERT" case so a downstream caller never has to redo this
565        // check inline. `Privilege::None` (transaction control / SET /
566        // SHOW) flows through unchanged; the grant engine treats those
567        // as bypass too.
568        if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
569            let needed = <Self as ReadFrame>::required_privilege(self);
570            if !needed.is_satisfied_by(role) {
571                // Issue #205 — when the deep grant engine *also*
572                // denies, we treat this as an ordinary permission
573                // failure. But when an Admin-only statement reaches
574                // this gate without an auth_store wired (so the deep
575                // engine can't double-check), the coarse rejection is
576                // the only line of defence — emit an OperatorEvent so
577                // the operator notices an Admin-class statement was
578                // attempted with insufficient role.
579                if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
580                    crate::telemetry::operator_event::OperatorEvent::AuthBypass {
581                        principal: username.to_string(),
582                        resource: format!("statement requiring {needed:?}"),
583                        detail: format!(
584                            "auth_store not wired; coarse gate is sole defence (role={role:?})"
585                        ),
586                    }
587                    .emit_global();
588                }
589                return Err(RedDBError::Query(format!(
590                    "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
591                )));
592            }
593        }
594        runtime
595            .check_query_privilege(expr)
596            .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
597    }
598
599    pub(super) fn prepare_dispatch(
600        &self,
601        runtime: &RedDBRuntime,
602        expr: &QueryExpr,
603    ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
604        runtime.validate_model_operations_before_auth(expr)?;
605        self.check_query_privilege(runtime, expr)?;
606        Ok(self.acquire_intent_locks(runtime, expr))
607    }
608
609    pub(super) fn acquire_intent_locks(
610        &self,
611        runtime: &RedDBRuntime,
612        expr: &QueryExpr,
613    ) -> Option<crate::runtime::locking::LockerGuard> {
614        if !runtime.config_bool("concurrency.locking.enabled", true) {
615            return None;
616        }
617        // Frame-level short-circuit: if the statement carries no lock
618        // intent (transaction control, SET, SHOW), skip the lock
619        // manager entirely instead of letting `intent_lock_modes_for`
620        // walk the parsed expression to reach the same conclusion.
621        if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
622            return None;
623        }
624        intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
625            let mut guard =
626                crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
627            let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
628            for collection in collections_referenced(expr) {
629                let _ = guard.acquire(
630                    crate::runtime::locking::Resource::Collection(collection),
631                    coll_mode,
632                );
633            }
634            guard
635        })
636    }
637}
638
639impl ReadFrame for StatementExecutionFrame {
640    fn effective_scope(&self) -> Option<&str> {
641        self.effective_scope.as_deref()
642    }
643
644    fn identity(&self) -> Option<(&str, Role)> {
645        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
646    }
647
648    fn snapshot(&self) -> &Snapshot {
649        &self.snapshot
650    }
651
652    fn as_of_floor(&self) -> Option<Xid> {
653        self.as_of_floor
654    }
655
656    fn cache_key(&self) -> &str {
657        &self.cache_key
658    }
659
660    fn should_cache_result(&self) -> bool {
661        !self.is_volatile_query && self.cache_safe
662    }
663
664    fn required_privilege(&self) -> Privilege {
665        self.required_privilege
666    }
667
668    fn lock_intent(&self) -> LockIntent {
669        self.lock_intent
670    }
671
672    fn visible_collections(&self) -> Option<&HashSet<String>> {
673        self.visible_collections.as_ref()
674    }
675}
676
/// Lightweight `ReadFrame` carrier used by AI command entry points
/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
///
/// Issue #119 calls this struct `EffectiveScope`. It bundles the
/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
/// instead of re-reading thread-locals at every call site.
///
/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
/// from the per-statement thread-locals (identical to how
/// `StatementExecutionFrame::build` derives them) and resolves
/// `visible_collections` via the `AuthStore` cache.
pub struct EffectiveScope {
    /// Effective tenant for the command; `None` when no tenant is bound.
    pub(crate) tenant: Option<String>,
    /// `(username, role)` of the caller; `None` when no identity was
    /// captured.
    pub(crate) identity: Option<(String, Role)>,
    /// MVCC snapshot the command reads against.
    pub(crate) snapshot: Snapshot,
    /// Collections the caller may observe; `None` when the set could
    /// not be resolved (AI search call sites refuse on `None`).
    pub(crate) visible_collections: Option<HashSet<String>>,
}
695
696impl EffectiveScope {
697    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
698    /// to gate LLM-backed NER calls behind `ai:ner:read`.
699    ///
700    /// Placeholder for now: always returns `false`. The auth engine's
701    /// capability matrix is future work; until it lands, every routed
702    /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
703    /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
704    /// Documented in code so the wire-up is a one-line change once
705    /// the auth engine learns capabilities.
706    pub fn has_capability(&self, _capability: &str) -> bool {
707        false
708    }
709}
710
711impl ReadFrame for EffectiveScope {
712    fn effective_scope(&self) -> Option<&str> {
713        self.tenant.as_deref()
714    }
715    fn identity(&self) -> Option<(&str, Role)> {
716        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
717    }
718    fn snapshot(&self) -> &Snapshot {
719        &self.snapshot
720    }
721    fn as_of_floor(&self) -> Option<Xid> {
722        None
723    }
724    fn cache_key(&self) -> &str {
725        ""
726    }
727    fn should_cache_result(&self) -> bool {
728        false
729    }
730    fn required_privilege(&self) -> Privilege {
731        Privilege::Read
732    }
733    fn lock_intent(&self) -> LockIntent {
734        LockIntent::Shared
735    }
736    fn visible_collections(&self) -> Option<&HashSet<String>> {
737        self.visible_collections.as_ref()
738    }
739}
740
741impl RedDBRuntime {
742    /// Build the AI command `EffectiveScope` from the current
743    /// statement thread-locals + auth store.
744    ///
745    /// Returns `None` for embedded callers (no auth store, no
746    /// identity) — `AuthorizedSearch` treats `None` as deny-default.
747    pub(crate) fn ai_scope(&self) -> EffectiveScope {
748        let tenant = super::impl_core::current_tenant();
749        let identity = super::impl_core::current_auth_identity();
750        let snapshot = self.current_snapshot();
751        let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
752            (Some(store), Some((principal, role))) => {
753                let collections = self.inner.db.store().list_collections();
754                Some(store.visible_collections_for_scope(
755                    tenant.as_deref(),
756                    *role,
757                    principal,
758                    &collections,
759                ))
760            }
761            _ => None,
762        };
763        EffectiveScope {
764            tenant,
765            identity,
766            snapshot,
767            visible_collections,
768        }
769    }
770}
771
/// Fixtures for tests that need a `ReadFrame` but should not boot a
/// runtime. Gated on `cfg(test)` and `pub(crate)`, so the module is
/// reachable from other modules of this crate in test builds only.
#[cfg(test)]
pub(crate) mod test_support {
    use super::{LockIntent, Privilege, ReadFrame};
    use crate::auth::Role;
    use crate::storage::transaction::snapshot::{Snapshot, Xid};
    use std::collections::HashSet;

    /// `ReadFrame` implementation whose fields are set by hand.
    /// `authorized_search` tests use it to check deny-default and
    /// scope-trim behaviour without building a real frame.
    pub(crate) struct FakeReadFrame {
        pub tenant: Option<String>,
        pub identity: Option<(String, Role)>,
        pub snapshot: Snapshot,
        pub visible: Option<HashSet<String>>,
    }

    /// Snapshot at xid 0 with an empty in-progress set, shared by
    /// both constructors.
    fn blank_snapshot() -> Snapshot {
        Snapshot {
            xid: 0,
            in_progress: HashSet::new(),
        }
    }

    impl FakeReadFrame {
        /// Frame with no tenant, no identity and no visible set.
        pub(crate) fn without_scope() -> Self {
            let snapshot = blank_snapshot();
            Self {
                tenant: None,
                identity: None,
                snapshot,
                visible: None,
            }
        }

        /// Frame for tenant `acme` / user `alice` (`Role::Read`) that
        /// exposes exactly the given `visible` set.
        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
            let tenant = Some(String::from("acme"));
            let identity = Some((String::from("alice"), Role::Read));
            Self {
                tenant,
                identity,
                snapshot: blank_snapshot(),
                visible: Some(visible),
            }
        }
    }

    impl ReadFrame for FakeReadFrame {
        fn effective_scope(&self) -> Option<&str> {
            self.tenant.as_deref()
        }

        fn identity(&self) -> Option<(&str, Role)> {
            let (user, role) = self.identity.as_ref()?;
            Some((user.as_str(), *role))
        }

        fn snapshot(&self) -> &Snapshot {
            &self.snapshot
        }

        // The remaining answers are fixed: no AS OF floor, empty cache
        // key, never cached, `Read` privilege, `Shared` lock intent.
        fn as_of_floor(&self) -> Option<Xid> {
            None
        }

        fn cache_key(&self) -> &str {
            ""
        }

        fn should_cache_result(&self) -> bool {
            false
        }

        fn required_privilege(&self) -> Privilege {
            Privilege::Read
        }

        fn lock_intent(&self) -> LockIntent {
            LockIntent::Shared
        }

        fn visible_collections(&self) -> Option<&HashSet<String>> {
            self.visible.as_ref()
        }
    }
}
848
849impl RedDBRuntime {
850    fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
851        let mut set = HashSet::new();
852        if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
853            set.insert(ctx.xid);
854            for (_, sub) in &ctx.savepoints {
855                set.insert(*sub);
856            }
857            for sub in &ctx.released_sub_xids {
858                set.insert(*sub);
859            }
860        }
861        set
862    }
863
864    /// Resolve the snapshot for the current statement, returning
865    /// the snapshot itself and (when AS OF is in effect) the
866    /// resolved xid floor. The floor is the same xid carried inside
867    /// `Snapshot.xid` for AS OF reads — exposing it separately lets
868    /// the `ReadFrame` Interface tell "live read" from "historical
869    /// read" without inferring from `in_progress.is_empty()`.
870    fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
871        match peek_top_level_as_of_with_table(query) {
872            Some((spec, Some(table))) => {
873                if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
874                    return Err(RedDBError::InvalidConfig(format!(
875                        "AS OF requires a versioned collection — \
876                         `{table}` has not opted in. \
877                         Call vcs.set_versioned(\"{table}\", true) first."
878                    )));
879                }
880                let xid = self.vcs_resolve_as_of(spec)?;
881                Ok((
882                    Snapshot {
883                        xid,
884                        in_progress: HashSet::new(),
885                    },
886                    Some(xid),
887                ))
888            }
889            Some((spec, None)) => {
890                let xid = self.vcs_resolve_as_of(spec)?;
891                Ok((
892                    Snapshot {
893                        xid,
894                        in_progress: HashSet::new(),
895                    },
896                    Some(xid),
897                ))
898            }
899            None => Ok((self.current_snapshot(), None)),
900        }
901    }
902
903    fn result_cache_safe(&self, conn_id: u64) -> bool {
904        let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
905        let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
906        !has_active_xids && !in_own_tx
907    }
908}
909
910fn result_cache_key(query: &str) -> String {
911    let tenant = current_tenant().unwrap_or_default();
912    let auth = current_auth_identity()
913        .map(|(user, role)| format!("{}|{:?}", user, role))
914        .unwrap_or_default();
915    if tenant.is_empty() && auth.is_empty() {
916        query.to_string()
917    } else {
918        format!("{query}\u{001e}{tenant}\u{001e}{auth}")
919    }
920}
921
922#[cfg(test)]
923mod tests {
924    use super::*;
925    use crate::api::RedDBOptions;
926    use crate::runtime::impl_core::{
927        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
928        set_current_tenant,
929    };
930    use crate::runtime::RedDBRuntime;
931
932    fn fresh_runtime() -> RedDBRuntime {
933        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
934    }
935
936    /// Ensure thread-local state from a prior test can't leak into
937    /// the next one — tests in the same binary share the thread.
938    fn reset_thread_locals() {
939        clear_current_tenant();
940        clear_current_auth_identity();
941    }
942
943    #[test]
944    fn autocommit_select_takes_live_snapshot() {
945        reset_thread_locals();
946        let rt = fresh_runtime();
947        let frame =
948            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
949
950        // Live reads: no AS OF floor, snapshot bounded by the
951        // manager's `peek_next_xid` so committed tuples are visible.
952        let f: &dyn ReadFrame = &frame;
953        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
954        assert!(
955            f.snapshot().xid >= 1,
956            "autocommit snapshot xid is bounded by peek_next_xid"
957        );
958    }
959
960    #[test]
961    fn frame_captures_identity_and_scope() {
962        reset_thread_locals();
963        set_current_tenant("acme".to_string());
964        set_current_auth_identity("alice".to_string(), Role::Write);
965
966        let rt = fresh_runtime();
967        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
968        let f: &dyn ReadFrame = &frame;
969
970        assert_eq!(f.effective_scope(), Some("acme"));
971        let id = f.identity().expect("identity captured");
972        assert_eq!(id.0, "alice");
973        assert!(matches!(id.1, Role::Write));
974
975        // Cache key mixes scope + identity so two callers under
976        // different tenants never share a cache slot.
977        assert!(
978            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
979            "cache key folds in scope + identity, got {:?}",
980            f.cache_key()
981        );
982
983        reset_thread_locals();
984    }
985
986    #[test]
987    fn as_of_rejects_non_versioned_user_collection() {
988        reset_thread_locals();
989        let rt = fresh_runtime();
990
991        // `not_versioned` is a plain user collection — the frame
992        // builder must reject AS OF until the caller opts in via
993        // `vcs.set_versioned`.
994        let err = match StatementExecutionFrame::build(
995            &rt,
996            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
997        ) {
998            Err(e) => e,
999            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
1000        };
1001
1002        let msg = format!("{err}");
1003        assert!(
1004            msg.contains("AS OF requires a versioned collection"),
1005            "expected AS OF rejection, got: {msg}"
1006        );
1007    }
1008
1009    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1010    ///
1011    /// Sets a tenant + identity via the public thread-local API the
1012    /// runtime uses for ambient scope, drives a real `SELECT` through
1013    /// `execute_query`, then inspects the result cache that the SELECT
1014    /// path populates via `frame.cache_key()`. The key only carries
1015    /// the tenant + identity *because* it was built through the frame —
1016    /// reverting the wiring to inline `current_tenant()` /
1017    /// `current_auth_identity()` reads would still pass this test, but
1018    /// dropping the frame entirely (so the SELECT path stopped touching
1019    /// `cache_key`) would break it.
1020    #[test]
1021    fn select_path_routes_through_frame_cache_key() {
1022        reset_thread_locals();
1023        set_current_tenant("acme".to_string());
1024        set_current_auth_identity("alice".to_string(), Role::Read);
1025
1026        let rt = fresh_runtime();
1027        let result = rt
1028            .execute_query("SELECT 1")
1029            .expect("SELECT 1 executes under tenant=acme/identity=alice");
1030        assert_eq!(result.statement_type, "select");
1031
1032        // The SELECT path (in `execute_query_expr`) builds a frame and
1033        // writes its result through `frame.cache_key()`. That key folds
1034        // tenant + identity in via `result_cache_key`, so finding "acme"
1035        // and "alice" inside any cached key proves the frame was the
1036        // seam used.
1037        let cache = rt.inner.result_cache.read();
1038        let any_keyed_with_scope = cache
1039            .0
1040            .keys()
1041            .any(|k| k.contains("acme") && k.contains("alice"));
1042        assert!(
1043            any_keyed_with_scope,
1044            "expected at least one result-cache key carrying tenant+identity, \
1045             got keys: {:?}",
1046            cache.0.keys().collect::<Vec<_>>()
1047        );
1048
1049        reset_thread_locals();
1050    }
1051
1052    /// A SELECT that calls a volatile builtin (here:
1053    /// `pg_advisory_unlock`, the volatile token the runtime currently
1054    /// recognises in `query_has_volatile_builtin`) must NOT populate
1055    /// the result cache. Any caller hitting the cache after this would
1056    /// see a stale answer for an inherently-volatile query, so the
1057    /// SELECT path gates writes through `frame.should_cache_result()`.
1058    ///
1059    /// Deletion test: removing `ReadFrame::should_cache_result`, or
1060    /// reverting the SELECT path to skip its safety gate, would let
1061    /// the result cache silently absorb this statement and break the
1062    /// assertion below.
1063    #[test]
1064    fn volatile_select_does_not_populate_result_cache() {
1065        reset_thread_locals();
1066        let rt = fresh_runtime();
1067
1068        // Frame-level invariant: the volatile-builtin signal collapses
1069        // `should_cache_result` to false even for an autocommit /
1070        // out-of-tx connection.
1071        let frame =
1072            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1073        let f: &dyn ReadFrame = &frame;
1074        assert!(
1075            !f.should_cache_result(),
1076            "volatile builtin must disable result-cache safety"
1077        );
1078
1079        // End-to-end: drive the volatile SELECT through `execute_query`
1080        // and confirm no entry was stamped under its cache key. Other
1081        // entries from prior tests sharing the binary may exist, so we
1082        // assert specifically on this query's key.
1083        let _ = rt
1084            .execute_query("SELECT pg_advisory_unlock(1)")
1085            .expect("volatile SELECT executes");
1086        let cache = rt.inner.result_cache.read();
1087        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1088        assert!(
1089            !cache.0.contains_key(&key),
1090            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1091            cache.0.keys().collect::<Vec<_>>()
1092        );
1093
1094        reset_thread_locals();
1095    }
1096
1097    #[test]
1098    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1099        reset_thread_locals();
1100        let rt = fresh_runtime();
1101        rt.inner
1102            .db
1103            .store()
1104            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1105
1106        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1107        assert_eq!(result.statement_type, "select");
1108
1109        let key = result_cache_key("SELECT 1");
1110        assert!(
1111            rt.inner
1112                .result_blob_cache
1113                .get("runtime.result_cache", &key)
1114                .is_some(),
1115            "blob backend should stamp the Blob Cache path"
1116        );
1117        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1118        assert!(
1119            !rt.inner.result_cache.read().0.contains_key(&key),
1120            "blob backend should not write the legacy map"
1121        );
1122    }
1123
1124    #[test]
1125    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1126        reset_thread_locals();
1127        let rt = fresh_runtime();
1128        rt.inner
1129            .db
1130            .store()
1131            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1132
1133        let _ = rt
1134            .execute_query("SELECT pg_advisory_unlock(1)")
1135            .expect("volatile SELECT executes");
1136        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1137        assert!(
1138            rt.inner
1139                .result_blob_cache
1140                .get("runtime.result_cache", &key)
1141                .is_none(),
1142            "volatile SELECT must not populate blob result cache"
1143        );
1144        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1145    }
1146
1147    #[test]
1148    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1149        reset_thread_locals();
1150        let rt = fresh_runtime();
1151        rt.inner
1152            .db
1153            .store()
1154            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1155
1156        let first = rt.execute_query("SELECT 1").expect("first SELECT");
1157        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1158        assert_eq!(first.result.len(), second.result.len());
1159
1160        let key = result_cache_key("SELECT 1");
1161        assert!(rt.inner.result_cache.read().0.contains_key(&key));
1162        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1163        assert_eq!(rt.result_cache_shadow_divergences(), 0);
1164        assert_eq!(
1165            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1166            "cache_shadow_divergence_total"
1167        );
1168    }
1169
1170    #[test]
1171    fn as_of_on_red_collection_records_floor() {
1172        reset_thread_locals();
1173        let rt = fresh_runtime();
1174
1175        // `red_*` collections always allow AS OF. The frame should
1176        // resolve to a concrete xid and surface it via the Interface.
1177        let frame =
1178            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1179                .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1180
1181        let f: &dyn ReadFrame = &frame;
1182        assert_eq!(
1183            f.as_of_floor(),
1184            Some(1),
1185            "AS OF SNAPSHOT 1 records xid=1 as the floor"
1186        );
1187        assert_eq!(f.snapshot().xid, 1);
1188        assert!(
1189            f.snapshot().in_progress.is_empty(),
1190            "historical reads have no in-progress set"
1191        );
1192    }
1193
1194    /// The frame classifies common SQL prefixes into the coarse
1195    /// `Privilege` / `LockIntent` buckets at build time. This test
1196    /// pins the mapping so a regression that silently re-routes
1197    /// (e.g. INSERT classified as Read) surfaces here, not at a
1198    /// downstream privilege gate.
1199    #[test]
1200    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1201        reset_thread_locals();
1202        let rt = fresh_runtime();
1203
1204        let cases = [
1205            ("SELECT 1", Privilege::Read, LockIntent::Shared),
1206            (
1207                "INSERT INTO t (id) VALUES (1)",
1208                Privilege::Write,
1209                LockIntent::Exclusive,
1210            ),
1211            (
1212                "UPDATE t SET x = 1 WHERE id = 1",
1213                Privilege::Write,
1214                LockIntent::Exclusive,
1215            ),
1216            (
1217                "DELETE FROM t WHERE id = 1",
1218                Privilege::Write,
1219                LockIntent::Exclusive,
1220            ),
1221            (
1222                "CREATE TABLE foo (id INT)",
1223                Privilege::Write,
1224                LockIntent::Exclusive,
1225            ),
1226            ("BEGIN", Privilege::None, LockIntent::None),
1227            ("COMMIT", Privilege::None, LockIntent::None),
1228            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1229        ];
1230
1231        for (q, want_priv, want_lock) in cases {
1232            let frame = StatementExecutionFrame::build(&rt, q)
1233                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1234            let f: &dyn ReadFrame = &frame;
1235            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1236            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1237        }
1238    }
1239
1240    /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1241    /// driven through `execute_query` under an identity whose role
1242    /// doesn't satisfy the frame's coarse `Read` privilege gets
1243    /// denied with the frame's signal.
1244    ///
1245    /// We test the gate by classifying an INSERT (which the frame
1246    /// reports as `Privilege::Write`) under `Role::Read` — the only
1247    /// pair the legacy fallback would also reject, but here the
1248    /// rejection comes through `frame.check_query_privilege` BEFORE
1249    /// the parsed-expression walker runs. Removing
1250    /// `required_privilege` (or the `is_satisfied_by` consult inside
1251    /// `check_query_privilege`) would force the deny path back to the
1252    /// inline `RedDBRuntime::check_query_privilege` walker — but the
1253    /// auth_store gate up there is bypassed when no auth_store is
1254    /// wired (embedded test mode), so this test would FLIP from
1255    /// denied to permitted and break the assertion below.
1256    #[test]
1257    fn insert_under_read_role_denied_via_frame_privilege() {
1258        reset_thread_locals();
1259        set_current_auth_identity("alice".to_string(), Role::Read);
1260
1261        let rt = fresh_runtime();
1262        // Bypass parser by reaching into the frame directly: the
1263        // frame derives privilege from the SQL prefix without
1264        // needing an auth_store wired up. Driving end-to-end via
1265        // `execute_query` would also reject (no table `t`), but for
1266        // a different reason — we want to pin the privilege seam.
1267        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1268            .expect("frame builds for INSERT");
1269        let f: &dyn ReadFrame = &frame;
1270        assert_eq!(
1271            f.required_privilege(),
1272            Privilege::Write,
1273            "INSERT classified as Write"
1274        );
1275        let id = f.identity().expect("identity captured");
1276        assert!(
1277            !f.required_privilege().is_satisfied_by(id.1),
1278            "Role::Read does not satisfy Privilege::Write — frame must deny"
1279        );
1280
1281        // End-to-end: the frame's `check_query_privilege` sees the
1282        // (Read role, Write privilege) mismatch and denies before
1283        // dispatch. We drive a synthetic `QueryExpr::Table` because
1284        // the SELECT/INSERT parser would happen to also fail, and we
1285        // want the failure to come from the privilege seam.
1286        use crate::storage::query::ast::{QueryExpr, TableQuery};
1287        let expr = QueryExpr::Table(TableQuery::new("t"));
1288        let err = frame
1289            .check_query_privilege(&rt, &expr)
1290            .expect_err("denied via frame's coarse privilege gate");
1291        let msg = format!("{err}");
1292        assert!(
1293            msg.contains("permission denied") && msg.contains("Write"),
1294            "expected frame-level Write deny, got: {msg}"
1295        );
1296
1297        reset_thread_locals();
1298    }
1299
1300    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1301    /// control statement carries `LockIntent::None` and the
1302    /// `acquire_intent_locks` path returns `None` without consulting
1303    /// `intent_lock_modes_for`. Removing the method (or its consult
1304    /// site in `acquire_intent_locks`) would force the lock-mode
1305    /// helper to walk a fabricated parsed expression to reach the
1306    /// same conclusion — but the assertion that no guard is allocated
1307    /// for a `BEGIN` frame would still hold, so we additionally pin
1308    /// the classifier mapping above to make the deletion observable.
1309    #[test]
1310    fn control_statement_skips_intent_locks_via_frame() {
1311        reset_thread_locals();
1312        let rt = fresh_runtime();
1313
1314        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1315        let f: &dyn ReadFrame = &frame;
1316        assert_eq!(f.lock_intent(), LockIntent::None);
1317
1318        // Drive `acquire_intent_locks` against a fabricated SELECT
1319        // expression that WOULD normally yield `(IS, IS)`; the frame's
1320        // `lock_intent() == None` short-circuit must still suppress
1321        // the guard.
1322        use crate::storage::query::ast::{QueryExpr, TableQuery};
1323        let expr = QueryExpr::Table(TableQuery::new("t"));
1324        let guard = frame.acquire_intent_locks(&rt, &expr);
1325        assert!(
1326            guard.is_none(),
1327            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1328        );
1329    }
1330}