reddb_server/runtime/statement_frame.rs
1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7 collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8 has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9 query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10 SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
/// Coarse privilege classification for a statement, computed once at
/// frame-build time from the SQL text. Mirrors the three-role auth
/// model (`Role::Read < Role::Write < Role::Admin`) so the frame can
/// answer "can this identity run this statement?" without re-walking
/// the parsed `QueryExpr` at every call site.
///
/// `None` means the statement does not touch the privilege gate at
/// all. The prefix classifier in this file yields it for transaction
/// control / SET-style statements and for any statement whose leading
/// keyword it does not recognise. Such statements must remain
/// runnable under any authenticated identity.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Privilege {
    /// Read-only data access (SELECT, WITH, SHOW, EXPLAIN, DESCRIBE).
    /// Satisfied by any role for which `Role::can_read` holds.
    Read,
    /// Mutation of user data or schema author DDL (INSERT, UPDATE,
    /// DELETE, CREATE/ALTER/DROP TABLE, CREATE MIGRATION). Requires
    /// a role for which `Role::can_write` holds.
    Write,
    /// Authority statements — GRANT, REVOKE, ALTER USER, APPLY /
    /// ROLLBACK MIGRATION, IAM policy mutation. Requires a role for
    /// which `Role::can_admin` holds.
    ///
    /// Note: the prefix classifier in this file only maps GRANT and
    /// REVOKE to this variant; `classify_privilege` documents that the
    /// remaining authority statements are upgraded by finer checks at
    /// the call site.
    Admin,
    /// Statement does not consult the privilege gate (BEGIN, COMMIT,
    /// ROLLBACK, SET, ...). `is_satisfied_by` returns `true` for this
    /// variant regardless of role.
    None,
}
45
46impl Privilege {
47 /// `true` iff `role` is sufficient to execute a statement carrying
48 /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49 /// Admin` containment used by the auth fallback path.
50 pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51 match self {
52 Self::None => true,
53 Self::Read => role.can_read(),
54 Self::Write => role.can_write(),
55 Self::Admin => role.can_admin(),
56 }
57 }
58}
59
/// Coarse lock intent for a statement, computed once at frame-build
/// time. Maps onto the storage-layer's `LockMode` matrix downstream
/// but stays decoupled here so the runtime can answer "does this
/// statement need the lock manager at all?" without a `use storage::`
/// at every call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LockIntent {
    /// No collection-level lock needed. `classify_lock_intent` yields
    /// this for every statement kind other than "read", "write" and
    /// "ddl" (transaction control, SET, GRANT/REVOKE, unrecognised
    /// keywords). The lock-acquisition path can short-circuit.
    None,
    /// Reader-style intent: SELECT, joins, graph / queue / search
    /// reads. Maps to `(IS, IS)` at the storage layer. The prefix
    /// classifier also files SHOW / EXPLAIN / DESCRIBE here.
    Shared,
    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`)
    /// and CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as
    /// `Exclusive` at this granularity — call sites that need the
    /// finer distinction still consult `intent_lock_modes_for`.
    Exclusive,
}
79
/// Small, stable interface that *represents* a read statement's
/// execution context. Every read caller that needs to know "under
/// what scope / identity / snapshot am I running, and is there an
/// AS OF floor in effect?" consults this trait — never the
/// underlying thread-locals or runtime fields directly.
///
/// The deletion test: removing this trait would force the concerns
/// it exposes back into ad-hoc lookups at every read callsite
/// (`current_tenant()`, `current_auth_identity()`,
/// `capture_current_snapshot()`, AS OF re-parsing). The trait
/// concentrates them in one place so future changes (per-statement
/// logging, audit, scope policy) have a single seam to extend.
pub(crate) trait ReadFrame {
    /// Effective tenant scope for the statement after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. `None` means
    /// "no tenant bound" (RLS deny-default applies).
    fn effective_scope(&self) -> Option<&str>;

    /// Authenticated identity observed at frame-build time, if any.
    /// Returns `(username, role)` so callers can render audit lines
    /// or feed RLS policy lookups without re-reading thread-locals.
    fn identity(&self) -> Option<(&str, Role)>;

    /// MVCC snapshot the statement reads against. For autocommit
    /// this is a fresh snapshot; inside an active transaction it
    /// is the txn's snapshot; under AS OF it is the resolved
    /// historical xid.
    fn snapshot(&self) -> &Snapshot;

    /// AS OF xid floor when AS OF was applied for this statement,
    /// `None` for live reads. Useful for downstream callers that
    /// want to gate behaviour on historical-read mode without
    /// re-parsing the query.
    fn as_of_floor(&self) -> Option<Xid>;

    /// Stable result-cache key for the statement (already mixes
    /// effective tenant + identity). Implementations that never
    /// cache (see `should_cache_result`) may return an empty string.
    fn cache_key(&self) -> &str;

    /// Whether the statement is safe to serve from / populate the
    /// result cache. Combines two underlying signals:
    ///
    /// * the query does not call a volatile builtin (e.g. `NOW()`,
    ///   `RANDOM()`, `UUID()`), which would change between calls,
    /// * the connection is not inside an active transaction with
    ///   uncommitted writes that other readers shouldn't observe.
    ///
    /// SELECT cache callsites (read + write) consult this method
    /// instead of re-deriving safety from globals or poking the
    /// frame's private fields. Removing it would force every cache
    /// callsite to re-run `query_has_volatile_builtin` plus
    /// `result_cache_safe(conn_id)` inline.
    fn should_cache_result(&self) -> bool;

    /// Coarse privilege class the statement requires, computed once
    /// at frame-build time from the SQL prefix. Read/write dispatch
    /// sites consult this instead of re-classifying the parsed
    /// `QueryExpr` inline at every callsite.
    ///
    /// Removing this method would force every privilege gate to
    /// recompute the (action, resource) classification from the
    /// parsed expression and re-check the role hierarchy inline.
    fn required_privilege(&self) -> Privilege;

    /// Coarse collection-level lock intent the statement implies.
    /// `LockIntent::None` lets the lock-acquisition path
    /// short-circuit without touching the lock manager.
    ///
    /// Removing this method would force the lock-acquisition path
    /// to always invoke `intent_lock_modes_for` (which itself walks
    /// the parsed expression) even for transaction-control / SET
    /// statements that need no collection lock at all.
    fn lock_intent(&self) -> LockIntent;

    /// Set of collection ids the calling identity is allowed to
    /// observe under the active `(tenant, role)` scope. Computed once
    /// at frame-build time via the `AuthStore` visible-collections
    /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
    /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
    /// before any similarity score is computed (issue #119).
    ///
    /// `None` means the frame was built without an auth store wired
    /// or without a captured identity — embedded / single-tenant
    /// tests run that way. AI search call sites refuse to proceed
    /// with `None`, which is the deny-default the issue requires;
    /// pure SELECT paths fall back to the existing per-row RLS gate.
    fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
}
168
/// Cheap first-word classification of a SQL statement, used at
/// frame-build time to derive `Privilege` + `LockIntent` without
/// re-parsing the query. Matches the keywords that the legacy
/// inline checks in `RedDBRuntime::check_query_privilege` and
/// `intent_lock_modes_for` already key on.
///
/// Leading whitespace and any run of leading comments — `-- ...`
/// line comments and (non-nested) `/* ... */` block comments, in any
/// mix — are skipped before the first keyword is read, so
/// `/* hint */ INSERT ...` classifies exactly like `INSERT ...`.
/// An unterminated comment swallows the rest of the text.
///
/// Returns one of `"read"`, `"write"`, `"ddl"`, `"admin"`,
/// `"control"` or `"unknown"`. The keyword match is ASCII
/// case-insensitive.
fn statement_kind(query: &str) -> &'static str {
    // Peel off every leading comment. Handling only a single `--`
    // comment would let a comment-prefixed write fall through to
    // "unknown", which downstream maps to `Privilege::None` /
    // `LockIntent::None`.
    let mut rest = query.trim_start();
    loop {
        if let Some(after) = rest.strip_prefix("--") {
            rest = after
                .split_once('\n')
                .map_or("", |(_, tail)| tail)
                .trim_start();
        } else if let Some(after) = rest.strip_prefix("/*") {
            rest = after
                .split_once("*/")
                .map_or("", |(_, tail)| tail)
                .trim_start();
        } else {
            break;
        }
    }

    // The keyword ends at the first whitespace, `(` or `;`.
    let first = rest
        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
        .next()
        .unwrap_or("");

    // ASCII-uppercase compare without allocating: SQL keywords are
    // ASCII and all shorter than the buffer, so a longer first word is
    // truncated to 16 bytes and can never equal a keyword.
    let mut buf = [0u8; 16];
    let n = first.len().min(buf.len());
    for (dst, src) in buf.iter_mut().zip(first.bytes()) {
        *dst = src.to_ascii_uppercase();
    }

    match &buf[..n] {
        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" => "read",
        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
        b"GRANT" | b"REVOKE" => "admin",
        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
        _ => "unknown",
    }
}
207
208fn classify_privilege(query: &str) -> Privilege {
209 match statement_kind(query) {
210 "read" => Privilege::Read,
211 "write" => Privilege::Write,
212 // DDL is gated at `Role::Write` in the legacy fallback (see
213 // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
214 // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
215 // GRANT / REVOKE upgrade to Admin via finer checks at the call
216 // site — the frame surfaces only the coarse class.
217 "ddl" => Privilege::Write,
218 "admin" => Privilege::Admin,
219 _ => Privilege::None,
220 }
221}
222
223fn classify_lock_intent(query: &str) -> LockIntent {
224 match statement_kind(query) {
225 "read" => LockIntent::Shared,
226 "write" | "ddl" => LockIntent::Exclusive,
227 _ => LockIntent::None,
228 }
229}
230
/// Per-statement execution context. `build` captures every field once
/// from the runtime and the current thread-local state; `install`
/// then publishes the relevant parts through scoped guards, and the
/// `ReadFrame` impl exposes them read-only to callers.
pub(super) struct StatementExecutionFrame {
    /// Transaction-local tenant entry for this connection, cloned from
    /// `runtime.inner.tx_local_tenants` at build time and handed to
    /// `TxLocalTenantGuard::install`. The outer `Option` is `None` when
    /// the connection has no entry in that map.
    tx_local_tenant: Option<Option<String>>,
    /// Snapshot returned by `runtime.statement_snapshot(query)`; the
    /// statement reads against it.
    snapshot: Snapshot,
    /// Xids returned by `runtime.own_transaction_xids(conn_id)`,
    /// forwarded into the installed `SnapshotContext`.
    own_xids: HashSet<Xid>,
    /// Result-cache key produced by `result_cache_key(query)`.
    cache_key: String,
    /// True when the query calls a volatile builtin or is an ASK
    /// statement; such results are never served from / written to the
    /// result cache.
    is_volatile_query: bool,
    /// Value of `runtime.result_cache_safe(conn_id)` at build time.
    cache_safe: bool,
    /// Effective tenant captured at frame-build time after WITHIN /
    /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
    /// so the `ReadFrame` interface can return a borrow without
    /// re-touching the thread-local stack.
    effective_scope: Option<String>,
    /// Auth identity captured at frame-build time. `None` for
    /// embedded / anonymous callers.
    identity: Option<(String, Role)>,
    /// `Some(xid)` when AS OF resolved to a historical xid; `None`
    /// for live reads.
    as_of_floor: Option<Xid>,
    /// True when the statement snapshot can require tuple versions that
    /// current secondary indexes no longer contain. Set at build time
    /// when an AS OF floor is present or the connection has an entry
    /// in `tx_contexts`.
    requires_index_fallback: bool,
    /// Privilege class required by the statement, derived from the
    /// SQL text at frame-build time. Read/write dispatch sites
    /// consult this instead of re-classifying the parsed expression.
    required_privilege: Privilege,
    /// Collection-level lock intent the statement implies. The
    /// lock-acquisition path short-circuits when this is `None`.
    lock_intent: LockIntent,
    /// Set of collection ids the active `(tenant, role)` scope is
    /// allowed to observe. Computed at frame-build time via the
    /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
    /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
    /// scoring (issue #119). `None` when no auth store is wired
    /// (embedded test mode) or no identity was captured — AI search
    /// refuses on `None`.
    visible_collections: Option<HashSet<String>>,
    /// Per-owner buffer arena for query-result row chunks (#885). Owned
    /// by the frame because the frame already owns the query lifecycle;
    /// lent to the row-streaming path (`execute_runtime_table_query_in`)
    /// so chunk buffers are reused across the statement's chunk-fetches
    /// instead of allocated fresh per chunk. Reclaimed when the frame
    /// drops at statement end — no `thread_local!` scratch, which would
    /// be unsound under tokio's work-stealing runtime.
    row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
}
275
/// Bundle of scoped guards produced by `StatementExecutionFrame::install`.
/// The fields are never read (hence the `_` prefixes); the struct exists
/// so the caller can keep all four guards alive for the duration of the
/// statement and release them together.
/// NOTE(review): presumably each guard restores the previous
/// thread-local value when dropped — confirm against `impl_core`.
pub(super) struct StatementFrameGuards {
    /// Guard for the transaction-local tenant captured on the frame.
    _tx_local_guard: TxLocalTenantGuard,
    /// Guard installed from the runtime's `db` handle.
    _config_snapshot_guard: ConfigSnapshotGuard,
    /// Guard installed from the runtime's current auth store (if any).
    _secret_store_guard: SecretStoreGuard,
    /// Guard publishing the frame's snapshot, own xids and
    /// index-fallback flag as the current `SnapshotContext`.
    _snapshot_guard: CurrentSnapshotGuard,
}
282
/// Result of `StatementExecutionFrame::prepare_statement`: the parsed
/// (and view-rewritten) expression together with the query mode that
/// was detected for the original text.
pub(super) struct PreparedStatement {
    /// Parsed expression after plan-cache lookup / binding and view
    /// reference rewriting.
    pub(super) expr: QueryExpr,
    /// Mode reported by `detect_mode`; never `QueryMode::Unknown`,
    /// because `prepare_statement` rejects that case.
    pub(super) mode: QueryMode,
}
287
288impl StatementExecutionFrame {
289 pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
290 let conn_id = current_connection_id();
291 let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
292 let own_xids = runtime.own_transaction_xids(conn_id);
293 let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
294 let requires_index_fallback =
295 as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
296 let cache_key = result_cache_key(query);
297 let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
298 let cache_safe = runtime.result_cache_safe(conn_id);
299 // Capture identity + effective scope under the same
300 // thread-local view that the cache key was built from, so
301 // the Interface and the cache key agree on what "this
302 // statement" means.
303 let effective_scope = current_tenant();
304 let identity = current_auth_identity();
305
306 // Coarse classification of the statement, computed once from
307 // the SQL prefix so downstream callers don't re-derive it
308 // from the parsed `QueryExpr` at every privilege / lock site.
309 let required_privilege = classify_privilege(query);
310 let lock_intent = classify_lock_intent(query);
311
312 // Issue #119: resolve the visible-collections set for the
313 // active (tenant, role) scope. Only meaningful when an auth
314 // store is wired *and* an identity was captured — embedded
315 // anonymous callers fall back to `None`, and AI search call
316 // sites refuse on `None`.
317 let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
318 {
319 (Some(store), Some((principal, role))) => {
320 let collections = runtime.inner.db.store().list_collections();
321 Some(store.visible_collections_for_scope(
322 effective_scope.as_deref(),
323 *role,
324 principal,
325 &collections,
326 ))
327 }
328 _ => None,
329 };
330
331 Ok(Self {
332 tx_local_tenant,
333 snapshot,
334 own_xids,
335 cache_key,
336 is_volatile_query,
337 cache_safe,
338 effective_scope,
339 identity,
340 as_of_floor,
341 requires_index_fallback,
342 required_privilege,
343 lock_intent,
344 visible_collections,
345 row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
346 })
347 }
348
349 /// Lend the frame's per-owner row-buffer arena (#885) to the
350 /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
351 /// the owner and the arena is reclaimed when the frame drops at
352 /// statement end.
353 pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
354 Rc::clone(&self.row_arena)
355 }
356
357 pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
358 StatementFrameGuards {
359 _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
360 _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
361 _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
362 _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
363 snapshot: self.snapshot.clone(),
364 manager: Arc::clone(&runtime.inner.snapshot_manager),
365 own_xids: self.own_xids.clone(),
366 requires_index_fallback: self.requires_index_fallback,
367 }),
368 }
369 }
370
371 pub(super) fn cache_key(&self) -> &str {
372 &self.cache_key
373 }
374
375 pub(super) fn can_read_result_cache(&self) -> bool {
376 // Delegates to the `ReadFrame` Interface so the volatile +
377 // active-tx safety decision lives in exactly one place.
378 <Self as ReadFrame>::should_cache_result(self)
379 }
380
381 pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
382 // Cache-safety (volatile builtin, active-tx writes) comes from
383 // the Interface; the rest are write-side payload heuristics
384 // (statement shape, result size) that aren't part of the
385 // safety contract.
386 <Self as ReadFrame>::should_cache_result(self)
387 && result.statement_type == "select"
388 && result.engine != "vault"
389 // `QUEUE READ` is a stateful read: a delayed message
390 // (issue #722) becomes deliverable over time without a
391 // producer push to invalidate the cache, so a cached empty
392 // result would hide it. Skip caching entirely.
393 && result.statement != "queue_group_read"
394 && result.result.pre_serialized_json.is_none()
395 // Graph-analytics TVF output (issue #802) is deterministic and
396 // expensive to recompute, so it is cached at any row count. The
397 // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
398 && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
399 }
400
401 pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
402 if self.can_read_result_cache() {
403 runtime.get_result_cache_entry(self.cache_key())
404 } else {
405 None
406 }
407 }
408
409 pub(super) fn write_result_cache(
410 &self,
411 runtime: &RedDBRuntime,
412 result: &RuntimeQueryResult,
413 scopes: HashSet<String>,
414 ) {
415 if self.should_write_result_cache(result) {
416 runtime.put_result_cache_entry(
417 self.cache_key(),
418 RuntimeResultCacheEntry {
419 result: result.clone(),
420 cached_at: std::time::Instant::now(),
421 scopes,
422 },
423 );
424 }
425 }
426
427 pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
428 // Detected via cheap prefix check so non-CTE queries skip the
429 // full parse here. CTE-bearing queries bypass the plan cache
430 // and result cache (rare workload — perf optimization is a
431 // follow-up). Inlining substitutes every CTE reference with
432 // its body as a subquery in FROM, after which the existing
433 // subquery-in-FROM machinery handles execution. Recursive
434 // CTEs are rejected explicitly until fixpoint execution wires
435 // through the runtime.
436 if !has_with_prefix(query) {
437 return Ok(None);
438 }
439 let parsed = crate::storage::query::parser::parse(query)
440 .map_err(|err| RedDBError::Query(err.to_string()))?;
441 if parsed.with_clause.is_some() {
442 let rewritten = crate::storage::query::executors::inline_ctes(parsed)
443 .map_err(|err| RedDBError::Query(err.to_string()))?;
444 return Ok(Some(rewritten));
445 }
446 // No WITH after parse (the prefix matched something else like
447 // `WITHIN` that already routed elsewhere) — fall through to
448 // the normal path with the original query.
449 Ok(None)
450 }
451
452 pub(super) fn prepare_statement(
453 &self,
454 runtime: &RedDBRuntime,
455 query: &str,
456 ) -> RedDBResult<PreparedStatement> {
457 let mode = detect_mode(query);
458 if matches!(mode, QueryMode::Unknown) {
459 return Err(RedDBError::Query("unable to detect query mode".to_string()));
460 }
461
462 // ── Plan cache: reuse only exact-query ASTs ──
463 //
464 // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
465 // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
466 // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
467 // Plan cache applies to statements whose shape can be
468 // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
469 // reuses the same plan across thousands of varying literals).
470 // INSERT is still bypassed — its shape changes per column set
471 // and bulk paths don't go through here anyway.
472 let first_word = query
473 .trim()
474 .split_ascii_whitespace()
475 .next()
476 .unwrap_or("")
477 .to_ascii_uppercase();
478 let is_insert = first_word == "INSERT";
479
480 // Fused normalize+extract: one byte-scan produces both the
481 // cache_key AND the literal bindings. Saves a second Lexer
482 // pass over the query text on every cache hit — dominant
483 // cost on tight UPDATE loops that hit the same shape
484 // thousands of times with varying literals.
485 let (cache_key, prescan_binds) = if is_insert {
486 (String::new(), Vec::new())
487 } else {
488 crate::storage::query::planner::cache_key::normalize_and_extract(query)
489 };
490
491 let expr = if is_insert {
492 // Bypass plan cache for INSERT — shape varies per query.
493 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
494 } else {
495 // ── Hot path: read lock only (no writer serialization on cache hits) ──
496 //
497 // peek() is a non-mutating probe: no LRU promotion, no touch().
498 // This lets concurrent readers proceed without blocking each other.
499 // On hit we bind literals if needed and return immediately.
500 // Only on miss do we drop to a write lock to parse + insert.
501 let hit = {
502 let plan_cache = runtime.inner.query_cache.read();
503 plan_cache.peek(&cache_key).map(|cached| {
504 let parameter_count = cached.parameter_count;
505 let optimized = cached.plan.optimized.clone();
506 let exact_query = cached.exact_query.clone();
507 (parameter_count, optimized, exact_query)
508 })
509 };
510
511 if let Some((parameter_count, optimized, exact_query)) = hit {
512 if parameter_count > 0 {
513 // Shape hit: use the binds extracted during normalise.
514 let shape_binds = prescan_binds.clone();
515 if let Some(bound) =
516 crate::storage::query::planner::shape::bind_parameterized_query(
517 &optimized,
518 &shape_binds,
519 parameter_count,
520 )
521 {
522 bound
523 } else if exact_query.as_deref() == Some(query) {
524 // Bind failed but exact query matches — use as-is.
525 optimized
526 } else {
527 // Bind failed and literals differ: re-parse fresh.
528 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
529 }
530 } else {
531 // No parameters means either there truly are no literals,
532 // or this statement type does not participate in shape
533 // parameterization (for example graph/queue commands).
534 // Reusing a normalized-cache hit across a different exact
535 // query can therefore leak stale literals into execution.
536 if exact_query.as_deref() == Some(query) {
537 optimized
538 } else {
539 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
540 }
541 }
542 } else {
543 // Cache miss — parse, parameterize, store.
544 let parsed =
545 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
546 let (cached_expr, parameter_count) = if let Some(prepared) =
547 crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
548 {
549 (prepared.shape, prepared.parameter_count)
550 } else {
551 (parsed.clone(), 0)
552 };
553 {
554 let mut pc = runtime.inner.query_cache.write();
555 let plan = crate::storage::query::planner::QueryPlan::new(
556 parsed.clone(),
557 cached_expr,
558 Default::default(),
559 );
560 pc.insert(
561 cache_key.clone(),
562 crate::storage::query::planner::CachedPlan::new(plan)
563 .with_shape_key(cache_key.clone())
564 .with_exact_query(query.to_string())
565 .with_parameter_count(parameter_count),
566 );
567 }
568 parsed
569 }
570 };
571
572 // Phase 5 PG parity: substitute any registered view name that
573 // appears in the expression with its stored body. Runs after
574 // parse and before dispatch so the SQL entrypoint gets the
575 // same view resolution `execute_query_expr` already does.
576 let expr = runtime.rewrite_view_refs(expr);
577
578 Ok(PreparedStatement { expr, mode })
579 }
580
581 pub(super) fn check_query_privilege(
582 &self,
583 runtime: &RedDBRuntime,
584 expr: &QueryExpr,
585 ) -> RedDBResult<()> {
586 // Frame-level coarse gate. We consult `required_privilege()`
587 // (computed once at frame-build) against the captured identity
588 // before the deep grant engine walks the parsed expression.
589 // The coarse gate cannot ALLOW anything the grant engine would
590 // deny — it only short-circuits the obvious "Role::Read tries
591 // INSERT" case so a downstream caller never has to redo this
592 // check inline. `Privilege::None` (transaction control / SET /
593 // SHOW) flows through unchanged; the grant engine treats those
594 // as bypass too.
595 if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
596 let needed = <Self as ReadFrame>::required_privilege(self);
597 if !needed.is_satisfied_by(role) {
598 // Issue #205 — when the deep grant engine *also*
599 // denies, we treat this as an ordinary permission
600 // failure. But when an Admin-only statement reaches
601 // this gate without an auth_store wired (so the deep
602 // engine can't double-check), the coarse rejection is
603 // the only line of defence — emit an OperatorEvent so
604 // the operator notices an Admin-class statement was
605 // attempted with insufficient role.
606 if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
607 crate::telemetry::operator_event::OperatorEvent::AuthBypass {
608 principal: username.to_string(),
609 resource: format!("statement requiring {needed:?}"),
610 detail: format!(
611 "auth_store not wired; coarse gate is sole defence (role={role:?})"
612 ),
613 }
614 .emit_global();
615 }
616 return Err(RedDBError::Query(format!(
617 "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
618 )));
619 }
620 }
621 runtime
622 .check_query_privilege(expr)
623 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
624 }
625
626 pub(super) fn prepare_dispatch(
627 &self,
628 runtime: &RedDBRuntime,
629 expr: &QueryExpr,
630 ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
631 runtime.validate_model_operations_before_auth(expr)?;
632 self.check_query_privilege(runtime, expr)?;
633 Ok(self.acquire_intent_locks(runtime, expr))
634 }
635
636 pub(super) fn acquire_intent_locks(
637 &self,
638 runtime: &RedDBRuntime,
639 expr: &QueryExpr,
640 ) -> Option<crate::runtime::locking::LockerGuard> {
641 if !runtime.config_bool("concurrency.locking.enabled", true) {
642 return None;
643 }
644 // Frame-level short-circuit: if the statement carries no lock
645 // intent (transaction control, SET, SHOW), skip the lock
646 // manager entirely instead of letting `intent_lock_modes_for`
647 // walk the parsed expression to reach the same conclusion.
648 if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
649 return None;
650 }
651 intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
652 let mut guard =
653 crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
654 let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
655 for collection in collections_referenced(expr) {
656 let _ = guard.acquire(
657 crate::runtime::locking::Resource::Collection(collection),
658 coll_mode,
659 );
660 }
661 guard
662 })
663 }
664}
665
666impl ReadFrame for StatementExecutionFrame {
667 fn effective_scope(&self) -> Option<&str> {
668 self.effective_scope.as_deref()
669 }
670
671 fn identity(&self) -> Option<(&str, Role)> {
672 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
673 }
674
675 fn snapshot(&self) -> &Snapshot {
676 &self.snapshot
677 }
678
679 fn as_of_floor(&self) -> Option<Xid> {
680 self.as_of_floor
681 }
682
683 fn cache_key(&self) -> &str {
684 &self.cache_key
685 }
686
687 fn should_cache_result(&self) -> bool {
688 !self.is_volatile_query && self.cache_safe
689 }
690
691 fn required_privilege(&self) -> Privilege {
692 self.required_privilege
693 }
694
695 fn lock_intent(&self) -> LockIntent {
696 self.lock_intent
697 }
698
699 fn visible_collections(&self) -> Option<&HashSet<String>> {
700 self.visible_collections.as_ref()
701 }
702}
703
/// Lightweight `ReadFrame` carrier used by AI command entry points
/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
///
/// Issue #119 calls this struct `EffectiveScope`. It bundles the
/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
/// instead of re-reading thread-locals at every call site.
///
/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
/// from the per-statement thread-locals (identical to how
/// `StatementExecutionFrame::build` derives them) and resolves
/// `visible_collections` via the `AuthStore` cache.
pub struct EffectiveScope {
    /// Effective tenant at construction time; `None` when no tenant
    /// is bound.
    pub(crate) tenant: Option<String>,
    /// `(username, role)` of the authenticated caller, if any.
    pub(crate) identity: Option<(String, Role)>,
    /// Snapshot the AI command reads against.
    pub(crate) snapshot: Snapshot,
    /// Collections visible to the caller; `None` when no auth store is
    /// wired or no identity was captured.
    pub(crate) visible_collections: Option<HashSet<String>>,
}
722
impl EffectiveScope {
    /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
    /// to gate LLM-backed NER calls behind `ai:ner:read`.
    ///
    /// Placeholder for now: always returns `false`, whatever
    /// `_capability` is passed. The auth engine's capability matrix is
    /// future work; until it lands, every routed LLM-NER call denies
    /// at the gate and `extract_tokens_routed`'s heuristic fallback
    /// fires (see `ask_pipeline::extract_tokens_routed`).
    /// Documented in code so the wire-up is a one-line change once
    /// the auth engine learns capabilities.
    pub fn has_capability(&self, _capability: &str) -> bool {
        false
    }
}
737
738impl ReadFrame for EffectiveScope {
739 fn effective_scope(&self) -> Option<&str> {
740 self.tenant.as_deref()
741 }
742 fn identity(&self) -> Option<(&str, Role)> {
743 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
744 }
745 fn snapshot(&self) -> &Snapshot {
746 &self.snapshot
747 }
748 fn as_of_floor(&self) -> Option<Xid> {
749 None
750 }
751 fn cache_key(&self) -> &str {
752 ""
753 }
754 fn should_cache_result(&self) -> bool {
755 false
756 }
757 fn required_privilege(&self) -> Privilege {
758 Privilege::Read
759 }
760 fn lock_intent(&self) -> LockIntent {
761 LockIntent::Shared
762 }
763 fn visible_collections(&self) -> Option<&HashSet<String>> {
764 self.visible_collections.as_ref()
765 }
766}
767
768impl RedDBRuntime {
769 /// Build the AI command `EffectiveScope` from the current
770 /// statement thread-locals + auth store.
771 ///
772 /// Returns `None` for embedded callers (no auth store, no
773 /// identity) — `AuthorizedSearch` treats `None` as deny-default.
774 pub(crate) fn ai_scope(&self) -> EffectiveScope {
775 let tenant = super::impl_core::current_tenant();
776 let identity = super::impl_core::current_auth_identity();
777 let snapshot = self.current_snapshot();
778 let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
779 (Some(store), Some((principal, role))) => {
780 let collections = self.inner.db.store().list_collections();
781 Some(store.visible_collections_for_scope(
782 tenant.as_deref(),
783 *role,
784 principal,
785 &collections,
786 ))
787 }
788 _ => None,
789 };
790 EffectiveScope {
791 tenant,
792 identity,
793 snapshot,
794 visible_collections,
795 }
796 }
797}
798
/// Test-only fixtures that let callers exercise `ReadFrame` consumers
/// without booting a runtime. Gated behind `cfg(test)` and kept
/// `pub(crate)` so the fixtures never leave the crate.
#[cfg(test)]
pub(crate) mod test_support {
    use super::{LockIntent, Privilege, ReadFrame};
    use crate::auth::Role;
    use crate::storage::transaction::snapshot::{Snapshot, Xid};
    use std::collections::HashSet;

    /// A `ReadFrame` impl with hand-set fields. Used by
    /// `authorized_search` tests to assert the deny-default and
    /// scope-trim behaviour without going through frame construction.
    pub(crate) struct FakeReadFrame {
        pub tenant: Option<String>,
        pub identity: Option<(String, Role)>,
        pub snapshot: Snapshot,
        pub visible: Option<HashSet<String>>,
    }

    /// Snapshot at xid 0 with no in-progress transactions, shared by
    /// both fixture constructors.
    fn empty_snapshot() -> Snapshot {
        Snapshot {
            xid: 0,
            in_progress: HashSet::new(),
        }
    }

    impl FakeReadFrame {
        /// Frame with no tenant, no identity and no visible-collection
        /// set — the deny-default case.
        pub(crate) fn without_scope() -> Self {
            Self {
                tenant: None,
                identity: None,
                snapshot: empty_snapshot(),
                visible: None,
            }
        }

        /// Frame for tenant `acme` / user `alice` (`Role::Read`) that
        /// can see exactly the collections in `visible`.
        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
            let tenant = "acme".to_string();
            let identity = ("alice".to_string(), Role::Read);
            Self {
                tenant: Some(tenant),
                identity: Some(identity),
                snapshot: empty_snapshot(),
                visible: Some(visible),
            }
        }
    }

    impl ReadFrame for FakeReadFrame {
        fn effective_scope(&self) -> Option<&str> {
            self.tenant.as_ref().map(String::as_str)
        }

        fn identity(&self) -> Option<(&str, Role)> {
            match &self.identity {
                Some((username, role)) => Some((username.as_str(), *role)),
                None => None,
            }
        }

        fn snapshot(&self) -> &Snapshot {
            &self.snapshot
        }

        fn as_of_floor(&self) -> Option<Xid> {
            None
        }

        fn cache_key(&self) -> &str {
            ""
        }

        fn should_cache_result(&self) -> bool {
            false
        }

        fn required_privilege(&self) -> Privilege {
            Privilege::Read
        }

        fn lock_intent(&self) -> LockIntent {
            LockIntent::Shared
        }

        fn visible_collections(&self) -> Option<&HashSet<String>> {
            self.visible.as_ref()
        }
    }
}
875
876impl RedDBRuntime {
877 fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
878 let mut set = HashSet::new();
879 if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
880 set.insert(ctx.xid);
881 for (_, sub) in &ctx.savepoints {
882 set.insert(*sub);
883 }
884 for sub in &ctx.released_sub_xids {
885 set.insert(*sub);
886 }
887 }
888 set
889 }
890
891 /// Resolve the snapshot for the current statement, returning
892 /// the snapshot itself and (when AS OF is in effect) the
893 /// resolved xid floor. The floor is the same xid carried inside
894 /// `Snapshot.xid` for AS OF reads — exposing it separately lets
895 /// the `ReadFrame` Interface tell "live read" from "historical
896 /// read" without inferring from `in_progress.is_empty()`.
897 fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
898 match peek_top_level_as_of_with_table(query) {
899 Some((spec, Some(table))) => {
900 if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
901 return Err(RedDBError::InvalidConfig(format!(
902 "AS OF requires a versioned collection — \
903 `{table}` has not opted in. \
904 Call vcs.set_versioned(\"{table}\", true) first."
905 )));
906 }
907 let xid = self.vcs_resolve_as_of(spec)?;
908 Ok((
909 Snapshot {
910 xid,
911 in_progress: HashSet::new(),
912 },
913 Some(xid),
914 ))
915 }
916 Some((spec, None)) => {
917 let xid = self.vcs_resolve_as_of(spec)?;
918 Ok((
919 Snapshot {
920 xid,
921 in_progress: HashSet::new(),
922 },
923 Some(xid),
924 ))
925 }
926 None => Ok((self.current_snapshot(), None)),
927 }
928 }
929
930 fn result_cache_safe(&self, conn_id: u64) -> bool {
931 let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
932 let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
933 !has_active_xids && !in_own_tx
934 }
935}
936
/// Reports whether `engine` is the tag of a graph-analytics TVF executor
/// (issue #802).
///
/// Two tags qualify: the graph-collection form (`louvain(g)`) and the
/// inline form (`louvain(nodes => …, edges => …)`). Both yield
/// deterministic algorithm output, which is cached regardless of row
/// count.
fn is_graph_tvf_engine(engine: &str) -> bool {
    const GRAPH_TVF_ENGINES: [&str; 2] = ["runtime-graph-tvf", "runtime-graph-tvf-inline"];
    GRAPH_TVF_ENGINES.contains(&engine)
}
944
945fn result_cache_key(query: &str) -> String {
946 let tenant = current_tenant().unwrap_or_default();
947 let auth = current_auth_identity()
948 .map(|(user, role)| format!("{}|{:?}", user, role))
949 .unwrap_or_default();
950 if tenant.is_empty() && auth.is_empty() {
951 query.to_string()
952 } else {
953 format!("{query}\u{001e}{tenant}\u{001e}{auth}")
954 }
955}
956
957#[cfg(test)]
958mod tests {
959 use super::*;
960 use crate::api::RedDBOptions;
961 use crate::runtime::impl_core::{
962 clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
963 set_current_tenant,
964 };
965 use crate::runtime::RedDBRuntime;
966
967 fn fresh_runtime() -> RedDBRuntime {
968 RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
969 }
970
971 /// Ensure thread-local state from a prior test can't leak into
972 /// the next one — tests in the same binary share the thread.
973 fn reset_thread_locals() {
974 clear_current_tenant();
975 clear_current_auth_identity();
976 }
977
978 #[test]
979 fn autocommit_select_takes_live_snapshot() {
980 reset_thread_locals();
981 let rt = fresh_runtime();
982 let frame =
983 StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");
984
985 // Live reads: no AS OF floor, snapshot bounded by the
986 // manager's `peek_next_xid` so committed tuples are visible.
987 let f: &dyn ReadFrame = &frame;
988 assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
989 assert!(
990 f.snapshot().xid >= 1,
991 "autocommit snapshot xid is bounded by peek_next_xid"
992 );
993 }
994
995 #[test]
996 fn frame_captures_identity_and_scope() {
997 reset_thread_locals();
998 set_current_tenant("acme".to_string());
999 set_current_auth_identity("alice".to_string(), Role::Write);
1000
1001 let rt = fresh_runtime();
1002 let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
1003 let f: &dyn ReadFrame = &frame;
1004
1005 assert_eq!(f.effective_scope(), Some("acme"));
1006 let id = f.identity().expect("identity captured");
1007 assert_eq!(id.0, "alice");
1008 assert!(matches!(id.1, Role::Write));
1009
1010 // Cache key mixes scope + identity so two callers under
1011 // different tenants never share a cache slot.
1012 assert!(
1013 f.cache_key().contains("acme") && f.cache_key().contains("alice"),
1014 "cache key folds in scope + identity, got {:?}",
1015 f.cache_key()
1016 );
1017
1018 reset_thread_locals();
1019 }
1020
1021 #[test]
1022 fn as_of_rejects_non_versioned_user_collection() {
1023 reset_thread_locals();
1024 let rt = fresh_runtime();
1025
1026 // `not_versioned` is a plain user collection — the frame
1027 // builder must reject AS OF until the caller opts in via
1028 // `vcs.set_versioned`.
1029 let err = match StatementExecutionFrame::build(
1030 &rt,
1031 "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
1032 ) {
1033 Err(e) => e,
1034 Ok(_) => panic!("AS OF on non-versioned user collection rejected"),
1035 };
1036
1037 let msg = format!("{err}");
1038 assert!(
1039 msg.contains("AS OF requires a versioned collection"),
1040 "expected AS OF rejection, got: {msg}"
1041 );
1042 }
1043
1044 /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
1045 ///
1046 /// Sets a tenant + identity via the public thread-local API the
1047 /// runtime uses for ambient scope, drives a real `SELECT` through
1048 /// `execute_query`, then inspects the result cache that the SELECT
1049 /// path populates via `frame.cache_key()`. The key only carries
1050 /// the tenant + identity *because* it was built through the frame —
1051 /// reverting the wiring to inline `current_tenant()` /
1052 /// `current_auth_identity()` reads would still pass this test, but
1053 /// dropping the frame entirely (so the SELECT path stopped touching
1054 /// `cache_key`) would break it.
1055 #[test]
1056 fn select_path_routes_through_frame_cache_key() {
1057 reset_thread_locals();
1058 set_current_tenant("acme".to_string());
1059 set_current_auth_identity("alice".to_string(), Role::Read);
1060
1061 let rt = fresh_runtime();
1062 let result = rt
1063 .execute_query("SELECT 1")
1064 .expect("SELECT 1 executes under tenant=acme/identity=alice");
1065 assert_eq!(result.statement_type, "select");
1066
1067 // The SELECT path (in `execute_query_expr`) builds a frame and
1068 // writes its result through `frame.cache_key()`. That key folds
1069 // tenant + identity in via `result_cache_key`, so finding "acme"
1070 // and "alice" inside any cached key proves the frame was the
1071 // seam used.
1072 let cache = rt.inner.result_cache.read();
1073 let any_keyed_with_scope = cache
1074 .0
1075 .keys()
1076 .any(|k| k.contains("acme") && k.contains("alice"));
1077 assert!(
1078 any_keyed_with_scope,
1079 "expected at least one result-cache key carrying tenant+identity, \
1080 got keys: {:?}",
1081 cache.0.keys().collect::<Vec<_>>()
1082 );
1083
1084 reset_thread_locals();
1085 }
1086
1087 /// A SELECT that calls a volatile builtin (here:
1088 /// `pg_advisory_unlock`, the volatile token the runtime currently
1089 /// recognises in `query_has_volatile_builtin`) must NOT populate
1090 /// the result cache. Any caller hitting the cache after this would
1091 /// see a stale answer for an inherently-volatile query, so the
1092 /// SELECT path gates writes through `frame.should_cache_result()`.
1093 ///
1094 /// Deletion test: removing `ReadFrame::should_cache_result`, or
1095 /// reverting the SELECT path to skip its safety gate, would let
1096 /// the result cache silently absorb this statement and break the
1097 /// assertion below.
1098 #[test]
1099 fn volatile_select_does_not_populate_result_cache() {
1100 reset_thread_locals();
1101 let rt = fresh_runtime();
1102
1103 // Frame-level invariant: the volatile-builtin signal collapses
1104 // `should_cache_result` to false even for an autocommit /
1105 // out-of-tx connection.
1106 let frame =
1107 StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
1108 let f: &dyn ReadFrame = &frame;
1109 assert!(
1110 !f.should_cache_result(),
1111 "volatile builtin must disable result-cache safety"
1112 );
1113
1114 // End-to-end: drive the volatile SELECT through `execute_query`
1115 // and confirm no entry was stamped under its cache key. Other
1116 // entries from prior tests sharing the binary may exist, so we
1117 // assert specifically on this query's key.
1118 let _ = rt
1119 .execute_query("SELECT pg_advisory_unlock(1)")
1120 .expect("volatile SELECT executes");
1121 let cache = rt.inner.result_cache.read();
1122 let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1123 assert!(
1124 !cache.0.contains_key(&key),
1125 "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
1126 cache.0.keys().collect::<Vec<_>>()
1127 );
1128
1129 reset_thread_locals();
1130 }
1131
1132 #[test]
1133 fn blob_cache_backend_populates_blob_path_without_legacy_write() {
1134 reset_thread_locals();
1135 let rt = fresh_runtime();
1136 rt.inner
1137 .db
1138 .store()
1139 .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1140
1141 let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
1142 assert_eq!(result.statement_type, "select");
1143
1144 let key = result_cache_key("SELECT 1");
1145 assert!(
1146 rt.inner
1147 .result_blob_cache
1148 .get("runtime.result_cache", &key)
1149 .is_some(),
1150 "blob backend should stamp the Blob Cache path"
1151 );
1152 assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1153 assert!(
1154 !rt.inner.result_cache.read().0.contains_key(&key),
1155 "blob backend should not write the legacy map"
1156 );
1157 }
1158
1159 #[test]
1160 fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
1161 reset_thread_locals();
1162 let rt = fresh_runtime();
1163 rt.inner
1164 .db
1165 .store()
1166 .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));
1167
1168 let _ = rt
1169 .execute_query("SELECT pg_advisory_unlock(1)")
1170 .expect("volatile SELECT executes");
1171 let key = result_cache_key("SELECT pg_advisory_unlock(1)");
1172 assert!(
1173 rt.inner
1174 .result_blob_cache
1175 .get("runtime.result_cache", &key)
1176 .is_none(),
1177 "volatile SELECT must not populate blob result cache"
1178 );
1179 assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
1180 }
1181
1182 #[test]
1183 fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
1184 reset_thread_locals();
1185 let rt = fresh_runtime();
1186 rt.inner
1187 .db
1188 .store()
1189 .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));
1190
1191 let first = rt.execute_query("SELECT 1").expect("first SELECT");
1192 let second = rt.execute_query("SELECT 1").expect("cached SELECT");
1193 assert_eq!(first.result.len(), second.result.len());
1194
1195 let key = result_cache_key("SELECT 1");
1196 assert!(rt.inner.result_cache.read().0.contains_key(&key));
1197 assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
1198 assert_eq!(rt.result_cache_shadow_divergences(), 0);
1199 assert_eq!(
1200 crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
1201 "cache_shadow_divergence_total"
1202 );
1203 }
1204
1205 #[test]
1206 fn as_of_on_red_collection_records_floor() {
1207 reset_thread_locals();
1208 let rt = fresh_runtime();
1209
1210 // `red_*` collections always allow AS OF. The frame should
1211 // resolve to a concrete xid and surface it via the Interface.
1212 let frame =
1213 StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
1214 .expect("AS OF SNAPSHOT 1 on red_commits resolves");
1215
1216 let f: &dyn ReadFrame = &frame;
1217 assert_eq!(
1218 f.as_of_floor(),
1219 Some(1),
1220 "AS OF SNAPSHOT 1 records xid=1 as the floor"
1221 );
1222 assert_eq!(f.snapshot().xid, 1);
1223 assert!(
1224 f.snapshot().in_progress.is_empty(),
1225 "historical reads have no in-progress set"
1226 );
1227 }
1228
1229 /// The frame classifies common SQL prefixes into the coarse
1230 /// `Privilege` / `LockIntent` buckets at build time. This test
1231 /// pins the mapping so a regression that silently re-routes
1232 /// (e.g. INSERT classified as Read) surfaces here, not at a
1233 /// downstream privilege gate.
1234 #[test]
1235 fn frame_classifies_privilege_and_lock_intent_from_prefix() {
1236 reset_thread_locals();
1237 let rt = fresh_runtime();
1238
1239 let cases = [
1240 ("SELECT 1", Privilege::Read, LockIntent::Shared),
1241 (
1242 "INSERT INTO t (id) VALUES (1)",
1243 Privilege::Write,
1244 LockIntent::Exclusive,
1245 ),
1246 (
1247 "UPDATE t SET x = 1 WHERE id = 1",
1248 Privilege::Write,
1249 LockIntent::Exclusive,
1250 ),
1251 (
1252 "DELETE FROM t WHERE id = 1",
1253 Privilege::Write,
1254 LockIntent::Exclusive,
1255 ),
1256 (
1257 "CREATE TABLE foo (id INT)",
1258 Privilege::Write,
1259 LockIntent::Exclusive,
1260 ),
1261 ("BEGIN", Privilege::None, LockIntent::None),
1262 ("COMMIT", Privilege::None, LockIntent::None),
1263 ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
1264 ];
1265
1266 for (q, want_priv, want_lock) in cases {
1267 let frame = StatementExecutionFrame::build(&rt, q)
1268 .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
1269 let f: &dyn ReadFrame = &frame;
1270 assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
1271 assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
1272 }
1273 }
1274
1275 /// Deletion-test for `ReadFrame::required_privilege`: a SELECT
1276 /// driven through `execute_query` under an identity whose role
1277 /// doesn't satisfy the frame's coarse `Read` privilege gets
1278 /// denied with the frame's signal.
1279 ///
1280 /// We test the gate by classifying an INSERT (which the frame
1281 /// reports as `Privilege::Write`) under `Role::Read` — the only
1282 /// pair the legacy fallback would also reject, but here the
1283 /// rejection comes through `frame.check_query_privilege` BEFORE
1284 /// the parsed-expression walker runs. Removing
1285 /// `required_privilege` (or the `is_satisfied_by` consult inside
1286 /// `check_query_privilege`) would force the deny path back to the
1287 /// inline `RedDBRuntime::check_query_privilege` walker — but the
1288 /// auth_store gate up there is bypassed when no auth_store is
1289 /// wired (embedded test mode), so this test would FLIP from
1290 /// denied to permitted and break the assertion below.
1291 #[test]
1292 fn insert_under_read_role_denied_via_frame_privilege() {
1293 reset_thread_locals();
1294 set_current_auth_identity("alice".to_string(), Role::Read);
1295
1296 let rt = fresh_runtime();
1297 // Bypass parser by reaching into the frame directly: the
1298 // frame derives privilege from the SQL prefix without
1299 // needing an auth_store wired up. Driving end-to-end via
1300 // `execute_query` would also reject (no table `t`), but for
1301 // a different reason — we want to pin the privilege seam.
1302 let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
1303 .expect("frame builds for INSERT");
1304 let f: &dyn ReadFrame = &frame;
1305 assert_eq!(
1306 f.required_privilege(),
1307 Privilege::Write,
1308 "INSERT classified as Write"
1309 );
1310 let id = f.identity().expect("identity captured");
1311 assert!(
1312 !f.required_privilege().is_satisfied_by(id.1),
1313 "Role::Read does not satisfy Privilege::Write — frame must deny"
1314 );
1315
1316 // End-to-end: the frame's `check_query_privilege` sees the
1317 // (Read role, Write privilege) mismatch and denies before
1318 // dispatch. We drive a synthetic `QueryExpr::Table` because
1319 // the SELECT/INSERT parser would happen to also fail, and we
1320 // want the failure to come from the privilege seam.
1321 use crate::storage::query::ast::{QueryExpr, TableQuery};
1322 let expr = QueryExpr::Table(TableQuery::new("t"));
1323 let err = frame
1324 .check_query_privilege(&rt, &expr)
1325 .expect_err("denied via frame's coarse privilege gate");
1326 let msg = format!("{err}");
1327 assert!(
1328 msg.contains("permission denied") && msg.contains("Write"),
1329 "expected frame-level Write deny, got: {msg}"
1330 );
1331
1332 reset_thread_locals();
1333 }
1334
1335 /// End-to-end proof that the frame-owned row-buffer arena (#885) is
1336 /// wired into the SELECT path and produces observable results
1337 /// byte-identical to the per-request-allocation baseline.
1338 ///
1339 /// A table with more rows than the streaming high-water mark
1340 /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
1341 /// path to assemble many chunks, each leasing/recycling the frame
1342 /// arena's single chunk buffer. Driving it through `execute_query`
1343 /// (which builds a `StatementExecutionFrame` and lends its arena)
1344 /// must return every inserted row, in order — exactly what the
1345 /// allocate-per-chunk path returned. A bug in the arena wiring
1346 /// (dropped rows, bled rows, mis-ordering) would surface here.
1347 #[test]
1348 fn large_select_through_frame_arena_returns_all_rows_in_order() {
1349 reset_thread_locals();
1350 let rt = fresh_runtime();
1351 rt.execute_query("CREATE TABLE big (id INT)")
1352 .expect("create table");
1353
1354 // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
1355 // spans multiple chunks and the arena buffer is reused.
1356 const N: usize = 2_500;
1357 let values = (0..N)
1358 .map(|i| format!("({i})"))
1359 .collect::<Vec<_>>()
1360 .join(", ");
1361 rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
1362 .expect("insert rows");
1363
1364 let result = rt
1365 .execute_query("SELECT id FROM big ORDER BY id")
1366 .expect("large SELECT executes through the frame arena path");
1367 assert_eq!(result.statement_type, "select");
1368 assert_eq!(
1369 result.result.records.len(),
1370 N,
1371 "every inserted row streams back through the arena-backed channel"
1372 );
1373 for (i, record) in result.result.records.iter().enumerate() {
1374 assert_eq!(
1375 record.get("id"),
1376 Some(&crate::storage::schema::Value::Integer(i as i64)),
1377 "row {i} is byte-identical to the per-request-allocation baseline"
1378 );
1379 }
1380
1381 reset_thread_locals();
1382 }
1383
1384 /// Deletion-test for `ReadFrame::lock_intent`: a transaction
1385 /// control statement carries `LockIntent::None` and the
1386 /// `acquire_intent_locks` path returns `None` without consulting
1387 /// `intent_lock_modes_for`. Removing the method (or its consult
1388 /// site in `acquire_intent_locks`) would force the lock-mode
1389 /// helper to walk a fabricated parsed expression to reach the
1390 /// same conclusion — but the assertion that no guard is allocated
1391 /// for a `BEGIN` frame would still hold, so we additionally pin
1392 /// the classifier mapping above to make the deletion observable.
1393 #[test]
1394 fn control_statement_skips_intent_locks_via_frame() {
1395 reset_thread_locals();
1396 let rt = fresh_runtime();
1397
1398 let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
1399 let f: &dyn ReadFrame = &frame;
1400 assert_eq!(f.lock_intent(), LockIntent::None);
1401
1402 // Drive `acquire_intent_locks` against a fabricated SELECT
1403 // expression that WOULD normally yield `(IS, IS)`; the frame's
1404 // `lock_intent() == None` short-circuit must still suppress
1405 // the guard.
1406 use crate::storage::query::ast::{QueryExpr, TableQuery};
1407 let expr = QueryExpr::Table(TableQuery::new("t"));
1408 let guard = frame.acquire_intent_locks(&rt, &expr);
1409 assert!(
1410 guard.is_none(),
1411 "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
1412 );
1413 }
1414}