Skip to main content

reddb_server/runtime/
statement_frame.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use super::impl_core::{
5    collections_referenced, current_auth_identity, current_connection_id, current_tenant,
6    has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
7    query_has_volatile_builtin, ConfigSnapshotGuard, CurrentSnapshotGuard, SecretStoreGuard,
8    SnapshotContext, TxLocalTenantGuard,
9};
10use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
11use crate::api::{RedDBError, RedDBResult};
12use crate::auth::Role;
13use crate::storage::query::ast::QueryExpr;
14use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
15use crate::storage::transaction::snapshot::{Snapshot, Xid};
16
17/// Coarse privilege classification for a statement, computed once at
18/// frame-build time from the SQL text. Mirrors the three-role auth
19/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
20/// answer "can this identity run this statement?" without re-walking
21/// the parsed `QueryExpr` at every call site.
22///
23/// `None` means the statement does not touch the privilege gate at
24/// all (transaction control, SET, SHOW). Such statements must remain
25/// runnable under any authenticated identity.
26#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27pub(crate) enum Privilege {
28    /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
29    /// any role from `Role::Read` upward.
30    Read,
31    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
32    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
33    /// at least `Role::Write`.
34    Write,
35    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
36    /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
37    Admin,
38    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
39    /// ROLLBACK, SET, SHOW with no data exposure). Always permitted
40    /// for any authenticated identity.
41    None,
42}
43
44impl Privilege {
45    /// `true` iff `role` is sufficient to execute a statement carrying
46    /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
47    /// Admin` containment used by the auth fallback path.
48    pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
49        match self {
50            Self::None => true,
51            Self::Read => role.can_read(),
52            Self::Write => role.can_write(),
53            Self::Admin => role.can_admin(),
54        }
55    }
56}
57
58/// Coarse lock intent for a statement, computed once at frame-build
59/// time. Maps onto the storage-layer's `LockMode` matrix downstream
60/// but stays decoupled here so the runtime can answer "does this
61/// statement need the lock manager at all?" without a `use storage::`
62/// at every call site.
63#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub(crate) enum LockIntent {
65    /// No collection-level lock needed (transaction control, SET,
66    /// SHOW, EXPLAIN). The lock-acquisition path can short-circuit.
67    None,
68    /// Reader-style intent: SELECT, joins, graph / queue / search
69    /// reads. Maps to `(IS, IS)` at the storage layer.
70    Shared,
71    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
72    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
73    /// `Exclusive` at this granularity — call sites that need the
74    /// finer distinction still consult `intent_lock_modes_for`.
75    Exclusive,
76}
77
78/// Small, stable Interface that *represents* a read statement's
79/// execution context. Every read caller that needs to know "under
80/// what scope / identity / snapshot am I running, and is there an
81/// AS OF floor in effect?" consults this trait — never the
82/// underlying thread-locals or runtime fields directly.
83///
84/// The deletion test: removing this trait would force the four
85/// concerns it exposes back into ad-hoc lookups at every read
86/// callsite (`current_tenant()`, `current_auth_identity()`,
87/// `capture_current_snapshot()`, AS OF re-parsing). The trait
88/// concentrates them in one place so future changes (per-statement
89/// logging, audit, scope policy) have a single seam to extend.
90pub(crate) trait ReadFrame {
91    /// Effective tenant scope for the statement after WITHIN /
92    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
93    /// "no tenant bound" (RLS deny-default applies).
94    fn effective_scope(&self) -> Option<&str>;
95
96    /// Authenticated identity observed at frame-build time, if any.
97    /// Returns `(username, role)` so callers can render audit lines
98    /// or feed RLS policy lookups without re-reading thread-locals.
99    fn identity(&self) -> Option<(&str, Role)>;
100
101    /// MVCC snapshot the statement reads against. For autocommit
102    /// this is a fresh snapshot; inside an active transaction it
103    /// is the txn's snapshot; under AS OF it is the resolved
104    /// historical xid.
105    fn snapshot(&self) -> &Snapshot;
106
107    /// AS OF xid floor when AS OF was applied for this statement,
108    /// `None` for live reads. Useful for downstream callers that
109    /// want to gate behaviour on historical-read mode without
110    /// re-parsing the query.
111    fn as_of_floor(&self) -> Option<Xid>;
112
113    /// Stable result-cache key for the statement (already mixes
114    /// effective tenant + identity).
115    fn cache_key(&self) -> &str;
116
117    /// Whether the statement is safe to serve from / populate the
118    /// result cache. Combines two underlying signals:
119    ///
120    ///   * the query does not call a volatile builtin (e.g. `NOW()`,
121    ///     `RANDOM()`, `UUID()`), which would change between calls,
122    ///   * the connection is not inside an active transaction with
123    ///     uncommitted writes that other readers shouldn't observe.
124    ///
125    /// SELECT cache callsites (read + write) consult this method
126    /// instead of re-deriving safety from globals or poking the
127    /// frame's private fields. Removing it would force every cache
128    /// callsite to re-run `query_has_volatile_builtin` plus
129    /// `result_cache_safe(conn_id)` inline.
130    fn should_cache_result(&self) -> bool;
131
132    /// Coarse privilege class the statement requires, computed once
133    /// at frame-build time from the SQL prefix. Read/write dispatch
134    /// sites consult this instead of re-classifying the parsed
135    /// `QueryExpr` inline at every callsite.
136    ///
137    /// Removing this method would force every privilege gate to
138    /// recompute the (action, resource) classification from the
139    /// parsed expression and re-check the role hierarchy inline.
140    fn required_privilege(&self) -> Privilege;
141
142    /// Coarse collection-level lock intent the statement implies.
143    /// `None` lets the lock-acquisition path short-circuit without
144    /// touching the lock manager.
145    ///
146    /// Removing this method would force the lock-acquisition path
147    /// to always invoke `intent_lock_modes_for` (which itself walks
148    /// the parsed expression) even for transaction-control / SET /
149    /// SHOW statements that need no collection lock at all.
150    fn lock_intent(&self) -> LockIntent;
151
152    /// Set of collection ids the calling identity is allowed to
153    /// observe under the active `(tenant, role)` scope. Computed once
154    /// at frame-build time via the `AuthStore` visible-collections
155    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
156    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
157    /// before any similarity score is computed (issue #119).
158    ///
159    /// `None` means the frame was built without an auth store wired —
160    /// embedded / single-tenant tests run that way. AI search call
161    /// sites refuse to proceed with `None`, which is the deny-default
162    /// the issue requires; pure SELECT paths fall back to the existing
163    /// per-row RLS gate.
164    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
165}
166
167/// Cheap first-word classification of a SQL statement, used at
168/// frame-build time to derive `Privilege` + `LockIntent` without
169/// re-parsing the query. Matches the keywords that the legacy
170/// inline checks in `RedDBRuntime::check_query_privilege` and
171/// `intent_lock_modes_for` already key on.
172fn statement_kind(query: &str) -> &'static str {
173    let trimmed = query.trim_start();
174    // Skip a leading line / block comment so the classifier doesn't
175    // misread `/* ... */ SELECT ...` as an unknown statement.
176    let trimmed = if let Some(rest) = trimmed.strip_prefix("--") {
177        rest.split_once('\n')
178            .map(|(_, r)| r)
179            .unwrap_or("")
180            .trim_start()
181    } else {
182        trimmed
183    };
184    let first = trimmed
185        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
186        .next()
187        .unwrap_or("");
188    // ASCII-uppercase compare without allocating: SQL keywords are ASCII.
189    let mut buf = [0u8; 16];
190    let bytes = first.as_bytes();
191    let n = bytes.len().min(buf.len());
192    for i in 0..n {
193        buf[i] = bytes[i].to_ascii_uppercase();
194    }
195    match &buf[..n] {
196        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" => "read",
197        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
198        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
199        b"GRANT" | b"REVOKE" => "admin",
200        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
201        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
202        _ => "unknown",
203    }
204}
205
206fn classify_privilege(query: &str) -> Privilege {
207    match statement_kind(query) {
208        "read" => Privilege::Read,
209        "write" => Privilege::Write,
210        // DDL is gated at `Role::Write` in the legacy fallback (see
211        // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
212        // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
213        // GRANT / REVOKE upgrade to Admin via finer checks at the call
214        // site — the frame surfaces only the coarse class.
215        "ddl" => Privilege::Write,
216        "admin" => Privilege::Admin,
217        _ => Privilege::None,
218    }
219}
220
221fn classify_lock_intent(query: &str) -> LockIntent {
222    match statement_kind(query) {
223        "read" => LockIntent::Shared,
224        "write" | "ddl" => LockIntent::Exclusive,
225        _ => LockIntent::None,
226    }
227}
228
229pub(super) struct StatementExecutionFrame {
230    tx_local_tenant: Option<Option<String>>,
231    snapshot: Snapshot,
232    own_xids: HashSet<Xid>,
233    cache_key: String,
234    is_volatile_query: bool,
235    cache_safe: bool,
236    /// Effective tenant captured at frame-build time after WITHIN /
237    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
238    /// so the `ReadFrame` Interface can return a borrow without
239    /// re-touching the thread-local stack.
240    effective_scope: Option<String>,
241    /// Auth identity captured at frame-build time. `None` for
242    /// embedded / anonymous callers.
243    identity: Option<(String, Role)>,
244    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
245    /// for live reads.
246    as_of_floor: Option<Xid>,
247    /// Privilege class required by the statement, derived from the
248    /// SQL text at frame-build time. Read/write dispatch sites
249    /// consult this instead of re-classifying the parsed expression.
250    required_privilege: Privilege,
251    /// Collection-level lock intent the statement implies. The
252    /// lock-acquisition path short-circuits when this is `None`.
253    lock_intent: LockIntent,
254    /// Set of collection ids the active `(tenant, role)` scope is
255    /// allowed to observe. Computed at frame-build time via the
256    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
257    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
258    /// scoring (issue #119). `None` when no auth store is wired
259    /// (embedded test mode) — AI search refuses on `None`.
260    visible_collections: Option<HashSet<String>>,
261}
262
263pub(super) struct StatementFrameGuards {
264    _tx_local_guard: TxLocalTenantGuard,
265    _config_snapshot_guard: ConfigSnapshotGuard,
266    _secret_store_guard: SecretStoreGuard,
267    _snapshot_guard: CurrentSnapshotGuard,
268}
269
270pub(super) struct PreparedStatement {
271    pub(super) expr: QueryExpr,
272    pub(super) mode: QueryMode,
273}
274
275impl StatementExecutionFrame {
276    pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
277        let conn_id = current_connection_id();
278        let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
279        let own_xids = runtime.own_transaction_xids(conn_id);
280        let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
281        let cache_key = result_cache_key(query);
282        let is_volatile_query = query_has_volatile_builtin(query);
283        let cache_safe = runtime.result_cache_safe(conn_id);
284        // Capture identity + effective scope under the same
285        // thread-local view that the cache key was built from, so
286        // the Interface and the cache key agree on what "this
287        // statement" means.
288        let effective_scope = current_tenant();
289        let identity = current_auth_identity();
290
291        // Coarse classification of the statement, computed once from
292        // the SQL prefix so downstream callers don't re-derive it
293        // from the parsed `QueryExpr` at every privilege / lock site.
294        let required_privilege = classify_privilege(query);
295        let lock_intent = classify_lock_intent(query);
296
297        // Issue #119: resolve the visible-collections set for the
298        // active (tenant, role) scope. Only meaningful when an auth
299        // store is wired *and* an identity was captured — embedded
300        // anonymous callers fall back to `None`, and AI search call
301        // sites refuse on `None`.
302        let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
303        {
304            (Some(store), Some((principal, role))) => {
305                let collections = runtime.inner.db.store().list_collections();
306                Some(store.visible_collections_for_scope(
307                    effective_scope.as_deref(),
308                    *role,
309                    principal,
310                    &collections,
311                ))
312            }
313            _ => None,
314        };
315
316        Ok(Self {
317            tx_local_tenant,
318            snapshot,
319            own_xids,
320            cache_key,
321            is_volatile_query,
322            cache_safe,
323            effective_scope,
324            identity,
325            as_of_floor,
326            required_privilege,
327            lock_intent,
328            visible_collections,
329        })
330    }
331
332    pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
333        StatementFrameGuards {
334            _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
335            _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
336            _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
337            _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
338                snapshot: self.snapshot.clone(),
339                manager: Arc::clone(&runtime.inner.snapshot_manager),
340                own_xids: self.own_xids.clone(),
341            }),
342        }
343    }
344
345    pub(super) fn cache_key(&self) -> &str {
346        &self.cache_key
347    }
348
349    pub(super) fn can_read_result_cache(&self) -> bool {
350        // Delegates to the `ReadFrame` Interface so the volatile +
351        // active-tx safety decision lives in exactly one place.
352        <Self as ReadFrame>::should_cache_result(self)
353    }
354
355    pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
356        // Cache-safety (volatile builtin, active-tx writes) comes from
357        // the Interface; the rest are write-side payload heuristics
358        // (statement shape, result size) that aren't part of the
359        // safety contract.
360        <Self as ReadFrame>::should_cache_result(self)
361            && result.statement_type == "select"
362            && result.engine != "vault"
363            && result.result.pre_serialized_json.is_none()
364            && result.result.records.len() <= 5
365    }
366
367    pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
368        if self.can_read_result_cache() {
369            runtime.get_result_cache_entry(self.cache_key())
370        } else {
371            None
372        }
373    }
374
375    pub(super) fn write_result_cache(
376        &self,
377        runtime: &RedDBRuntime,
378        result: &RuntimeQueryResult,
379        scopes: HashSet<String>,
380    ) {
381        if self.should_write_result_cache(result) {
382            runtime.put_result_cache_entry(
383                self.cache_key(),
384                RuntimeResultCacheEntry {
385                    result: result.clone(),
386                    cached_at: std::time::Instant::now(),
387                    scopes,
388                },
389            );
390        }
391    }
392
393    pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
394        // Detected via cheap prefix check so non-CTE queries skip the
395        // full parse here. CTE-bearing queries bypass the plan cache
396        // and result cache (rare workload — perf optimization is a
397        // follow-up). Inlining substitutes every CTE reference with
398        // its body as a subquery in FROM, after which the existing
399        // subquery-in-FROM machinery handles execution. Recursive
400        // CTEs are rejected explicitly until fixpoint execution wires
401        // through the runtime.
402        if !has_with_prefix(query) {
403            return Ok(None);
404        }
405        let parsed = crate::storage::query::parser::parse(query)
406            .map_err(|err| RedDBError::Query(err.to_string()))?;
407        if parsed.with_clause.is_some() {
408            let rewritten = crate::storage::query::executors::inline_ctes(parsed)
409                .map_err(|err| RedDBError::Query(err.to_string()))?;
410            return Ok(Some(rewritten));
411        }
412        // No WITH after parse (the prefix matched something else like
413        // `WITHIN` that already routed elsewhere) — fall through to
414        // the normal path with the original query.
415        Ok(None)
416    }
417
418    pub(super) fn prepare_statement(
419        &self,
420        runtime: &RedDBRuntime,
421        query: &str,
422    ) -> RedDBResult<PreparedStatement> {
423        let mode = detect_mode(query);
424        if matches!(mode, QueryMode::Unknown) {
425            return Err(RedDBError::Query("unable to detect query mode".to_string()));
426        }
427
428        // ── Plan cache: reuse only exact-query ASTs ──
429        //
430        // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
431        // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
432        // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
433        // Plan cache applies to statements whose shape can be
434        // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
435        // reuses the same plan across thousands of varying literals).
436        // INSERT is still bypassed — its shape changes per column set
437        // and bulk paths don't go through here anyway.
438        let first_word = query
439            .trim()
440            .split_ascii_whitespace()
441            .next()
442            .unwrap_or("")
443            .to_ascii_uppercase();
444        let is_insert = first_word == "INSERT";
445
446        // Fused normalize+extract: one byte-scan produces both the
447        // cache_key AND the literal bindings. Saves a second Lexer
448        // pass over the query text on every cache hit — dominant
449        // cost on tight UPDATE loops that hit the same shape
450        // thousands of times with varying literals.
451        let (cache_key, prescan_binds) = if is_insert {
452            (String::new(), Vec::new())
453        } else {
454            crate::storage::query::planner::cache_key::normalize_and_extract(query)
455        };
456
457        let expr = if is_insert {
458            // Bypass plan cache for INSERT — shape varies per query.
459            parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
460        } else {
461            // ── Hot path: read lock only (no writer serialization on cache hits) ──
462            //
463            // peek() is a non-mutating probe: no LRU promotion, no touch().
464            // This lets concurrent readers proceed without blocking each other.
465            // On hit we bind literals if needed and return immediately.
466            // Only on miss do we drop to a write lock to parse + insert.
467            let hit = {
468                let plan_cache = runtime.inner.query_cache.read();
469                plan_cache.peek(&cache_key).map(|cached| {
470                    let parameter_count = cached.parameter_count;
471                    let optimized = cached.plan.optimized.clone();
472                    let exact_query = cached.exact_query.clone();
473                    (parameter_count, optimized, exact_query)
474                })
475            };
476
477            if let Some((parameter_count, optimized, exact_query)) = hit {
478                if parameter_count > 0 {
479                    // Shape hit: use the binds extracted during normalise.
480                    let shape_binds = prescan_binds.clone();
481                    if let Some(bound) =
482                        crate::storage::query::planner::shape::bind_parameterized_query(
483                            &optimized,
484                            &shape_binds,
485                            parameter_count,
486                        )
487                    {
488                        bound
489                    } else if exact_query.as_deref() == Some(query) {
490                        // Bind failed but exact query matches — use as-is.
491                        optimized
492                    } else {
493                        // Bind failed and literals differ: re-parse fresh.
494                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
495                    }
496                } else {
497                    // No parameters means either there truly are no literals,
498                    // or this statement type does not participate in shape
499                    // parameterization (for example graph/queue commands).
500                    // Reusing a normalized-cache hit across a different exact
501                    // query can therefore leak stale literals into execution.
502                    if exact_query.as_deref() == Some(query) {
503                        optimized
504                    } else {
505                        parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
506                    }
507                }
508            } else {
509                // Cache miss — parse, parameterize, store.
510                let parsed =
511                    parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
512                let (cached_expr, parameter_count) = if let Some(prepared) =
513                    crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
514                {
515                    (prepared.shape, prepared.parameter_count)
516                } else {
517                    (parsed.clone(), 0)
518                };
519                {
520                    let mut pc = runtime.inner.query_cache.write();
521                    let plan = crate::storage::query::planner::QueryPlan::new(
522                        parsed.clone(),
523                        cached_expr,
524                        Default::default(),
525                    );
526                    pc.insert(
527                        cache_key.clone(),
528                        crate::storage::query::planner::CachedPlan::new(plan)
529                            .with_shape_key(cache_key.clone())
530                            .with_exact_query(query.to_string())
531                            .with_parameter_count(parameter_count),
532                    );
533                }
534                parsed
535            }
536        };
537
538        // Phase 5 PG parity: substitute any registered view name that
539        // appears in the expression with its stored body. Runs after
540        // parse and before dispatch so the SQL entrypoint gets the
541        // same view resolution `execute_query_expr` already does.
542        let expr = runtime.rewrite_view_refs(expr);
543
544        Ok(PreparedStatement { expr, mode })
545    }
546
547    pub(super) fn check_query_privilege(
548        &self,
549        runtime: &RedDBRuntime,
550        expr: &QueryExpr,
551    ) -> RedDBResult<()> {
552        // Frame-level coarse gate. We consult `required_privilege()`
553        // (computed once at frame-build) against the captured identity
554        // before the deep grant engine walks the parsed expression.
555        // The coarse gate cannot ALLOW anything the grant engine would
556        // deny — it only short-circuits the obvious "Role::Read tries
557        // INSERT" case so a downstream caller never has to redo this
558        // check inline. `Privilege::None` (transaction control / SET /
559        // SHOW) flows through unchanged; the grant engine treats those
560        // as bypass too.
561        if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
562            let needed = <Self as ReadFrame>::required_privilege(self);
563            if !needed.is_satisfied_by(role) {
564                // Issue #205 — when the deep grant engine *also*
565                // denies, we treat this as an ordinary permission
566                // failure. But when an Admin-only statement reaches
567                // this gate without an auth_store wired (so the deep
568                // engine can't double-check), the coarse rejection is
569                // the only line of defence — emit an OperatorEvent so
570                // the operator notices an Admin-class statement was
571                // attempted with insufficient role.
572                if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
573                    crate::telemetry::operator_event::OperatorEvent::AuthBypass {
574                        principal: username.to_string(),
575                        resource: format!("statement requiring {needed:?}"),
576                        detail: format!(
577                            "auth_store not wired; coarse gate is sole defence (role={role:?})"
578                        ),
579                    }
580                    .emit_global();
581                }
582                return Err(RedDBError::Query(format!(
583                    "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
584                )));
585            }
586        }
587        runtime
588            .check_query_privilege(expr)
589            .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
590    }
591
592    pub(super) fn prepare_dispatch(
593        &self,
594        runtime: &RedDBRuntime,
595        expr: &QueryExpr,
596    ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
597        runtime.validate_model_operations_before_auth(expr)?;
598        self.check_query_privilege(runtime, expr)?;
599        Ok(self.acquire_intent_locks(runtime, expr))
600    }
601
602    pub(super) fn acquire_intent_locks(
603        &self,
604        runtime: &RedDBRuntime,
605        expr: &QueryExpr,
606    ) -> Option<crate::runtime::locking::LockerGuard> {
607        if !runtime.config_bool("concurrency.locking.enabled", true) {
608            return None;
609        }
610        // Frame-level short-circuit: if the statement carries no lock
611        // intent (transaction control, SET, SHOW), skip the lock
612        // manager entirely instead of letting `intent_lock_modes_for`
613        // walk the parsed expression to reach the same conclusion.
614        if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
615            return None;
616        }
617        intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
618            let mut guard =
619                crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
620            let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
621            for collection in collections_referenced(expr) {
622                let _ = guard.acquire(
623                    crate::runtime::locking::Resource::Collection(collection),
624                    coll_mode,
625                );
626            }
627            guard
628        })
629    }
630}
631
632impl ReadFrame for StatementExecutionFrame {
633    fn effective_scope(&self) -> Option<&str> {
634        self.effective_scope.as_deref()
635    }
636
637    fn identity(&self) -> Option<(&str, Role)> {
638        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
639    }
640
641    fn snapshot(&self) -> &Snapshot {
642        &self.snapshot
643    }
644
645    fn as_of_floor(&self) -> Option<Xid> {
646        self.as_of_floor
647    }
648
649    fn cache_key(&self) -> &str {
650        &self.cache_key
651    }
652
653    fn should_cache_result(&self) -> bool {
654        !self.is_volatile_query && self.cache_safe
655    }
656
657    fn required_privilege(&self) -> Privilege {
658        self.required_privilege
659    }
660
661    fn lock_intent(&self) -> LockIntent {
662        self.lock_intent
663    }
664
665    fn visible_collections(&self) -> Option<&HashSet<String>> {
666        self.visible_collections.as_ref()
667    }
668}
669
670/// Lightweight `ReadFrame` carrier used by AI command entry points
671/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
672///
673/// Issue #119 calls this struct `EffectiveScope`. It bundles the
674/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
675/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
676/// instead of re-reading thread-locals at every call site.
677///
678/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
679/// from the per-statement thread-locals (identical to how
680/// `StatementExecutionFrame::build` derives them) and resolves
681/// `visible_collections` via the `AuthStore` cache.
682pub struct EffectiveScope {
683    pub(crate) tenant: Option<String>,
684    pub(crate) identity: Option<(String, Role)>,
685    pub(crate) snapshot: Snapshot,
686    pub(crate) visible_collections: Option<HashSet<String>>,
687}
688
689impl EffectiveScope {
690    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
691    /// to gate LLM-backed NER calls behind `ai:ner:read`.
692    ///
693    /// Placeholder for now: always returns `false`. The auth engine's
694    /// capability matrix is future work; until it lands, every routed
695    /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
696    /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
697    /// Documented in code so the wire-up is a one-line change once
698    /// the auth engine learns capabilities.
699    pub fn has_capability(&self, _capability: &str) -> bool {
700        false
701    }
702}
703
704impl ReadFrame for EffectiveScope {
705    fn effective_scope(&self) -> Option<&str> {
706        self.tenant.as_deref()
707    }
708    fn identity(&self) -> Option<(&str, Role)> {
709        self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
710    }
711    fn snapshot(&self) -> &Snapshot {
712        &self.snapshot
713    }
714    fn as_of_floor(&self) -> Option<Xid> {
715        None
716    }
717    fn cache_key(&self) -> &str {
718        ""
719    }
720    fn should_cache_result(&self) -> bool {
721        false
722    }
723    fn required_privilege(&self) -> Privilege {
724        Privilege::Read
725    }
726    fn lock_intent(&self) -> LockIntent {
727        LockIntent::Shared
728    }
729    fn visible_collections(&self) -> Option<&HashSet<String>> {
730        self.visible_collections.as_ref()
731    }
732}
733
734impl RedDBRuntime {
735    /// Build the AI command `EffectiveScope` from the current
736    /// statement thread-locals + auth store.
737    ///
738    /// Returns `None` for embedded callers (no auth store, no
739    /// identity) — `AuthorizedSearch` treats `None` as deny-default.
740    pub(crate) fn ai_scope(&self) -> EffectiveScope {
741        let tenant = super::impl_core::current_tenant();
742        let identity = super::impl_core::current_auth_identity();
743        let snapshot = self.current_snapshot();
744        let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
745            (Some(store), Some((principal, role))) => {
746                let collections = self.inner.db.store().list_collections();
747                Some(store.visible_collections_for_scope(
748                    tenant.as_deref(),
749                    *role,
750                    principal,
751                    &collections,
752                ))
753            }
754            _ => None,
755        };
756        EffectiveScope {
757            tenant,
758            identity,
759            snapshot,
760            visible_collections,
761        }
762    }
763}
764
765/// Test fixtures for callers that need to drive `ReadFrame` without
766/// booting a runtime. Lives behind `cfg(test)` and `pub(crate)` so it
767/// only leaks across module boundaries inside the crate.
768#[cfg(test)]
769pub(crate) mod test_support {
770    use super::{LockIntent, Privilege, ReadFrame};
771    use crate::auth::Role;
772    use crate::storage::transaction::snapshot::{Snapshot, Xid};
773    use std::collections::HashSet;
774
775    /// A `ReadFrame` impl with hand-set fields. Used by
776    /// `authorized_search` tests to assert the deny-default and
777    /// scope-trim behaviour without going through frame construction.
778    pub(crate) struct FakeReadFrame {
779        pub tenant: Option<String>,
780        pub identity: Option<(String, Role)>,
781        pub snapshot: Snapshot,
782        pub visible: Option<HashSet<String>>,
783    }
784
785    impl FakeReadFrame {
786        pub(crate) fn without_scope() -> Self {
787            Self {
788                tenant: None,
789                identity: None,
790                snapshot: Snapshot {
791                    xid: 0,
792                    in_progress: HashSet::new(),
793                },
794                visible: None,
795            }
796        }
797
798        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
799            Self {
800                tenant: Some("acme".to_string()),
801                identity: Some(("alice".to_string(), Role::Read)),
802                snapshot: Snapshot {
803                    xid: 0,
804                    in_progress: HashSet::new(),
805                },
806                visible: Some(visible),
807            }
808        }
809    }
810
811    impl ReadFrame for FakeReadFrame {
812        fn effective_scope(&self) -> Option<&str> {
813            self.tenant.as_deref()
814        }
815        fn identity(&self) -> Option<(&str, Role)> {
816            self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
817        }
818        fn snapshot(&self) -> &Snapshot {
819            &self.snapshot
820        }
821        fn as_of_floor(&self) -> Option<Xid> {
822            None
823        }
824        fn cache_key(&self) -> &str {
825            ""
826        }
827        fn should_cache_result(&self) -> bool {
828            false
829        }
830        fn required_privilege(&self) -> Privilege {
831            Privilege::Read
832        }
833        fn lock_intent(&self) -> LockIntent {
834            LockIntent::Shared
835        }
836        fn visible_collections(&self) -> Option<&HashSet<String>> {
837            self.visible.as_ref()
838        }
839    }
840}
841
842impl RedDBRuntime {
843    fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
844        let mut set = HashSet::new();
845        if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
846            set.insert(ctx.xid);
847            for (_, sub) in &ctx.savepoints {
848                set.insert(*sub);
849            }
850        }
851        set
852    }
853
854    /// Resolve the snapshot for the current statement, returning
855    /// the snapshot itself and (when AS OF is in effect) the
856    /// resolved xid floor. The floor is the same xid carried inside
857    /// `Snapshot.xid` for AS OF reads — exposing it separately lets
858    /// the `ReadFrame` Interface tell "live read" from "historical
859    /// read" without inferring from `in_progress.is_empty()`.
860    fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
861        match peek_top_level_as_of_with_table(query) {
862            Some((spec, Some(table))) => {
863                if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
864                    return Err(RedDBError::InvalidConfig(format!(
865                        "AS OF requires a versioned collection — \
866                         `{table}` has not opted in. \
867                         Call vcs.set_versioned(\"{table}\", true) first."
868                    )));
869                }
870                let xid = self.vcs_resolve_as_of(spec)?;
871                Ok((
872                    Snapshot {
873                        xid,
874                        in_progress: HashSet::new(),
875                    },
876                    Some(xid),
877                ))
878            }
879            Some((spec, None)) => {
880                let xid = self.vcs_resolve_as_of(spec)?;
881                Ok((
882                    Snapshot {
883                        xid,
884                        in_progress: HashSet::new(),
885                    },
886                    Some(xid),
887                ))
888            }
889            None => Ok((self.current_snapshot(), None)),
890        }
891    }
892
893    fn result_cache_safe(&self, conn_id: u64) -> bool {
894        let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
895        let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
896        !has_active_xids && !in_own_tx
897    }
898}
899
900fn result_cache_key(query: &str) -> String {
901    let tenant = current_tenant().unwrap_or_default();
902    let auth = current_auth_identity()
903        .map(|(user, role)| format!("{}|{:?}", user, role))
904        .unwrap_or_default();
905    if tenant.is_empty() && auth.is_empty() {
906        query.to_string()
907    } else {
908        format!("{query}\u{001e}{tenant}\u{001e}{auth}")
909    }
910}
911
912#[cfg(test)]
913mod tests {
914    use super::*;
915    use crate::api::RedDBOptions;
916    use crate::runtime::impl_core::{
917        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
918        set_current_tenant,
919    };
920    use crate::runtime::RedDBRuntime;
921
922    fn fresh_runtime() -> RedDBRuntime {
923        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
924    }
925
926    /// Ensure thread-local state from a prior test can't leak into
927    /// the next one — tests in the same binary share the thread.
928    fn reset_thread_locals() {
929        clear_current_tenant();
930        clear_current_auth_identity();
931    }
932
933    #[test]
934    fn autocommit_select_takes_live_snapshot() {
935        reset_thread_locals();
936        let rt = fresh_runtime();
937        let frame =
938            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
939
940        // Live reads: no AS OF floor, snapshot bounded by the
941        // manager's `peek_next_xid` so committed tuples are visible.
942        let f: &dyn ReadFrame = &frame;
943        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
944        assert!(
945            f.snapshot().xid >= 1,
946            "autocommit snapshot xid is bounded by peek_next_xid"
947        );
948    }
949
950    #[test]
951    fn frame_captures_identity_and_scope() {
952        reset_thread_locals();
953        set_current_tenant("acme".to_string());
954        set_current_auth_identity("alice".to_string(), Role::Write);
955
956        let rt = fresh_runtime();
957        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
958        let f: &dyn ReadFrame = &frame;
959
960        assert_eq!(f.effective_scope(), Some("acme"));
961        let id = f.identity().expect("identity captured");
962        assert_eq!(id.0, "alice");
963        assert!(matches!(id.1, Role::Write));
964
965        // Cache key mixes scope + identity so two callers under
966        // different tenants never share a cache slot.
967        assert!(
968            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
969            "cache key folds in scope + identity, got {:?}",
970            f.cache_key()
971        );
972
973        reset_thread_locals();
974    }
975
976    #[test]
977    fn as_of_rejects_non_versioned_user_collection() {
978        reset_thread_locals();
979        let rt = fresh_runtime();
980
981        // `not_versioned` is a plain user collection — the frame
982        // builder must reject AS OF until the caller opts in via
983        // `vcs.set_versioned`.
984        let err = match StatementExecutionFrame::build(
985            &rt,
986            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
987        ) {
988            Err(e) => e,
989            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
990        };
991
992        let msg = format!("{err}");
993        assert!(
994            msg.contains("AS OF requires a versioned collection"),
995            "expected AS OF rejection, got: {msg}"
996        );
997    }
998
999    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1000    ///
1001    /// Sets a tenant + identity via the public thread-local API the
1002    /// runtime uses for ambient scope, drives a real `SELECT` through
1003    /// `execute_query`, then inspects the result cache that the SELECT
1004    /// path populates via `frame.cache_key()`. The key only carries
1005    /// the tenant + identity *because* it was built through the frame —
1006    /// reverting the wiring to inline `current_tenant()` /
1007    /// `current_auth_identity()` reads would still pass this test, but
1008    /// dropping the frame entirely (so the SELECT path stopped touching
1009    /// `cache_key`) would break it.
1010    #[test]
1011    fn select_path_routes_through_frame_cache_key() {
1012        reset_thread_locals();
1013        set_current_tenant("acme".to_string());
1014        set_current_auth_identity("alice".to_string(), Role::Read);
1015
1016        let rt = fresh_runtime();
1017        let result = rt
1018            .execute_query("SELECT 1")
1019            .expect("SELECT 1 executes under tenant=acme/identity=alice");
1020        assert_eq!(result.statement_type, "select");
1021
1022        // The SELECT path (in `execute_query_expr`) builds a frame and
1023        // writes its result through `frame.cache_key()`. That key folds
1024        // tenant + identity in via `result_cache_key`, so finding "acme"
1025        // and "alice" inside any cached key proves the frame was the
1026        // seam used.
1027        let cache = rt.inner.result_cache.read();
1028        let any_keyed_with_scope = cache
1029            .0
1030            .keys()
1031            .any(|k| k.contains("acme") && k.contains("alice"));
1032        assert!(
1033            any_keyed_with_scope,
1034            "expected at least one result-cache key carrying tenant+identity, \
1035             got keys: {:?}",
1036            cache.0.keys().collect::<Vec<_>>()
1037        );
1038
1039        reset_thread_locals();
1040    }
1041
1042    /// A SELECT that calls a volatile builtin (here:
1043    /// `pg_advisory_unlock`, the volatile token the runtime currently
1044    /// recognises in `query_has_volatile_builtin`) must NOT populate
1045    /// the result cache. Any caller hitting the cache after this would
1046    /// see a stale answer for an inherently-volatile query, so the
1047    /// SELECT path gates writes through `frame.should_cache_result()`.
1048    ///
1049    /// Deletion test: removing `ReadFrame::should_cache_result`, or
1050    /// reverting the SELECT path to skip its safety gate, would let
1051    /// the result cache silently absorb this statement and break the
1052    /// assertion below.
1053    #[test]
1054    fn volatile_select_does_not_populate_result_cache() {
1055        reset_thread_locals();
1056        let rt = fresh_runtime();
1057
1058        // Frame-level invariant: the volatile-builtin signal collapses
1059        // `should_cache_result` to false even for an autocommit /
1060        // out-of-tx connection.
1061        let frame =
1062            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1063        let f: &dyn ReadFrame = &frame;
1064        assert!(
1065            !f.should_cache_result(),
1066            "volatile builtin must disable result-cache safety"
1067        );
1068
1069        // End-to-end: drive the volatile SELECT through `execute_query`
1070        // and confirm no entry was stamped under its cache key. Other
1071        // entries from prior tests sharing the binary may exist, so we
1072        // assert specifically on this query's key.
1073        let _ = rt
1074            .execute_query("SELECT pg_advisory_unlock(1)")
1075            .expect("volatile SELECT executes");
1076        let cache = rt.inner.result_cache.read();
1077        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1078        assert!(
1079            !cache.0.contains_key(&key),
1080            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1081            cache.0.keys().collect::<Vec<_>>()
1082        );
1083
1084        reset_thread_locals();
1085    }
1086
1087    #[test]
1088    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1089        reset_thread_locals();
1090        let rt = fresh_runtime();
1091        rt.inner
1092            .db
1093            .store()
1094            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1095
1096        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1097        assert_eq!(result.statement_type, "select");
1098
1099        let key = result_cache_key("SELECT 1");
1100        assert!(
1101            rt.inner
1102                .result_blob_cache
1103                .get("runtime.result_cache", &key)
1104                .is_some(),
1105            "blob backend should stamp the Blob Cache path"
1106        );
1107        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1108        assert!(
1109            !rt.inner.result_cache.read().0.contains_key(&key),
1110            "blob backend should not write the legacy map"
1111        );
1112    }
1113
1114    #[test]
1115    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1116        reset_thread_locals();
1117        let rt = fresh_runtime();
1118        rt.inner
1119            .db
1120            .store()
1121            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1122
1123        let _ = rt
1124            .execute_query("SELECT pg_advisory_unlock(1)")
1125            .expect("volatile SELECT executes");
1126        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1127        assert!(
1128            rt.inner
1129                .result_blob_cache
1130                .get("runtime.result_cache", &key)
1131                .is_none(),
1132            "volatile SELECT must not populate blob result cache"
1133        );
1134        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1135    }
1136
1137    #[test]
1138    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1139        reset_thread_locals();
1140        let rt = fresh_runtime();
1141        rt.inner
1142            .db
1143            .store()
1144            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1145
1146        let first = rt.execute_query("SELECT 1").expect("first SELECT");
1147        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1148        assert_eq!(first.result.len(), second.result.len());
1149
1150        let key = result_cache_key("SELECT 1");
1151        assert!(rt.inner.result_cache.read().0.contains_key(&key));
1152        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1153        assert_eq!(rt.result_cache_shadow_divergences(), 0);
1154        assert_eq!(
1155            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1156            "cache_shadow_divergence_total"
1157        );
1158    }
1159
1160    #[test]
1161    fn as_of_on_red_collection_records_floor() {
1162        reset_thread_locals();
1163        let rt = fresh_runtime();
1164
1165        // `red_*` collections always allow AS OF. The frame should
1166        // resolve to a concrete xid and surface it via the Interface.
1167        let frame =
1168            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1169                .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1170
1171        let f: &dyn ReadFrame = &frame;
1172        assert_eq!(
1173            f.as_of_floor(),
1174            Some(1),
1175            "AS OF SNAPSHOT 1 records xid=1 as the floor"
1176        );
1177        assert_eq!(f.snapshot().xid, 1);
1178        assert!(
1179            f.snapshot().in_progress.is_empty(),
1180            "historical reads have no in-progress set"
1181        );
1182    }
1183
1184    /// The frame classifies common SQL prefixes into the coarse
1185    /// `Privilege` / `LockIntent` buckets at build time. This test
1186    /// pins the mapping so a regression that silently re-routes
1187    /// (e.g. INSERT classified as Read) surfaces here, not at a
1188    /// downstream privilege gate.
1189    #[test]
1190    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1191        reset_thread_locals();
1192        let rt = fresh_runtime();
1193
1194        let cases = [
1195            ("SELECT 1", Privilege::Read, LockIntent::Shared),
1196            (
1197                "INSERT INTO t (id) VALUES (1)",
1198                Privilege::Write,
1199                LockIntent::Exclusive,
1200            ),
1201            (
1202                "UPDATE t SET x = 1 WHERE id = 1",
1203                Privilege::Write,
1204                LockIntent::Exclusive,
1205            ),
1206            (
1207                "DELETE FROM t WHERE id = 1",
1208                Privilege::Write,
1209                LockIntent::Exclusive,
1210            ),
1211            (
1212                "CREATE TABLE foo (id INT)",
1213                Privilege::Write,
1214                LockIntent::Exclusive,
1215            ),
1216            ("BEGIN", Privilege::None, LockIntent::None),
1217            ("COMMIT", Privilege::None, LockIntent::None),
1218            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1219        ];
1220
1221        for (q, want_priv, want_lock) in cases {
1222            let frame = StatementExecutionFrame::build(&rt, q)
1223                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1224            let f: &dyn ReadFrame = &frame;
1225            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1226            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1227        }
1228    }
1229
1230    /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1231    /// driven through `execute_query` under an identity whose role
1232    /// doesn't satisfy the frame's coarse `Read` privilege gets
1233    /// denied with the frame's signal.
1234    ///
1235    /// We test the gate by classifying an INSERT (which the frame
1236    /// reports as `Privilege::Write`) under `Role::Read` — the only
1237    /// pair the legacy fallback would also reject, but here the
1238    /// rejection comes through `frame.check_query_privilege` BEFORE
1239    /// the parsed-expression walker runs. Removing
1240    /// `required_privilege` (or the `is_satisfied_by` consult inside
1241    /// `check_query_privilege`) would force the deny path back to the
1242    /// inline `RedDBRuntime::check_query_privilege` walker — but the
1243    /// auth_store gate up there is bypassed when no auth_store is
1244    /// wired (embedded test mode), so this test would FLIP from
1245    /// denied to permitted and break the assertion below.
1246    #[test]
1247    fn insert_under_read_role_denied_via_frame_privilege() {
1248        reset_thread_locals();
1249        set_current_auth_identity("alice".to_string(), Role::Read);
1250
1251        let rt = fresh_runtime();
1252        // Bypass parser by reaching into the frame directly: the
1253        // frame derives privilege from the SQL prefix without
1254        // needing an auth_store wired up. Driving end-to-end via
1255        // `execute_query` would also reject (no table `t`), but for
1256        // a different reason — we want to pin the privilege seam.
1257        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1258            .expect("frame builds for INSERT");
1259        let f: &dyn ReadFrame = &frame;
1260        assert_eq!(
1261            f.required_privilege(),
1262            Privilege::Write,
1263            "INSERT classified as Write"
1264        );
1265        let id = f.identity().expect("identity captured");
1266        assert!(
1267            !f.required_privilege().is_satisfied_by(id.1),
1268            "Role::Read does not satisfy Privilege::Write — frame must deny"
1269        );
1270
1271        // End-to-end: the frame's `check_query_privilege` sees the
1272        // (Read role, Write privilege) mismatch and denies before
1273        // dispatch. We drive a synthetic `QueryExpr::Table` because
1274        // the SELECT/INSERT parser would happen to also fail, and we
1275        // want the failure to come from the privilege seam.
1276        use crate::storage::query::ast::{QueryExpr, TableQuery};
1277        let expr = QueryExpr::Table(TableQuery::new("t"));
1278        let err = frame
1279            .check_query_privilege(&rt, &expr)
1280            .expect_err("denied via frame's coarse privilege gate");
1281        let msg = format!("{err}");
1282        assert!(
1283            msg.contains("permission denied") && msg.contains("Write"),
1284            "expected frame-level Write deny, got: {msg}"
1285        );
1286
1287        reset_thread_locals();
1288    }
1289
1290    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1291    /// control statement carries `LockIntent::None` and the
1292    /// `acquire_intent_locks` path returns `None` without consulting
1293    /// `intent_lock_modes_for`. Removing the method (or its consult
1294    /// site in `acquire_intent_locks`) would force the lock-mode
1295    /// helper to walk a fabricated parsed expression to reach the
1296    /// same conclusion — but the assertion that no guard is allocated
1297    /// for a `BEGIN` frame would still hold, so we additionally pin
1298    /// the classifier mapping above to make the deletion observable.
1299    #[test]
1300    fn control_statement_skips_intent_locks_via_frame() {
1301        reset_thread_locals();
1302        let rt = fresh_runtime();
1303
1304        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1305        let f: &dyn ReadFrame = &frame;
1306        assert_eq!(f.lock_intent(), LockIntent::None);
1307
1308        // Drive `acquire_intent_locks` against a fabricated SELECT
1309        // expression that WOULD normally yield `(IS, IS)`; the frame's
1310        // `lock_intent() == None` short-circuit must still suppress
1311        // the guard.
1312        use crate::storage::query::ast::{QueryExpr, TableQuery};
1313        let expr = QueryExpr::Table(TableQuery::new("t"));
1314        let guard = frame.acquire_intent_locks(&rt, &expr);
1315        assert!(
1316            guard.is_none(),
1317            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1318        );
1319    }
1320}