reddb_server/runtime/statement_frame.rs
1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7 collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8 has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9 query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10 SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
19/// Coarse privilege classification for a statement, computed once at
20/// frame-build time from the SQL text. Mirrors the three-role auth
21/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
22/// answer "can this identity run this statement?" without re-walking
23/// the parsed `QueryExpr` at every call site.
24///
25/// `None` means the statement does not touch the privilege gate at
26/// all (transaction control, SET, SHOW). Such statements must remain
27/// runnable under any authenticated identity.
28#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub(crate) enum Privilege {
30 /// Read-only data access (SELECT, EXPLAIN, SHOW). Satisfied by
31 /// any role from `Role::Read` upward.
32 Read,
33 /// Mutation of user data or schema author DDL (INSERT, UPDATE,
34 /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
35 /// at least `Role::Write`.
36 Write,
37 /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
38 /// ROLLBACK MIGRATION, IAM policy mutation. Requires `Role::Admin`.
39 Admin,
40 /// Statement does not consult the privilege gate (BEGIN, COMMIT,
41 /// ROLLBACK, SET, SHOW with no data exposure). Always permitted
42 /// for any authenticated identity.
43 None,
44}
45
46impl Privilege {
47 /// `true` iff `role` is sufficient to execute a statement carrying
48 /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49 /// Admin` containment used by the auth fallback path.
50 pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51 match self {
52 Self::None => true,
53 Self::Read => role.can_read(),
54 Self::Write => role.can_write(),
55 Self::Admin => role.can_admin(),
56 }
57 }
58}
59
60/// Coarse lock intent for a statement, computed once at frame-build
61/// time. Maps onto the storage-layer's `LockMode` matrix downstream
62/// but stays decoupled here so the runtime can answer "does this
63/// statement need the lock manager at all?" without a `use storage::`
64/// at every call site.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub(crate) enum LockIntent {
67 /// No collection-level lock needed (transaction control, SET,
68 /// SHOW, EXPLAIN). The lock-acquisition path can short-circuit.
69 None,
70 /// Reader-style intent: SELECT, joins, graph / queue / search
71 /// reads. Maps to `(IS, IS)` at the storage layer.
72 Shared,
73 /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
74 /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
75 /// `Exclusive` at this granularity — call sites that need the
76 /// finer distinction still consult `intent_lock_modes_for`.
77 Exclusive,
78}
79
80/// Small, stable Interface that *represents* a read statement's
81/// execution context. Every read caller that needs to know "under
82/// what scope / identity / snapshot am I running, and is there an
83/// AS OF floor in effect?" consults this trait — never the
84/// underlying thread-locals or runtime fields directly.
85///
86/// The deletion test: removing this trait would force the four
87/// concerns it exposes back into ad-hoc lookups at every read
88/// callsite (`current_tenant()`, `current_auth_identity()`,
89/// `capture_current_snapshot()`, AS OF re-parsing). The trait
90/// concentrates them in one place so future changes (per-statement
91/// logging, audit, scope policy) have a single seam to extend.
92pub(crate) trait ReadFrame {
93 /// Effective tenant scope for the statement after WITHIN /
94 /// SET LOCAL TENANT / SET TENANT resolution. `None` means
95 /// "no tenant bound" (RLS deny-default applies).
96 fn effective_scope(&self) -> Option<&str>;
97
98 /// Authenticated identity observed at frame-build time, if any.
99 /// Returns `(username, role)` so callers can render audit lines
100 /// or feed RLS policy lookups without re-reading thread-locals.
101 fn identity(&self) -> Option<(&str, Role)>;
102
103 /// MVCC snapshot the statement reads against. For autocommit
104 /// this is a fresh snapshot; inside an active transaction it
105 /// is the txn's snapshot; under AS OF it is the resolved
106 /// historical xid.
107 fn snapshot(&self) -> &Snapshot;
108
109 /// AS OF xid floor when AS OF was applied for this statement,
110 /// `None` for live reads. Useful for downstream callers that
111 /// want to gate behaviour on historical-read mode without
112 /// re-parsing the query.
113 fn as_of_floor(&self) -> Option<Xid>;
114
115 /// Stable result-cache key for the statement (already mixes
116 /// effective tenant + identity).
117 fn cache_key(&self) -> &str;
118
119 /// Whether the statement is safe to serve from / populate the
120 /// result cache. Combines two underlying signals:
121 ///
122 /// * the query does not call a volatile builtin (e.g. `NOW()`,
123 /// `RANDOM()`, `UUID()`), which would change between calls,
124 /// * the connection is not inside an active transaction with
125 /// uncommitted writes that other readers shouldn't observe.
126 ///
127 /// SELECT cache callsites (read + write) consult this method
128 /// instead of re-deriving safety from globals or poking the
129 /// frame's private fields. Removing it would force every cache
130 /// callsite to re-run `query_has_volatile_builtin` plus
131 /// `result_cache_safe(conn_id)` inline.
132 fn should_cache_result(&self) -> bool;
133
134 /// Coarse privilege class the statement requires, computed once
135 /// at frame-build time from the SQL prefix. Read/write dispatch
136 /// sites consult this instead of re-classifying the parsed
137 /// `QueryExpr` inline at every callsite.
138 ///
139 /// Removing this method would force every privilege gate to
140 /// recompute the (action, resource) classification from the
141 /// parsed expression and re-check the role hierarchy inline.
142 fn required_privilege(&self) -> Privilege;
143
144 /// Coarse collection-level lock intent the statement implies.
145 /// `None` lets the lock-acquisition path short-circuit without
146 /// touching the lock manager.
147 ///
148 /// Removing this method would force the lock-acquisition path
149 /// to always invoke `intent_lock_modes_for` (which itself walks
150 /// the parsed expression) even for transaction-control / SET /
151 /// SHOW statements that need no collection lock at all.
152 fn lock_intent(&self) -> LockIntent;
153
154 /// Set of collection ids the calling identity is allowed to
155 /// observe under the active `(tenant, role)` scope. Computed once
156 /// at frame-build time via the `AuthStore` visible-collections
157 /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
158 /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
159 /// before any similarity score is computed (issue #119).
160 ///
161 /// `None` means the frame was built without an auth store wired —
162 /// embedded / single-tenant tests run that way. AI search call
163 /// sites refuse to proceed with `None`, which is the deny-default
164 /// the issue requires; pure SELECT paths fall back to the existing
165 /// per-row RLS gate.
166 fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
167}
168
169/// Cheap first-word classification of a SQL statement, used at
170/// frame-build time to derive `Privilege` + `LockIntent` without
171/// re-parsing the query. Matches the keywords that the legacy
172/// inline checks in `RedDBRuntime::check_query_privilege` and
173/// `intent_lock_modes_for` already key on.
174fn statement_kind(query: &str) -> &'static str {
175 let trimmed = query.trim_start();
176 // Skip a leading line / block comment so the classifier doesn't
177 // misread `/* ... */ SELECT ...` as an unknown statement.
178 let trimmed = if let Some(rest) = trimmed.strip_prefix("--") {
179 rest.split_once('\n')
180 .map(|(_, r)| r)
181 .unwrap_or("")
182 .trim_start()
183 } else {
184 trimmed
185 };
186 let first = trimmed
187 .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
188 .next()
189 .unwrap_or("");
190 // ASCII-uppercase compare without allocating: SQL keywords are ASCII.
191 let mut buf = [0u8; 16];
192 let bytes = first.as_bytes();
193 let n = bytes.len().min(buf.len());
194 for i in 0..n {
195 buf[i] = bytes[i].to_ascii_uppercase();
196 }
197 match &buf[..n] {
198 b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" | b"RANK"
199 | b"APPROX" | b"APPROXIMATE" | b"ZRANK" | b"ZRANGE" => "read",
200 b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
201 b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
202 b"GRANT" | b"REVOKE" => "admin",
203 b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
204 | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
205 _ => "unknown",
206 }
207}
208
209fn classify_privilege(query: &str) -> Privilege {
210 match statement_kind(query) {
211 "read" => Privilege::Read,
212 "write" => Privilege::Write,
213 // DDL is gated at `Role::Write` in the legacy fallback (see
214 // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
215 // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
216 // GRANT / REVOKE upgrade to Admin via finer checks at the call
217 // site — the frame surfaces only the coarse class.
218 "ddl" => Privilege::Write,
219 "admin" => Privilege::Admin,
220 _ => Privilege::None,
221 }
222}
223
224fn classify_lock_intent(query: &str) -> LockIntent {
225 match statement_kind(query) {
226 "read" => LockIntent::Shared,
227 "write" | "ddl" => LockIntent::Exclusive,
228 _ => LockIntent::None,
229 }
230}
231
232pub(super) struct StatementExecutionFrame {
233 tx_local_tenant: Option<Option<String>>,
234 snapshot: Snapshot,
235 own_xids: HashSet<Xid>,
236 cache_key: String,
237 is_volatile_query: bool,
238 cache_safe: bool,
239 /// Effective tenant captured at frame-build time after WITHIN /
240 /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
241 /// so the `ReadFrame` Interface can return a borrow without
242 /// re-touching the thread-local stack.
243 effective_scope: Option<String>,
244 /// Auth identity captured at frame-build time. `None` for
245 /// embedded / anonymous callers.
246 identity: Option<(String, Role)>,
247 /// `Some(xid)` when AS OF resolved to a historical xid; `None`
248 /// for live reads.
249 as_of_floor: Option<Xid>,
250 /// True when the statement snapshot can require tuple versions that
251 /// current secondary indexes no longer contain.
252 requires_index_fallback: bool,
253 /// Privilege class required by the statement, derived from the
254 /// SQL text at frame-build time. Read/write dispatch sites
255 /// consult this instead of re-classifying the parsed expression.
256 required_privilege: Privilege,
257 /// Collection-level lock intent the statement implies. The
258 /// lock-acquisition path short-circuits when this is `None`.
259 lock_intent: LockIntent,
260 /// Set of collection ids the active `(tenant, role)` scope is
261 /// allowed to observe. Computed at frame-build time via the
262 /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
263 /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
264 /// scoring (issue #119). `None` when no auth store is wired
265 /// (embedded test mode) — AI search refuses on `None`.
266 visible_collections: Option<HashSet<String>>,
267 /// Per-owner buffer arena for query-result row chunks (#885). Owned
268 /// by the frame because the frame already owns the query lifecycle;
269 /// lent to the row-streaming path (`execute_runtime_table_query_in`)
270 /// so chunk buffers are reused across the statement's chunk-fetches
271 /// instead of allocated fresh per chunk. Reclaimed when the frame
272 /// drops at statement end — no `thread_local!` scratch, which would
273 /// be unsound under tokio's work-stealing runtime.
274 row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
275}
276
277pub(super) struct StatementFrameGuards {
278 _tx_local_guard: TxLocalTenantGuard,
279 _config_snapshot_guard: ConfigSnapshotGuard,
280 _secret_store_guard: SecretStoreGuard,
281 _snapshot_guard: CurrentSnapshotGuard,
282}
283
284pub(super) struct PreparedStatement {
285 pub(super) expr: QueryExpr,
286 pub(super) mode: QueryMode,
287}
288
289impl StatementExecutionFrame {
290 pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
291 let conn_id = current_connection_id();
292 let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
293 let own_xids = runtime.own_transaction_xids(conn_id);
294 let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
295 let requires_index_fallback =
296 as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
297 let cache_key = result_cache_key(query);
298 let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
299 let cache_safe = runtime.result_cache_safe(conn_id);
300 // Capture identity + effective scope under the same
301 // thread-local view that the cache key was built from, so
302 // the Interface and the cache key agree on what "this
303 // statement" means.
304 let effective_scope = current_tenant();
305 let identity = current_auth_identity();
306
307 // Coarse classification of the statement, computed once from
308 // the SQL prefix so downstream callers don't re-derive it
309 // from the parsed `QueryExpr` at every privilege / lock site.
310 let required_privilege = classify_privilege(query);
311 let lock_intent = classify_lock_intent(query);
312
313 // Issue #119: resolve the visible-collections set for the
314 // active (tenant, role) scope. Only meaningful when an auth
315 // store is wired *and* an identity was captured — embedded
316 // anonymous callers fall back to `None`, and AI search call
317 // sites refuse on `None`.
318 let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
319 {
320 (Some(store), Some((principal, role))) => {
321 let collections = runtime.inner.db.store().list_collections();
322 Some(store.visible_collections_for_scope(
323 effective_scope.as_deref(),
324 *role,
325 principal,
326 &collections,
327 ))
328 }
329 _ => None,
330 };
331
332 Ok(Self {
333 tx_local_tenant,
334 snapshot,
335 own_xids,
336 cache_key,
337 is_volatile_query,
338 cache_safe,
339 effective_scope,
340 identity,
341 as_of_floor,
342 requires_index_fallback,
343 required_privilege,
344 lock_intent,
345 visible_collections,
346 row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
347 })
348 }
349
350 /// Lend the frame's per-owner row-buffer arena (#885) to the
351 /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
352 /// the owner and the arena is reclaimed when the frame drops at
353 /// statement end.
354 pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
355 Rc::clone(&self.row_arena)
356 }
357
358 pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
359 StatementFrameGuards {
360 _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
361 _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
362 _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
363 _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
364 snapshot: self.snapshot.clone(),
365 manager: Arc::clone(&runtime.inner.snapshot_manager),
366 own_xids: self.own_xids.clone(),
367 requires_index_fallback: self.requires_index_fallback,
368 }),
369 }
370 }
371
372 pub(super) fn cache_key(&self) -> &str {
373 &self.cache_key
374 }
375
376 pub(super) fn can_read_result_cache(&self) -> bool {
377 // Delegates to the `ReadFrame` Interface so the volatile +
378 // active-tx safety decision lives in exactly one place.
379 <Self as ReadFrame>::should_cache_result(self)
380 }
381
382 pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
383 // Cache-safety (volatile builtin, active-tx writes) comes from
384 // the Interface; the rest are write-side payload heuristics
385 // (statement shape, result size) that aren't part of the
386 // safety contract.
387 <Self as ReadFrame>::should_cache_result(self)
388 && result.statement_type == "select"
389 && result.engine != "vault"
390 && result.engine != "runtime-rank"
391 // `QUEUE READ` is a stateful read: a delayed message
392 // (issue #722) becomes deliverable over time without a
393 // producer push to invalidate the cache, so a cached empty
394 // result would hide it. Skip caching entirely.
395 && result.statement != "queue_group_read"
396 && result.result.pre_serialized_json.is_none()
397 // Graph-analytics TVF output (issue #802) is deterministic and
398 // expensive to recompute, so it is cached at any row count. The
399 // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
400 && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
401 }
402
403 pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
404 if self.can_read_result_cache() {
405 runtime.get_result_cache_entry(self.cache_key())
406 } else {
407 None
408 }
409 }
410
411 pub(super) fn write_result_cache(
412 &self,
413 runtime: &RedDBRuntime,
414 result: &RuntimeQueryResult,
415 scopes: HashSet<String>,
416 ) {
417 if self.should_write_result_cache(result) {
418 runtime.put_result_cache_entry(
419 self.cache_key(),
420 RuntimeResultCacheEntry {
421 result: result.clone(),
422 cached_at: std::time::Instant::now(),
423 scopes,
424 },
425 );
426 }
427 }
428
429 pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
430 // Detected via cheap prefix check so non-CTE queries skip the
431 // full parse here. CTE-bearing queries bypass the plan cache
432 // and result cache (rare workload — perf optimization is a
433 // follow-up). Inlining substitutes every CTE reference with
434 // its body as a subquery in FROM, after which the existing
435 // subquery-in-FROM machinery handles execution. Recursive
436 // CTEs are rejected explicitly until fixpoint execution wires
437 // through the runtime.
438 if !has_with_prefix(query) {
439 return Ok(None);
440 }
441 let parsed = crate::storage::query::parser::parse(query)
442 .map_err(|err| RedDBError::Query(err.to_string()))?;
443 if parsed.with_clause.is_some() {
444 let rewritten = crate::storage::query::executors::inline_ctes(parsed)
445 .map_err(|err| RedDBError::Query(err.to_string()))?;
446 return Ok(Some(rewritten));
447 }
448 // No WITH after parse (the prefix matched something else like
449 // `WITHIN` that already routed elsewhere) — fall through to
450 // the normal path with the original query.
451 Ok(None)
452 }
453
454 pub(super) fn prepare_statement(
455 &self,
456 runtime: &RedDBRuntime,
457 query: &str,
458 ) -> RedDBResult<PreparedStatement> {
459 let mode = detect_mode(query);
460 if matches!(mode, QueryMode::Unknown) {
461 return Err(RedDBError::Query("unable to detect query mode".to_string()));
462 }
463
464 // ── Plan cache: reuse only exact-query ASTs ──
465 //
466 // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
467 // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
468 // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
469 // Plan cache applies to statements whose shape can be
470 // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
471 // reuses the same plan across thousands of varying literals).
472 // INSERT is still bypassed — its shape changes per column set
473 // and bulk paths don't go through here anyway.
474 let first_word = query
475 .trim()
476 .split_ascii_whitespace()
477 .next()
478 .unwrap_or("")
479 .to_ascii_uppercase();
480 let is_insert = first_word == "INSERT";
481
482 // Fused normalize+extract: one byte-scan produces both the
483 // cache_key AND the literal bindings. Saves a second Lexer
484 // pass over the query text on every cache hit — dominant
485 // cost on tight UPDATE loops that hit the same shape
486 // thousands of times with varying literals.
487 let (cache_key, prescan_binds) = if is_insert {
488 (String::new(), Vec::new())
489 } else {
490 crate::storage::query::planner::cache_key::normalize_and_extract(query)
491 };
492
493 let expr = if is_insert {
494 // Bypass plan cache for INSERT — shape varies per query.
495 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
496 } else {
497 // ── Hot path: read lock only (no writer serialization on cache hits) ──
498 //
499 // peek() is a non-mutating probe: no LRU promotion, no touch().
500 // This lets concurrent readers proceed without blocking each other.
501 // On hit we bind literals if needed and return immediately.
502 // Only on miss do we drop to a write lock to parse + insert.
503 let hit = {
504 let plan_cache = runtime.inner.query_cache.read();
505 plan_cache.peek(&cache_key).map(|cached| {
506 let parameter_count = cached.parameter_count;
507 let optimized = cached.plan.optimized.clone();
508 let exact_query = cached.exact_query.clone();
509 (parameter_count, optimized, exact_query)
510 })
511 };
512
513 if let Some((parameter_count, optimized, exact_query)) = hit {
514 if parameter_count > 0 {
515 // Shape hit: use the binds extracted during normalise.
516 let shape_binds = prescan_binds.clone();
517 if let Some(bound) =
518 crate::storage::query::planner::shape::bind_parameterized_query(
519 &optimized,
520 &shape_binds,
521 parameter_count,
522 )
523 {
524 bound
525 } else if exact_query.as_deref() == Some(query) {
526 // Bind failed but exact query matches — use as-is.
527 optimized
528 } else {
529 // Bind failed and literals differ: re-parse fresh.
530 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
531 }
532 } else {
533 // No parameters means either there truly are no literals,
534 // or this statement type does not participate in shape
535 // parameterization (for example graph/queue commands).
536 // Reusing a normalized-cache hit across a different exact
537 // query can therefore leak stale literals into execution.
538 if exact_query.as_deref() == Some(query) {
539 optimized
540 } else {
541 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
542 }
543 }
544 } else {
545 // Cache miss — parse, parameterize, store.
546 let parsed =
547 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
548 let (cached_expr, parameter_count) = if let Some(prepared) =
549 crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
550 {
551 (prepared.shape, prepared.parameter_count)
552 } else {
553 (parsed.clone(), 0)
554 };
555 {
556 let mut pc = runtime.inner.query_cache.write();
557 let plan = crate::storage::query::planner::QueryPlan::new(
558 parsed.clone(),
559 cached_expr,
560 Default::default(),
561 );
562 pc.insert(
563 cache_key.clone(),
564 crate::storage::query::planner::CachedPlan::new(plan)
565 .with_shape_key(cache_key.clone())
566 .with_exact_query(query.to_string())
567 .with_parameter_count(parameter_count),
568 );
569 }
570 parsed
571 }
572 };
573
574 // Phase 5 PG parity: substitute any registered view name that
575 // appears in the expression with its stored body. Runs after
576 // parse and before dispatch so the SQL entrypoint gets the
577 // same view resolution `execute_query_expr` already does.
578 let expr = runtime.rewrite_view_refs(expr);
579
580 Ok(PreparedStatement { expr, mode })
581 }
582
583 pub(super) fn check_query_privilege(
584 &self,
585 runtime: &RedDBRuntime,
586 expr: &QueryExpr,
587 ) -> RedDBResult<()> {
588 // Frame-level coarse gate. We consult `required_privilege()`
589 // (computed once at frame-build) against the captured identity
590 // before the deep grant engine walks the parsed expression.
591 // The coarse gate cannot ALLOW anything the grant engine would
592 // deny — it only short-circuits the obvious "Role::Read tries
593 // INSERT" case so a downstream caller never has to redo this
594 // check inline. `Privilege::None` (transaction control / SET /
595 // SHOW) flows through unchanged; the grant engine treats those
596 // as bypass too.
597 if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
598 let needed = <Self as ReadFrame>::required_privilege(self);
599 if !needed.is_satisfied_by(role) {
600 // Issue #205 — when the deep grant engine *also*
601 // denies, we treat this as an ordinary permission
602 // failure. But when an Admin-only statement reaches
603 // this gate without an auth_store wired (so the deep
604 // engine can't double-check), the coarse rejection is
605 // the only line of defence — emit an OperatorEvent so
606 // the operator notices an Admin-class statement was
607 // attempted with insufficient role.
608 if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
609 crate::telemetry::operator_event::OperatorEvent::AuthBypass {
610 principal: username.to_string(),
611 resource: format!("statement requiring {needed:?}"),
612 detail: format!(
613 "auth_store not wired; coarse gate is sole defence (role={role:?})"
614 ),
615 }
616 .emit_global();
617 }
618 return Err(RedDBError::Query(format!(
619 "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
620 )));
621 }
622 }
623 runtime
624 .check_query_privilege(expr)
625 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
626 }
627
628 pub(super) fn prepare_dispatch(
629 &self,
630 runtime: &RedDBRuntime,
631 expr: &QueryExpr,
632 ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
633 runtime.validate_model_operations_before_auth(expr)?;
634 self.check_query_privilege(runtime, expr)?;
635 Ok(self.acquire_intent_locks(runtime, expr))
636 }
637
638 pub(super) fn acquire_intent_locks(
639 &self,
640 runtime: &RedDBRuntime,
641 expr: &QueryExpr,
642 ) -> Option<crate::runtime::locking::LockerGuard> {
643 if !runtime.config_bool("concurrency.locking.enabled", true) {
644 return None;
645 }
646 // Frame-level short-circuit: if the statement carries no lock
647 // intent (transaction control, SET, SHOW), skip the lock
648 // manager entirely instead of letting `intent_lock_modes_for`
649 // walk the parsed expression to reach the same conclusion.
650 if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
651 return None;
652 }
653 intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
654 let mut guard =
655 crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
656 let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
657 for collection in collections_referenced(expr) {
658 let _ = guard.acquire(
659 crate::runtime::locking::Resource::Collection(collection),
660 coll_mode,
661 );
662 }
663 guard
664 })
665 }
666}
667
668impl ReadFrame for StatementExecutionFrame {
669 fn effective_scope(&self) -> Option<&str> {
670 self.effective_scope.as_deref()
671 }
672
673 fn identity(&self) -> Option<(&str, Role)> {
674 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
675 }
676
677 fn snapshot(&self) -> &Snapshot {
678 &self.snapshot
679 }
680
681 fn as_of_floor(&self) -> Option<Xid> {
682 self.as_of_floor
683 }
684
685 fn cache_key(&self) -> &str {
686 &self.cache_key
687 }
688
689 fn should_cache_result(&self) -> bool {
690 !self.is_volatile_query && self.cache_safe
691 }
692
693 fn required_privilege(&self) -> Privilege {
694 self.required_privilege
695 }
696
697 fn lock_intent(&self) -> LockIntent {
698 self.lock_intent
699 }
700
701 fn visible_collections(&self) -> Option<&HashSet<String>> {
702 self.visible_collections.as_ref()
703 }
704}
705
706/// Lightweight `ReadFrame` carrier used by AI command entry points
707/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
708///
709/// Issue #119 calls this struct `EffectiveScope`. It bundles the
710/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
711/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
712/// instead of re-reading thread-locals at every call site.
713///
714/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
715/// from the per-statement thread-locals (identical to how
716/// `StatementExecutionFrame::build` derives them) and resolves
717/// `visible_collections` via the `AuthStore` cache.
718pub struct EffectiveScope {
719 pub(crate) tenant: Option<String>,
720 pub(crate) identity: Option<(String, Role)>,
721 pub(crate) snapshot: Snapshot,
722 pub(crate) visible_collections: Option<HashSet<String>>,
723}
724
725impl EffectiveScope {
726 /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
727 /// to gate LLM-backed NER calls behind `ai:ner:read`.
728 ///
729 /// Placeholder for now: always returns `false`. The auth engine's
730 /// capability matrix is future work; until it lands, every routed
731 /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
732 /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
733 /// Documented in code so the wire-up is a one-line change once
734 /// the auth engine learns capabilities.
735 pub fn has_capability(&self, _capability: &str) -> bool {
736 false
737 }
738}
739
740impl ReadFrame for EffectiveScope {
741 fn effective_scope(&self) -> Option<&str> {
742 self.tenant.as_deref()
743 }
744 fn identity(&self) -> Option<(&str, Role)> {
745 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
746 }
747 fn snapshot(&self) -> &Snapshot {
748 &self.snapshot
749 }
750 fn as_of_floor(&self) -> Option<Xid> {
751 None
752 }
753 fn cache_key(&self) -> &str {
754 ""
755 }
756 fn should_cache_result(&self) -> bool {
757 false
758 }
759 fn required_privilege(&self) -> Privilege {
760 Privilege::Read
761 }
762 fn lock_intent(&self) -> LockIntent {
763 LockIntent::Shared
764 }
765 fn visible_collections(&self) -> Option<&HashSet<String>> {
766 self.visible_collections.as_ref()
767 }
768}
769
770impl RedDBRuntime {
771 /// Build the AI command `EffectiveScope` from the current
772 /// statement thread-locals + auth store.
773 ///
774 /// Returns `None` for embedded callers (no auth store, no
775 /// identity) — `AuthorizedSearch` treats `None` as deny-default.
776 pub(crate) fn ai_scope(&self) -> EffectiveScope {
777 let tenant = super::impl_core::current_tenant();
778 let identity = super::impl_core::current_auth_identity();
779 let snapshot = self.current_snapshot();
780 let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
781 (Some(store), Some((principal, role))) => {
782 let collections = self.inner.db.store().list_collections();
783 Some(store.visible_collections_for_scope(
784 tenant.as_deref(),
785 *role,
786 principal,
787 &collections,
788 ))
789 }
790 _ => None,
791 };
792 EffectiveScope {
793 tenant,
794 identity,
795 snapshot,
796 visible_collections,
797 }
798 }
799}
800
801/// Test fixtures for callers that need to drive `ReadFrame` without
802/// booting a runtime. Lives behind `cfg(test)` and `pub(crate)` so it
803/// only leaks across module boundaries inside the crate.
804#[cfg(test)]
805pub(crate) mod test_support {
806 use super::{LockIntent, Privilege, ReadFrame};
807 use crate::auth::Role;
808 use crate::storage::transaction::snapshot::{Snapshot, Xid};
809 use std::collections::HashSet;
810
811 /// A `ReadFrame` impl with hand-set fields. Used by
812 /// `authorized_search` tests to assert the deny-default and
813 /// scope-trim behaviour without going through frame construction.
814 pub(crate) struct FakeReadFrame {
815 pub tenant: Option<String>,
816 pub identity: Option<(String, Role)>,
817 pub snapshot: Snapshot,
818 pub visible: Option<HashSet<String>>,
819 }
820
821 impl FakeReadFrame {
822 pub(crate) fn without_scope() -> Self {
823 Self {
824 tenant: None,
825 identity: None,
826 snapshot: Snapshot {
827 xid: 0,
828 in_progress: HashSet::new(),
829 },
830 visible: None,
831 }
832 }
833
834 pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
835 Self {
836 tenant: Some("acme".to_string()),
837 identity: Some(("alice".to_string(), Role::Read)),
838 snapshot: Snapshot {
839 xid: 0,
840 in_progress: HashSet::new(),
841 },
842 visible: Some(visible),
843 }
844 }
845 }
846
847 impl ReadFrame for FakeReadFrame {
848 fn effective_scope(&self) -> Option<&str> {
849 self.tenant.as_deref()
850 }
851 fn identity(&self) -> Option<(&str, Role)> {
852 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
853 }
854 fn snapshot(&self) -> &Snapshot {
855 &self.snapshot
856 }
857 fn as_of_floor(&self) -> Option<Xid> {
858 None
859 }
860 fn cache_key(&self) -> &str {
861 ""
862 }
863 fn should_cache_result(&self) -> bool {
864 false
865 }
866 fn required_privilege(&self) -> Privilege {
867 Privilege::Read
868 }
869 fn lock_intent(&self) -> LockIntent {
870 LockIntent::Shared
871 }
872 fn visible_collections(&self) -> Option<&HashSet<String>> {
873 self.visible.as_ref()
874 }
875 }
876}
877
878impl RedDBRuntime {
879 fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
880 let mut set = HashSet::new();
881 if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
882 set.insert(ctx.xid);
883 for (_, sub) in &ctx.savepoints {
884 set.insert(*sub);
885 }
886 for sub in &ctx.released_sub_xids {
887 set.insert(*sub);
888 }
889 }
890 set
891 }
892
893 /// Resolve the snapshot for the current statement, returning
894 /// the snapshot itself and (when AS OF is in effect) the
895 /// resolved xid floor. The floor is the same xid carried inside
896 /// `Snapshot.xid` for AS OF reads — exposing it separately lets
897 /// the `ReadFrame` Interface tell "live read" from "historical
898 /// read" without inferring from `in_progress.is_empty()`.
899 fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
900 match peek_top_level_as_of_with_table(query) {
901 Some((spec, Some(table))) => {
902 if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
903 return Err(RedDBError::InvalidConfig(format!(
904 "AS OF requires a versioned collection — \
905 `{table}` has not opted in. \
906 Call vcs.set_versioned(\"{table}\", true) first."
907 )));
908 }
909 let xid = self.vcs_resolve_as_of(spec)?;
910 Ok((
911 Snapshot {
912 xid,
913 in_progress: HashSet::new(),
914 },
915 Some(xid),
916 ))
917 }
918 Some((spec, None)) => {
919 let xid = self.vcs_resolve_as_of(spec)?;
920 Ok((
921 Snapshot {
922 xid,
923 in_progress: HashSet::new(),
924 },
925 Some(xid),
926 ))
927 }
928 None => Ok((self.current_snapshot(), None)),
929 }
930 }
931
932 fn result_cache_safe(&self, conn_id: u64) -> bool {
933 let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
934 let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
935 !has_active_xids && !in_own_tx
936 }
937}
938
/// True when `engine` names one of the graph-analytics TVF executors
/// (issue #802). Both the graph-collection form (`louvain(g)`) and the
/// inline form (`louvain(nodes => …, edges => …)`) yield deterministic
/// algorithm output, which is cached regardless of row count.
fn is_graph_tvf_engine(engine: &str) -> bool {
    const GRAPH_TVF_ENGINES: [&str; 2] = ["runtime-graph-tvf", "runtime-graph-tvf-inline"];
    GRAPH_TVF_ENGINES.contains(&engine)
}
946
947fn result_cache_key(query: &str) -> String {
948 let tenant = current_tenant().unwrap_or_default();
949 let auth = current_auth_identity()
950 .map(|(user, role)| format!("{}|{:?}", user, role))
951 .unwrap_or_default();
952 if tenant.is_empty() && auth.is_empty() {
953 query.to_string()
954 } else {
955 format!("{query}\u{001e}{tenant}\u{001e}{auth}")
956 }
957}
958
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::runtime::impl_core::{
        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
        set_current_tenant,
    };
    use crate::runtime::RedDBRuntime;

    /// Fresh in-memory runtime. Every test builds its own, so result
    /// caches and config written by one test are not seen by another.
    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// Clear the tenant and auth-identity thread-locals so state set by
    /// one test cannot leak into another. This is defensive: whether
    /// consecutive tests reuse a thread depends on the test harness, so
    /// tests reset on entry rather than assuming a fresh thread.
    fn reset_thread_locals() {
        clear_current_tenant();
        clear_current_auth_identity();
    }

    /// An autocommit `SELECT 1` builds a live-read frame: no AS OF floor
    /// and a snapshot xid of at least 1.
    #[test]
    fn autocommit_select_takes_live_snapshot() {
        reset_thread_locals();
        let rt = fresh_runtime();
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");

        // Live reads: no AS OF floor, snapshot bounded by the
        // manager's `peek_next_xid` so committed tuples are visible.
        let f: &dyn ReadFrame = &frame;
        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
        assert!(
            f.snapshot().xid >= 1,
            "autocommit snapshot xid is bounded by peek_next_xid"
        );
    }

    /// Tenant and identity set through the thread-local API are captured
    /// by the frame and folded into its cache key.
    #[test]
    fn frame_captures_identity_and_scope() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Write);

        let rt = fresh_runtime();
        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
        let f: &dyn ReadFrame = &frame;

        assert_eq!(f.effective_scope(), Some("acme"));
        let id = f.identity().expect("identity captured");
        assert_eq!(id.0, "alice");
        assert!(matches!(id.1, Role::Write));

        // Cache key mixes scope + identity so two callers under
        // different tenants never share a cache slot.
        assert!(
            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
            "cache key folds in scope + identity, got {:?}",
            f.cache_key()
        );

        reset_thread_locals();
    }

    /// Frame construction must fail for AS OF against a user collection
    /// that has not opted in to versioning. (The `panic!` text in the
    /// `Ok` arm describes the expected behaviour, not what happened.)
    #[test]
    fn as_of_rejects_non_versioned_user_collection() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `not_versioned` is a plain user collection — the frame
        // builder must reject AS OF until the caller opts in via
        // `vcs.set_versioned`.
        let err = match StatementExecutionFrame::build(
            &rt,
            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
        ) {
            Err(e) => e,
            Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
        };

        let msg = format!("{err}");
        assert!(
            msg.contains("AS OF requires a versioned collection"),
            "expected AS OF rejection, got: {msg}"
        );
    }

    /// End-to-end check that the SELECT path writes tenant- and
    /// identity-scoped keys into the (legacy) result cache.
    ///
    /// Sets a tenant + identity via the public thread-local API the
    /// runtime uses for ambient scope, drives a real `SELECT` through
    /// `execute_query`, then inspects the result cache populated by the
    /// SELECT path via `frame.cache_key()`. The scope shows up in the key
    /// because `result_cache_key` folds it in.
    ///
    /// Limitation: this test cannot tell a frame-built key from one built
    /// by inline `current_tenant()` / `current_auth_identity()` reads.
    /// It fails only if the SELECT path stops writing scoped keys to the
    /// result cache altogether (e.g. the frame is dropped and nothing
    /// else stamps the key).
    #[test]
    fn select_path_routes_through_frame_cache_key() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        let result = rt
            .execute_query("SELECT 1")
            .expect("SELECT 1 executes under tenant=acme/identity=alice");
        assert_eq!(result.statement_type, "select");

        // The SELECT path (in `execute_query_expr`) builds a frame and
        // writes its result through `frame.cache_key()`. That key folds
        // tenant + identity in via `result_cache_key`, so finding "acme"
        // and "alice" inside any cached key shows a scoped key was
        // written.
        let cache = rt.inner.result_cache.read();
        let any_keyed_with_scope = cache
            .0
            .keys()
            .any(|k| k.contains("acme") && k.contains("alice"));
        assert!(
            any_keyed_with_scope,
            "expected at least one result-cache key carrying tenant+identity, \
             got keys: {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    /// A SELECT that calls a volatile builtin (here:
    /// `pg_advisory_unlock`, the volatile token the runtime currently
    /// recognises in `query_has_volatile_builtin`) must NOT populate
    /// the result cache. Any caller hitting the cache after this would
    /// see a stale answer for an inherently-volatile query, so the
    /// SELECT path gates writes through `frame.should_cache_result()`.
    ///
    /// Deletion test: removing `ReadFrame::should_cache_result`, or
    /// reverting the SELECT path to skip its safety gate, would let
    /// the result cache silently absorb this statement and break the
    /// assertion below.
    #[test]
    fn volatile_select_does_not_populate_result_cache() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // Frame-level invariant: the volatile-builtin signal collapses
        // `should_cache_result` to false even for an autocommit /
        // out-of-tx connection.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
        let f: &dyn ReadFrame = &frame;
        assert!(
            !f.should_cache_result(),
            "volatile builtin must disable result-cache safety"
        );

        // End-to-end: drive the volatile SELECT through `execute_query`
        // and confirm no entry was stamped under its cache key. The
        // assertion targets this query's key specifically rather than
        // requiring an empty cache.
        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let cache = rt.inner.result_cache.read();
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            !cache.0.contains_key(&key),
            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    /// With `runtime.result_cache.backend = "blob_cache"`, a cacheable
    /// SELECT is stamped into the blob cache and its entry index, and
    /// NOT into the legacy `result_cache` map.
    #[test]
    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
        assert_eq!(result.statement_type, "select");

        let key = result_cache_key("SELECT 1");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_some(),
            "blob backend should stamp the Blob Cache path"
        );
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert!(
            !rt.inner.result_cache.read().0.contains_key(&key),
            "blob backend should not write the legacy map"
        );
    }

    /// The volatile-builtin gate also applies under the blob-cache
    /// backend: neither the blob cache nor its entry index may receive
    /// the volatile SELECT.
    #[test]
    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_none(),
            "volatile SELECT must not populate blob result cache"
        );
        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
    }

    /// With `runtime.result_cache.backend = "shadow"`, the same SELECT is
    /// written to both the legacy map and the blob entry index. Running
    /// it twice with equal results records zero shadow divergences. The
    /// final assertion pins the exported metric name.
    #[test]
    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));

        let first = rt.execute_query("SELECT 1").expect("first SELECT");
        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
        assert_eq!(first.result.len(), second.result.len());

        let key = result_cache_key("SELECT 1");
        assert!(rt.inner.result_cache.read().0.contains_key(&key));
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert_eq!(rt.result_cache_shadow_divergences(), 0);
        assert_eq!(
            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
            "cache_shadow_divergence_total"
        );
    }

    /// AS OF SNAPSHOT on a `red_*` collection resolves without the
    /// versioning opt-in. The resolved xid appears both as the frame's
    /// floor and as the snapshot xid, with an empty in-progress set.
    #[test]
    fn as_of_on_red_collection_records_floor() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `red_*` collections always allow AS OF. The frame should
        // resolve to a concrete xid and surface it via the Interface.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
                .expect("AS OF SNAPSHOT 1 on red_commits resolves");

        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.as_of_floor(),
            Some(1),
            "AS OF SNAPSHOT 1 records xid=1 as the floor"
        );
        assert_eq!(f.snapshot().xid, 1);
        assert!(
            f.snapshot().in_progress.is_empty(),
            "historical reads have no in-progress set"
        );
    }

    /// The frame classifies common SQL prefixes into the coarse
    /// `Privilege` / `LockIntent` buckets at build time. This test
    /// pins the mapping so a regression that silently re-routes
    /// (e.g. INSERT classified as Read) surfaces here, not at a
    /// downstream privilege gate.
    #[test]
    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let cases = [
            ("SELECT 1", Privilege::Read, LockIntent::Shared),
            (
                "INSERT INTO t (id) VALUES (1)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "UPDATE t SET x = 1 WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "DELETE FROM t WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "CREATE TABLE foo (id INT)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            ("BEGIN", Privilege::None, LockIntent::None),
            ("COMMIT", Privilege::None, LockIntent::None),
            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
        ];

        for (q, want_priv, want_lock) in cases {
            let frame = StatementExecutionFrame::build(&rt, q)
                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
            let f: &dyn ReadFrame = &frame;
            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
        }
    }

    /// Deletion-test for `ReadFrame::required_privilege`: an INSERT
    /// (which the frame classifies as `Privilege::Write`) under an
    /// identity holding only `Role::Read` must be denied by the frame's
    /// coarse privilege gate.
    ///
    /// The test first checks the classification and `is_satisfied_by`
    /// directly on the frame. It then calls `frame.check_query_privilege`
    /// with a synthetic `QueryExpr::Table`, so the rejection comes from
    /// the privilege seam rather than from the parser or the missing
    /// table `t`. The test's premise: with no auth_store wired (embedded
    /// test mode) the inline `RedDBRuntime::check_query_privilege`
    /// walker does not gate. Removing `required_privilege` (or the
    /// `is_satisfied_by` consult inside `check_query_privilege`) should
    /// therefore flip this from denied to permitted and break the
    /// assertions below.
    #[test]
    fn insert_under_read_role_denied_via_frame_privilege() {
        reset_thread_locals();
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        // Bypass parser by reaching into the frame directly: the
        // frame derives privilege from the SQL prefix without
        // needing an auth_store wired up. Driving end-to-end via
        // `execute_query` would also reject (no table `t`), but for
        // a different reason — we want to pin the privilege seam.
        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
            .expect("frame builds for INSERT");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.required_privilege(),
            Privilege::Write,
            "INSERT classified as Write"
        );
        let id = f.identity().expect("identity captured");
        assert!(
            !f.required_privilege().is_satisfied_by(id.1),
            "Role::Read does not satisfy Privilege::Write — frame must deny"
        );

        // The frame's `check_query_privilege` sees the (Read role,
        // Write privilege) mismatch and denies before dispatch. We
        // pass a synthetic `QueryExpr::Table` because the SELECT/INSERT
        // parser would happen to also fail, and we want the failure to
        // come from the privilege seam.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let err = frame
            .check_query_privilege(&rt, &expr)
            .expect_err("denied via frame's coarse privilege gate");
        let msg = format!("{err}");
        assert!(
            msg.contains("permission denied") && msg.contains("Write"),
            "expected frame-level Write deny, got: {msg}"
        );

        reset_thread_locals();
    }

    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
    /// wired into the SELECT path and returns the same rows as the
    /// per-request-allocation baseline.
    ///
    /// A table with more rows than the streaming high-water mark
    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
    /// path to assemble many chunks, each leasing/recycling the frame
    /// arena's single chunk buffer. Driving it through `execute_query`
    /// (which builds a `StatementExecutionFrame` and lends its arena)
    /// must return every inserted row, in order — exactly what the
    /// allocate-per-chunk path returned. A bug in the arena wiring
    /// (dropped rows, bled rows, mis-ordering) would surface here.
    #[test]
    fn large_select_through_frame_arena_returns_all_rows_in_order() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE big (id INT)")
            .expect("create table");

        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
        // spans multiple chunks and the arena buffer is reused.
        const N: usize = 2_500;
        let values = (0..N)
            .map(|i| format!("({i})"))
            .collect::<Vec<_>>()
            .join(", ");
        rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
            .expect("insert rows");

        let result = rt
            .execute_query("SELECT id FROM big ORDER BY id")
            .expect("large SELECT executes through the frame arena path");
        assert_eq!(result.statement_type, "select");
        assert_eq!(
            result.result.records.len(),
            N,
            "every inserted row streams back through the arena-backed channel"
        );
        // Row i must carry id == i: checks both completeness and order.
        for (i, record) in result.result.records.iter().enumerate() {
            assert_eq!(
                record.get("id"),
                Some(&crate::storage::schema::Value::Integer(i as i64)),
                "row {i} is byte-identical to the per-request-allocation baseline"
            );
        }

        reset_thread_locals();
    }

    /// `BEGIN` is classified as `LockIntent::None`, and
    /// `acquire_intent_locks` must short-circuit to `None` on that
    /// signal. This holds even when it is handed an expression
    /// (`QueryExpr::Table`) that would otherwise yield intent locks.
    ///
    /// Caveat: the `guard.is_none()` assertion alone may not catch
    /// removal of `ReadFrame::lock_intent` (a parsed-expression fallback
    /// could reach the same conclusion). That is why
    /// `frame_classifies_privilege_and_lock_intent_from_prefix` also
    /// pins the classifier mapping, and why this test additionally
    /// asserts `lock_intent() == LockIntent::None` directly.
    #[test]
    fn control_statement_skips_intent_locks_via_frame() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(f.lock_intent(), LockIntent::None);

        // Drive `acquire_intent_locks` against a fabricated SELECT
        // expression that WOULD normally yield `(IS, IS)`; the frame's
        // `lock_intent() == None` short-circuit must still suppress
        // the guard.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let guard = frame.acquire_intent_locks(&rt, &expr);
        assert!(
            guard.is_none(),
            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
        );
    }
}