// reddb_server/runtime/statement_frame.rs
1use std::cell::RefCell;
2use std::collections::HashSet;
3use std::rc::Rc;
4use std::sync::Arc;
5
6use super::impl_core::{
7 collections_referenced, current_auth_identity, current_connection_id, current_tenant,
8 has_with_prefix, intent_lock_modes_for, peek_top_level_as_of_with_table,
9 query_has_volatile_builtin, query_is_ask_statement, ConfigSnapshotGuard, CurrentSnapshotGuard,
10 SecretStoreGuard, SnapshotContext, TxLocalTenantGuard,
11};
12use super::{RedDBRuntime, RuntimeQueryResult, RuntimeResultCacheEntry};
13use crate::api::{RedDBError, RedDBResult};
14use crate::auth::Role;
15use crate::storage::query::ast::QueryExpr;
16use crate::storage::query::modes::{detect_mode, parse_multi, QueryMode};
17use crate::storage::transaction::snapshot::{Snapshot, Xid};
18
/// Coarse privilege classification for a statement, computed once at
/// frame-build time from the SQL text (see `classify_privilege`). Mirrors
/// the three-role auth model (`Role::Read < Role::Write < Role::Admin`)
/// so the frame can answer "can this identity run this statement?"
/// without re-walking the parsed `QueryExpr` at every call site.
///
/// This is only a coarse pre-check. `StatementExecutionFrame::check_query_privilege`
/// always runs the deep grant engine (`RedDBRuntime::check_query_privilege`)
/// afterwards, and that engine makes the final decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum Privilege {
    /// Read-only data access: SELECT, WITH, SHOW, EXPLAIN, DESCRIBE and
    /// the read forms of `KV` / `VAULT` commands. Satisfied by any role
    /// from `Role::Read` upward.
    Read,
    /// Mutation of user data or schema DDL: INSERT, UPDATE, DELETE,
    /// CREATE/ALTER/DROP (including CREATE MIGRATION), and similar.
    /// Requires at least `Role::Write`.
    Write,
    /// Authority statements. The SQL-prefix classifier currently maps
    /// only GRANT / REVOKE here. Other admin-only statements (APPLY /
    /// ROLLBACK MIGRATION, ALTER USER, IAM policy mutation) are upgraded
    /// by finer checks at the call site. Requires `Role::Admin`.
    Admin,
    /// The coarse gate does not apply. This covers transaction control
    /// (BEGIN, COMMIT, ROLLBACK, …), SET / RESET, PREPARE / EXECUTE, and
    /// any statement whose leading keyword the classifier does not
    /// recognise. Always passes the coarse gate; the deep grant engine
    /// is still consulted afterwards.
    None,
}
45
46impl Privilege {
47 /// `true` iff `role` is sufficient to execute a statement carrying
48 /// this required privilege. Encodes the standard `Read ⊆ Write ⊆
49 /// Admin` containment used by the auth fallback path.
50 pub(crate) fn is_satisfied_by(self, role: Role) -> bool {
51 match self {
52 Self::None => true,
53 Self::Read => role.can_read(),
54 Self::Write => role.can_write(),
55 Self::Admin => role.can_admin(),
56 }
57 }
58}
59
/// Coarse collection-level lock intent for a statement, computed once at
/// frame-build time from the SQL text (see `classify_lock_intent`).
///
/// Downstream it maps onto the storage layer's `LockMode` matrix, but it
/// stays decoupled here. The runtime can therefore answer "does this
/// statement need the lock manager at all?" without importing storage
/// types at every call site.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LockIntent {
    /// No collection-level lock is requested, and the lock-acquisition
    /// path short-circuits. Produced for transaction control, SET /
    /// RESET, PREPARE / EXECUTE / DEALLOCATE, USE, and also for any
    /// statement whose leading keyword the classifier does not recognise.
    None,
    /// Reader-style intent: SELECT, WITH, SHOW, EXPLAIN, DESCRIBE and the
    /// read forms of `KV` / `VAULT` commands. Maps to `(IS, IS)` at the
    /// storage layer.
    Shared,
    /// Writer- or DDL-style intent: INSERT/UPDATE/DELETE (`(IX, IX)`) and
    /// CREATE/ALTER/DROP (`(IX, X)`). Both are surfaced as `Exclusive` at
    /// this granularity. Call sites that need the finer distinction
    /// still consult `intent_lock_modes_for`.
    Exclusive,
}
79
80/// Small, stable Interface that *represents* a read statement's
81/// execution context. Every read caller that needs to know "under
82/// what scope / identity / snapshot am I running, and is there an
83/// AS OF floor in effect?" consults this trait — never the
84/// underlying thread-locals or runtime fields directly.
85///
86/// The deletion test: removing this trait would force the four
87/// concerns it exposes back into ad-hoc lookups at every read
88/// callsite (`current_tenant()`, `current_auth_identity()`,
89/// `capture_current_snapshot()`, AS OF re-parsing). The trait
90/// concentrates them in one place so future changes (per-statement
91/// logging, audit, scope policy) have a single seam to extend.
92pub(crate) trait ReadFrame {
93 /// Effective tenant scope for the statement after WITHIN /
94 /// SET LOCAL TENANT / SET TENANT resolution. `None` means
95 /// "no tenant bound" (RLS deny-default applies).
96 fn effective_scope(&self) -> Option<&str>;
97
98 /// Authenticated identity observed at frame-build time, if any.
99 /// Returns `(username, role)` so callers can render audit lines
100 /// or feed RLS policy lookups without re-reading thread-locals.
101 fn identity(&self) -> Option<(&str, Role)>;
102
103 /// MVCC snapshot the statement reads against. For autocommit
104 /// this is a fresh snapshot; inside an active transaction it
105 /// is the txn's snapshot; under AS OF it is the resolved
106 /// historical xid.
107 fn snapshot(&self) -> &Snapshot;
108
109 /// AS OF xid floor when AS OF was applied for this statement,
110 /// `None` for live reads. Useful for downstream callers that
111 /// want to gate behaviour on historical-read mode without
112 /// re-parsing the query.
113 fn as_of_floor(&self) -> Option<Xid>;
114
115 /// Stable result-cache key for the statement (already mixes
116 /// effective tenant + identity).
117 fn cache_key(&self) -> &str;
118
119 /// Whether the statement is safe to serve from / populate the
120 /// result cache. Combines two underlying signals:
121 ///
122 /// * the query does not call a volatile builtin (e.g. `NOW()`,
123 /// `RANDOM()`, `UUID()`), which would change between calls,
124 /// * the connection is not inside an active transaction with
125 /// uncommitted writes that other readers shouldn't observe.
126 ///
127 /// SELECT cache callsites (read + write) consult this method
128 /// instead of re-deriving safety from globals or poking the
129 /// frame's private fields. Removing it would force every cache
130 /// callsite to re-run `query_has_volatile_builtin` plus
131 /// `result_cache_safe(conn_id)` inline.
132 fn should_cache_result(&self) -> bool;
133
134 /// Coarse privilege class the statement requires, computed once
135 /// at frame-build time from the SQL prefix. Read/write dispatch
136 /// sites consult this instead of re-classifying the parsed
137 /// `QueryExpr` inline at every callsite.
138 ///
139 /// Removing this method would force every privilege gate to
140 /// recompute the (action, resource) classification from the
141 /// parsed expression and re-check the role hierarchy inline.
142 fn required_privilege(&self) -> Privilege;
143
144 /// Coarse collection-level lock intent the statement implies.
145 /// `None` lets the lock-acquisition path short-circuit without
146 /// touching the lock manager.
147 ///
148 /// Removing this method would force the lock-acquisition path
149 /// to always invoke `intent_lock_modes_for` (which itself walks
150 /// the parsed expression) even for transaction-control / SET /
151 /// SHOW statements that need no collection lock at all.
152 fn lock_intent(&self) -> LockIntent;
153
154 /// Set of collection ids the calling identity is allowed to
155 /// observe under the active `(tenant, role)` scope. Computed once
156 /// at frame-build time via the `AuthStore` visible-collections
157 /// cache (see `auth::scope_cache`) and used by `AuthorizedSearch`
158 /// to pre-filter SEARCH SIMILAR / SEARCH CONTEXT candidate sets
159 /// before any similarity score is computed (issue #119).
160 ///
161 /// `None` means the frame was built without an auth store wired —
162 /// embedded / single-tenant tests run that way. AI search call
163 /// sites refuse to proceed with `None`, which is the deny-default
164 /// the issue requires; pure SELECT paths fall back to the existing
165 /// per-row RLS gate.
166 fn visible_collections(&self) -> Option<&std::collections::HashSet<String>>;
167}
168
/// Cheap leading-keyword classification of a SQL statement. It is used
/// at frame-build time to derive `Privilege` and `LockIntent` without
/// parsing the query.
///
/// Returns one of `"read"`, `"write"`, `"ddl"`, `"admin"`, `"control"`
/// or `"unknown"`.
///
/// Leading whitespace is skipped first, together with any run of
/// `-- …` line comments and `/* … */` block comments. A comment-prefixed
/// statement such as `/* hint */ INSERT …` is therefore classified by its
/// real keyword. Without this it would fall through to `"unknown"`, which
/// maps to `Privilege::None` / `LockIntent::None` and so bypasses the
/// coarse privilege gate and the intent-lock path.
///
/// Two-word `KV …` / `VAULT …` commands are split into read and write
/// forms by their second keyword.
fn statement_kind(query: &str) -> &'static str {
    /// Strips leading whitespace and any sequence of SQL line / block
    /// comments. An unterminated comment consumes the rest of the input.
    fn skip_leading_comments(mut text: &str) -> &str {
        loop {
            text = text.trim_start();
            if let Some(rest) = text.strip_prefix("--") {
                text = rest.split_once('\n').map_or("", |(_, tail)| tail);
            } else if let Some(rest) = text.strip_prefix("/*") {
                text = rest.split_once("*/").map_or("", |(_, tail)| tail);
            } else {
                return text;
            }
        }
    }

    // Upper bound for a keyword candidate. The longest keyword matched
    // below is 11 bytes (`APPROXIMATE`), so a longer first word can never
    // be a keyword. It is rejected outright rather than truncated.
    const MAX_KEYWORD_LEN: usize = 16;

    let body = skip_leading_comments(query);
    // Runs of separators (`KV  GET`, `(SELECT`) yield empty pieces. Drop
    // them so the first two *words* are what gets compared.
    let mut words = body
        .split(|c: char| c.is_whitespace() || c == '(' || c == ';')
        .filter(|word| !word.is_empty());
    let first = words.next().unwrap_or("");
    let second = words.next().unwrap_or("");

    if first.eq_ignore_ascii_case("KV") {
        let is_read = ["GET", "LIST", "WATCH"]
            .iter()
            .any(|kw| second.eq_ignore_ascii_case(kw));
        return if is_read { "read" } else { "write" };
    }
    if first.eq_ignore_ascii_case("VAULT") {
        let is_read = ["LIST", "WATCH", "HISTORY"]
            .iter()
            .any(|kw| second.eq_ignore_ascii_case(kw));
        return if is_read { "read" } else { "write" };
    }

    // ASCII-uppercase into a stack buffer to avoid allocating; SQL
    // keywords are ASCII.
    let bytes = first.as_bytes();
    if bytes.len() > MAX_KEYWORD_LEN {
        return "unknown";
    }
    let mut buf = [0u8; MAX_KEYWORD_LEN];
    let upper = &mut buf[..bytes.len()];
    upper.copy_from_slice(bytes);
    upper.make_ascii_uppercase();

    match &*upper {
        b"SELECT" | b"WITH" | b"SHOW" | b"EXPLAIN" | b"DESCRIBE" | b"DESC" | b"RANK"
        | b"APPROX" | b"APPROXIMATE" | b"ZRANK" | b"ZRANGE" | b"LIST" | b"WATCH" | b"GET"
        | b"HISTORY" => "read",
        b"INSERT" | b"UPDATE" | b"DELETE" | b"UPSERT" | b"MERGE" | b"COPY" | b"TRUNCATE" => "write",
        b"CREATE" | b"ALTER" | b"DROP" | b"REINDEX" | b"VACUUM" | b"ANALYZE" => "ddl",
        b"GRANT" | b"REVOKE" => "admin",
        b"BEGIN" | b"START" | b"COMMIT" | b"ROLLBACK" | b"SAVEPOINT" | b"RELEASE" | b"END"
        | b"SET" | b"RESET" | b"PREPARE" | b"EXECUTE" | b"DEALLOCATE" | b"USE" => "control",
        b"PUT" | b"INCR" | b"DECR" | b"ADD" | b"ROTATE" | b"PURGE" | b"UNSEAL" | b"INVALIDATE" => {
            "write"
        }
        _ => "unknown",
    }
}
229
230fn classify_privilege(query: &str) -> Privilege {
231 match statement_kind(query) {
232 "read" => Privilege::Read,
233 "write" => Privilege::Write,
234 // DDL is gated at `Role::Write` in the legacy fallback (see
235 // `RedDBRuntime::check_query_privilege` for CreateTable et al.),
236 // so it classifies as Write here. APPLY / ROLLBACK MIGRATION and
237 // GRANT / REVOKE upgrade to Admin via finer checks at the call
238 // site — the frame surfaces only the coarse class.
239 "ddl" => Privilege::Write,
240 "admin" => Privilege::Admin,
241 _ => Privilege::None,
242 }
243}
244
245fn classify_lock_intent(query: &str) -> LockIntent {
246 match statement_kind(query) {
247 "read" => LockIntent::Shared,
248 "write" | "ddl" => LockIntent::Exclusive,
249 _ => LockIntent::None,
250 }
251}
252
253pub(super) struct StatementExecutionFrame {
254 tx_local_tenant: Option<Option<String>>,
255 snapshot: Snapshot,
256 own_xids: HashSet<Xid>,
257 cache_key: String,
258 is_volatile_query: bool,
259 cache_safe: bool,
260 /// Effective tenant captured at frame-build time after WITHIN /
261 /// SET LOCAL TENANT / SET TENANT resolution. Stored on the frame
262 /// so the `ReadFrame` Interface can return a borrow without
263 /// re-touching the thread-local stack.
264 effective_scope: Option<String>,
265 /// Auth identity captured at frame-build time. `None` for
266 /// embedded / anonymous callers.
267 identity: Option<(String, Role)>,
268 /// `Some(xid)` when AS OF resolved to a historical xid; `None`
269 /// for live reads.
270 as_of_floor: Option<Xid>,
271 /// True when the statement snapshot can require tuple versions that
272 /// current secondary indexes no longer contain.
273 requires_index_fallback: bool,
274 /// Privilege class required by the statement, derived from the
275 /// SQL text at frame-build time. Read/write dispatch sites
276 /// consult this instead of re-classifying the parsed expression.
277 required_privilege: Privilege,
278 /// Collection-level lock intent the statement implies. The
279 /// lock-acquisition path short-circuits when this is `None`.
280 lock_intent: LockIntent,
281 /// Set of collection ids the active `(tenant, role)` scope is
282 /// allowed to observe. Computed at frame-build time via the
283 /// `AuthStore` visibility cache and consumed by `AuthorizedSearch`
284 /// to gate SEARCH SIMILAR / SEARCH CONTEXT candidate sets before
285 /// scoring (issue #119). `None` when no auth store is wired
286 /// (embedded test mode) — AI search refuses on `None`.
287 visible_collections: Option<HashSet<String>>,
288 /// Per-owner buffer arena for query-result row chunks (#885). Owned
289 /// by the frame because the frame already owns the query lifecycle;
290 /// lent to the row-streaming path (`execute_runtime_table_query_in`)
291 /// so chunk buffers are reused across the statement's chunk-fetches
292 /// instead of allocated fresh per chunk. Reclaimed when the frame
293 /// drops at statement end — no `thread_local!` scratch, which would
294 /// be unsound under tokio's work-stealing runtime.
295 row_arena: Rc<RefCell<super::query_exec::RowBufferArena>>,
296}
297
/// Guard bundle returned by `StatementExecutionFrame::install`.
///
/// Each field holds the value returned by the corresponding
/// `…Guard::install` call. The fields are private and `_`-prefixed
/// because the bundle exists only to be kept alive for the duration of
/// the statement, not to be read.
///
/// NOTE(review): presumably each guard restores the previous thread-local
/// state when dropped. Confirm in `impl_core`. Rust drops struct fields
/// in declaration order, so reordering these fields changes which guard
/// is torn down first.
pub(super) struct StatementFrameGuards {
    _tx_local_guard: TxLocalTenantGuard,
    _config_snapshot_guard: ConfigSnapshotGuard,
    _secret_store_guard: SecretStoreGuard,
    _snapshot_guard: CurrentSnapshotGuard,
}
304
305pub(super) struct PreparedStatement {
306 pub(super) expr: QueryExpr,
307 pub(super) mode: QueryMode,
308}
309
310impl StatementExecutionFrame {
311 pub(super) fn build(runtime: &RedDBRuntime, query: &str) -> RedDBResult<Self> {
312 let conn_id = current_connection_id();
313 let tx_local_tenant = runtime.inner.tx_local_tenants.read().get(&conn_id).cloned();
314 let own_xids = runtime.own_transaction_xids(conn_id);
315 let (snapshot, as_of_floor) = runtime.statement_snapshot(query)?;
316 let requires_index_fallback =
317 as_of_floor.is_some() || runtime.inner.tx_contexts.read().contains_key(&conn_id);
318 let cache_key = result_cache_key(query);
319 let is_volatile_query = query_has_volatile_builtin(query) || query_is_ask_statement(query);
320 let cache_safe = runtime.result_cache_safe(conn_id);
321 // Capture identity + effective scope under the same
322 // thread-local view that the cache key was built from, so
323 // the Interface and the cache key agree on what "this
324 // statement" means.
325 let effective_scope = current_tenant();
326 let identity = current_auth_identity();
327
328 // Coarse classification of the statement, computed once from
329 // the SQL prefix so downstream callers don't re-derive it
330 // from the parsed `QueryExpr` at every privilege / lock site.
331 let required_privilege = classify_privilege(query);
332 let lock_intent = classify_lock_intent(query);
333
334 // Issue #119: resolve the visible-collections set for the
335 // active (tenant, role) scope. Only meaningful when an auth
336 // store is wired *and* an identity was captured — embedded
337 // anonymous callers fall back to `None`, and AI search call
338 // sites refuse on `None`.
339 let visible_collections = match (runtime.inner.auth_store.read().clone(), identity.as_ref())
340 {
341 (Some(store), Some((principal, role))) => {
342 let collections = runtime.inner.db.store().list_collections();
343 Some(store.visible_collections_for_scope(
344 effective_scope.as_deref(),
345 *role,
346 principal,
347 &collections,
348 ))
349 }
350 _ => None,
351 };
352
353 Ok(Self {
354 tx_local_tenant,
355 snapshot,
356 own_xids,
357 cache_key,
358 is_volatile_query,
359 cache_safe,
360 effective_scope,
361 identity,
362 as_of_floor,
363 requires_index_fallback,
364 required_privilege,
365 lock_intent,
366 visible_collections,
367 row_arena: Rc::new(RefCell::new(super::query_exec::RowBufferArena::new())),
368 })
369 }
370
371 /// Lend the frame's per-owner row-buffer arena (#885) to the
372 /// row-streaming path. Returns a cloned `Rc` handle; the frame remains
373 /// the owner and the arena is reclaimed when the frame drops at
374 /// statement end.
375 pub(super) fn row_arena(&self) -> Rc<RefCell<super::query_exec::RowBufferArena>> {
376 Rc::clone(&self.row_arena)
377 }
378
379 pub(super) fn install(&self, runtime: &RedDBRuntime) -> StatementFrameGuards {
380 StatementFrameGuards {
381 _tx_local_guard: TxLocalTenantGuard::install(self.tx_local_tenant.clone()),
382 _config_snapshot_guard: ConfigSnapshotGuard::install(Arc::clone(&runtime.inner.db)),
383 _secret_store_guard: SecretStoreGuard::install(runtime.inner.auth_store.read().clone()),
384 _snapshot_guard: CurrentSnapshotGuard::install(SnapshotContext {
385 snapshot: self.snapshot.clone(),
386 manager: Arc::clone(&runtime.inner.snapshot_manager),
387 own_xids: self.own_xids.clone(),
388 requires_index_fallback: self.requires_index_fallback,
389 }),
390 }
391 }
392
393 pub(super) fn cache_key(&self) -> &str {
394 &self.cache_key
395 }
396
397 pub(super) fn can_read_result_cache(&self) -> bool {
398 // Delegates to the `ReadFrame` Interface so the volatile +
399 // active-tx safety decision lives in exactly one place.
400 <Self as ReadFrame>::should_cache_result(self)
401 }
402
403 pub(super) fn should_write_result_cache(&self, result: &RuntimeQueryResult) -> bool {
404 // Cache-safety (volatile builtin, active-tx writes) comes from
405 // the Interface; the rest are write-side payload heuristics
406 // (statement shape, result size) that aren't part of the
407 // safety contract.
408 <Self as ReadFrame>::should_cache_result(self)
409 && result.statement_type == "select"
410 && result.engine != "vault"
411 && result.engine != "runtime-rank"
412 // `QUEUE READ` is a stateful read: a delayed message
413 // (issue #722) becomes deliverable over time without a
414 // producer push to invalidate the cache, so a cached empty
415 // result would hide it. Skip caching entirely.
416 && result.statement != "queue_group_read"
417 && result.result.pre_serialized_json.is_none()
418 // Graph-analytics TVF output (issue #802) is deterministic and
419 // expensive to recompute, so it is cached at any row count. The
420 // ≤5-row heuristic only bounds payload size for ordinary SELECTs.
421 && (is_graph_tvf_engine(result.engine) || result.result.records.len() <= 5)
422 }
423
424 pub(super) fn read_result_cache(&self, runtime: &RedDBRuntime) -> Option<RuntimeQueryResult> {
425 if self.can_read_result_cache() {
426 runtime.get_result_cache_entry(self.cache_key())
427 } else {
428 None
429 }
430 }
431
432 pub(super) fn write_result_cache(
433 &self,
434 runtime: &RedDBRuntime,
435 result: &RuntimeQueryResult,
436 scopes: HashSet<String>,
437 ) {
438 if self.should_write_result_cache(result) {
439 runtime.put_result_cache_entry(
440 self.cache_key(),
441 RuntimeResultCacheEntry {
442 result: result.clone(),
443 cached_at: std::time::Instant::now(),
444 scopes,
445 },
446 );
447 }
448 }
449
450 pub(super) fn prepare_cte(&self, query: &str) -> RedDBResult<Option<QueryExpr>> {
451 // Detected via cheap prefix check so non-CTE queries skip the
452 // full parse here. CTE-bearing queries bypass the plan cache
453 // and result cache (rare workload — perf optimization is a
454 // follow-up). Inlining substitutes every CTE reference with
455 // its body as a subquery in FROM, after which the existing
456 // subquery-in-FROM machinery handles execution. Recursive
457 // CTEs are rejected explicitly until fixpoint execution wires
458 // through the runtime.
459 if !has_with_prefix(query) {
460 return Ok(None);
461 }
462 let parsed = crate::storage::query::parser::parse(query)
463 .map_err(|err| RedDBError::Query(err.to_string()))?;
464 if parsed.with_clause.is_some() {
465 let rewritten = crate::storage::query::executors::inline_ctes(parsed)
466 .map_err(|err| RedDBError::Query(err.to_string()))?;
467 return Ok(Some(rewritten));
468 }
469 // No WITH after parse (the prefix matched something else like
470 // `WITHIN` that already routed elsewhere) — fall through to
471 // the normal path with the original query.
472 Ok(None)
473 }
474
475 pub(super) fn prepare_statement(
476 &self,
477 runtime: &RedDBRuntime,
478 query: &str,
479 ) -> RedDBResult<PreparedStatement> {
480 let mode = detect_mode(query);
481 if matches!(mode, QueryMode::Unknown) {
482 return Err(RedDBError::Query("unable to detect query mode".to_string()));
483 }
484
485 // ── Plan cache: reuse only exact-query ASTs ──
486 //
487 // DML statements (INSERT/UPDATE/DELETE) almost always have unique literal
488 // values, so caching them burns CPU on eviction bookkeeping (Vec::remove(0)
489 // shifts the entire LRU list) with zero hit rate. Skip the cache entirely
490 // Plan cache applies to statements whose shape can be
491 // normalised + rebound (`UPDATE t SET x=? WHERE _entity_id=?`
492 // reuses the same plan across thousands of varying literals).
493 // INSERT is still bypassed — its shape changes per column set
494 // and bulk paths don't go through here anyway.
495 let first_word = query
496 .trim()
497 .split_ascii_whitespace()
498 .next()
499 .unwrap_or("")
500 .to_ascii_uppercase();
501 let is_insert = first_word == "INSERT";
502
503 // Fused normalize+extract: one byte-scan produces both the
504 // cache_key AND the literal bindings. Saves a second Lexer
505 // pass over the query text on every cache hit — dominant
506 // cost on tight UPDATE loops that hit the same shape
507 // thousands of times with varying literals.
508 let (cache_key, prescan_binds) = if is_insert {
509 (String::new(), Vec::new())
510 } else {
511 crate::storage::query::planner::cache_key::normalize_and_extract(query)
512 };
513
514 let expr = if is_insert {
515 // Bypass plan cache for INSERT — shape varies per query.
516 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
517 } else {
518 // ── Hot path: read lock only (no writer serialization on cache hits) ──
519 //
520 // peek() is a non-mutating probe: no LRU promotion, no touch().
521 // This lets concurrent readers proceed without blocking each other.
522 // On hit we bind literals if needed and return immediately.
523 // Only on miss do we drop to a write lock to parse + insert.
524 let hit = {
525 let plan_cache = runtime.inner.query_cache.read();
526 plan_cache.peek(&cache_key).map(|cached| {
527 let parameter_count = cached.parameter_count;
528 let optimized = cached.plan.optimized.clone();
529 let exact_query = cached.exact_query.clone();
530 (parameter_count, optimized, exact_query)
531 })
532 };
533
534 if let Some((parameter_count, optimized, exact_query)) = hit {
535 if parameter_count > 0 {
536 // Shape hit: use the binds extracted during normalise.
537 let shape_binds = prescan_binds.clone();
538 if let Some(bound) =
539 crate::storage::query::planner::shape::bind_parameterized_query(
540 &optimized,
541 &shape_binds,
542 parameter_count,
543 )
544 {
545 bound
546 } else if exact_query.as_deref() == Some(query) {
547 // Bind failed but exact query matches — use as-is.
548 optimized
549 } else {
550 // Bind failed and literals differ: re-parse fresh.
551 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
552 }
553 } else {
554 // No parameters means either there truly are no literals,
555 // or this statement type does not participate in shape
556 // parameterization (for example graph/queue commands).
557 // Reusing a normalized-cache hit across a different exact
558 // query can therefore leak stale literals into execution.
559 if exact_query.as_deref() == Some(query) {
560 optimized
561 } else {
562 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?
563 }
564 }
565 } else {
566 // Cache miss — parse, parameterize, store.
567 let parsed =
568 parse_multi(query).map_err(|err| RedDBError::Query(err.to_string()))?;
569 let (cached_expr, parameter_count) = if let Some(prepared) =
570 crate::storage::query::planner::shape::parameterize_query_expr(&parsed)
571 {
572 (prepared.shape, prepared.parameter_count)
573 } else {
574 (parsed.clone(), 0)
575 };
576 {
577 let mut pc = runtime.inner.query_cache.write();
578 let plan = crate::storage::query::planner::QueryPlan::new(
579 parsed.clone(),
580 cached_expr,
581 Default::default(),
582 );
583 pc.insert(
584 cache_key.clone(),
585 crate::storage::query::planner::CachedPlan::new(plan)
586 .with_shape_key(cache_key.clone())
587 .with_exact_query(query.to_string())
588 .with_parameter_count(parameter_count),
589 );
590 }
591 parsed
592 }
593 };
594
595 // Phase 5 PG parity: substitute any registered view name that
596 // appears in the expression with its stored body. Runs after
597 // parse and before dispatch so the SQL entrypoint gets the
598 // same view resolution `execute_query_expr` already does.
599 let expr = runtime.rewrite_view_refs(expr);
600
601 Ok(PreparedStatement { expr, mode })
602 }
603
604 pub(super) fn check_query_privilege(
605 &self,
606 runtime: &RedDBRuntime,
607 expr: &QueryExpr,
608 ) -> RedDBResult<()> {
609 // Frame-level coarse gate. We consult `required_privilege()`
610 // (computed once at frame-build) against the captured identity
611 // before the deep grant engine walks the parsed expression.
612 // The coarse gate cannot ALLOW anything the grant engine would
613 // deny — it only short-circuits the obvious "Role::Read tries
614 // INSERT" case so a downstream caller never has to redo this
615 // check inline. `Privilege::None` (transaction control / SET /
616 // SHOW) flows through unchanged; the grant engine treats those
617 // as bypass too.
618 if let Some((username, role)) = <Self as ReadFrame>::identity(self) {
619 let needed = <Self as ReadFrame>::required_privilege(self);
620 if !needed.is_satisfied_by(role) {
621 // Issue #205 — when the deep grant engine *also*
622 // denies, we treat this as an ordinary permission
623 // failure. But when an Admin-only statement reaches
624 // this gate without an auth_store wired (so the deep
625 // engine can't double-check), the coarse rejection is
626 // the only line of defence — emit an OperatorEvent so
627 // the operator notices an Admin-class statement was
628 // attempted with insufficient role.
629 if matches!(needed, Privilege::Admin) && runtime.inner.auth_store.read().is_none() {
630 crate::telemetry::operator_event::OperatorEvent::AuthBypass {
631 principal: username.to_string(),
632 resource: format!("statement requiring {needed:?}"),
633 detail: format!(
634 "auth_store not wired; coarse gate is sole defence (role={role:?})"
635 ),
636 }
637 .emit_global();
638 }
639 return Err(RedDBError::Query(format!(
640 "permission denied: principal=`{username}` role=`{role:?}` lacks {needed:?} privilege"
641 )));
642 }
643 }
644 runtime
645 .check_query_privilege(expr)
646 .map_err(|err| RedDBError::Query(format!("permission denied: {err}")))
647 }
648
649 pub(super) fn prepare_dispatch(
650 &self,
651 runtime: &RedDBRuntime,
652 expr: &QueryExpr,
653 ) -> RedDBResult<Option<crate::runtime::locking::LockerGuard>> {
654 runtime.validate_model_operations_before_auth(expr)?;
655 self.check_query_privilege(runtime, expr)?;
656 Ok(self.acquire_intent_locks(runtime, expr))
657 }
658
659 pub(super) fn acquire_intent_locks(
660 &self,
661 runtime: &RedDBRuntime,
662 expr: &QueryExpr,
663 ) -> Option<crate::runtime::locking::LockerGuard> {
664 if !runtime.config_bool("concurrency.locking.enabled", true) {
665 return None;
666 }
667 // Frame-level short-circuit: if the statement carries no lock
668 // intent (transaction control, SET, SHOW), skip the lock
669 // manager entirely instead of letting `intent_lock_modes_for`
670 // walk the parsed expression to reach the same conclusion.
671 if matches!(<Self as ReadFrame>::lock_intent(self), LockIntent::None) {
672 return None;
673 }
674 intent_lock_modes_for(expr).map(|(global_mode, coll_mode)| {
675 let mut guard =
676 crate::runtime::locking::LockerGuard::new(runtime.inner.lock_manager.clone());
677 let _ = guard.acquire(crate::runtime::locking::Resource::Global, global_mode);
678 for collection in collections_referenced(expr) {
679 let _ = guard.acquire(
680 crate::runtime::locking::Resource::Collection(collection),
681 coll_mode,
682 );
683 }
684 guard
685 })
686 }
687}
688
689impl ReadFrame for StatementExecutionFrame {
690 fn effective_scope(&self) -> Option<&str> {
691 self.effective_scope.as_deref()
692 }
693
694 fn identity(&self) -> Option<(&str, Role)> {
695 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
696 }
697
698 fn snapshot(&self) -> &Snapshot {
699 &self.snapshot
700 }
701
702 fn as_of_floor(&self) -> Option<Xid> {
703 self.as_of_floor
704 }
705
706 fn cache_key(&self) -> &str {
707 &self.cache_key
708 }
709
710 fn should_cache_result(&self) -> bool {
711 !self.is_volatile_query && self.cache_safe
712 }
713
714 fn required_privilege(&self) -> Privilege {
715 self.required_privilege
716 }
717
718 fn lock_intent(&self) -> LockIntent {
719 self.lock_intent
720 }
721
722 fn visible_collections(&self) -> Option<&HashSet<String>> {
723 self.visible_collections.as_ref()
724 }
725}
726
727/// Lightweight `ReadFrame` carrier used by AI command entry points
728/// (`SEARCH SIMILAR`, `SEARCH CONTEXT`, `ASK`).
729///
730/// Issue #119 calls this struct `EffectiveScope`. It bundles the
731/// `(tenant, identity, role, visible_collections, snapshot)` tuple so
732/// every AI runtime entry can pass *one* value to `AuthorizedSearch`
733/// instead of re-reading thread-locals at every call site.
734///
735/// Built via `RedDBRuntime::ai_scope()` which sources tenant + identity
736/// from the per-statement thread-locals (identical to how
737/// `StatementExecutionFrame::build` derives them) and resolves
738/// `visible_collections` via the `AuthStore` cache.
739pub struct EffectiveScope {
740 pub(crate) tenant: Option<String>,
741 pub(crate) identity: Option<(String, Role)>,
742 pub(crate) snapshot: Snapshot,
743 pub(crate) visible_collections: Option<HashSet<String>>,
744}
745
746impl EffectiveScope {
747 /// Capability check used by the AI runtime (`runtime/ai/ner.rs`)
748 /// to gate LLM-backed NER calls behind `ai:ner:read`.
749 ///
750 /// Placeholder for now: always returns `false`. The auth engine's
751 /// capability matrix is future work; until it lands, every routed
752 /// LLM-NER call denies at the gate and `extract_tokens_routed`'s
753 /// heuristic fallback fires (see `ask_pipeline::extract_tokens_routed`).
754 /// Documented in code so the wire-up is a one-line change once
755 /// the auth engine learns capabilities.
756 pub fn has_capability(&self, _capability: &str) -> bool {
757 false
758 }
759}
760
761impl ReadFrame for EffectiveScope {
762 fn effective_scope(&self) -> Option<&str> {
763 self.tenant.as_deref()
764 }
765 fn identity(&self) -> Option<(&str, Role)> {
766 self.identity.as_ref().map(|(u, r)| (u.as_str(), *r))
767 }
768 fn snapshot(&self) -> &Snapshot {
769 &self.snapshot
770 }
771 fn as_of_floor(&self) -> Option<Xid> {
772 None
773 }
774 fn cache_key(&self) -> &str {
775 ""
776 }
777 fn should_cache_result(&self) -> bool {
778 false
779 }
780 fn required_privilege(&self) -> Privilege {
781 Privilege::Read
782 }
783 fn lock_intent(&self) -> LockIntent {
784 LockIntent::Shared
785 }
786 fn visible_collections(&self) -> Option<&HashSet<String>> {
787 self.visible_collections.as_ref()
788 }
789}
790
791impl RedDBRuntime {
792 /// Build the AI command `EffectiveScope` from the current
793 /// statement thread-locals + auth store.
794 ///
795 /// Returns `None` for embedded callers (no auth store, no
796 /// identity) — `AuthorizedSearch` treats `None` as deny-default.
797 pub(crate) fn ai_scope(&self) -> EffectiveScope {
798 let tenant = super::impl_core::current_tenant();
799 let identity = super::impl_core::current_auth_identity();
800 let snapshot = self.current_snapshot();
801 let visible_collections = match (self.inner.auth_store.read().clone(), identity.as_ref()) {
802 (Some(store), Some((principal, role))) => {
803 let collections = self.inner.db.store().list_collections();
804 Some(store.visible_collections_for_scope(
805 tenant.as_deref(),
806 *role,
807 principal,
808 &collections,
809 ))
810 }
811 _ => None,
812 };
813 EffectiveScope {
814 tenant,
815 identity,
816 snapshot,
817 visible_collections,
818 }
819 }
820}
821
/// Test-only `ReadFrame` fixtures for code that wants to exercise the
/// frame Interface without constructing a runtime. Gated on `cfg(test)`
/// and exported as `pub(crate)`, so it never escapes the crate.
#[cfg(test)]
pub(crate) mod test_support {
    use super::{LockIntent, Privilege, ReadFrame};
    use crate::auth::Role;
    use crate::storage::transaction::snapshot::{Snapshot, Xid};
    use std::collections::HashSet;

    /// Snapshot pinned at xid 0 with no in-flight transactions; shared
    /// by both fixture constructors.
    fn zero_snapshot() -> Snapshot {
        Snapshot {
            xid: 0,
            in_progress: HashSet::new(),
        }
    }

    /// Hand-populated `ReadFrame`. The `authorized_search` tests use it
    /// to check deny-default and scope-trimming without building a
    /// real statement frame.
    pub(crate) struct FakeReadFrame {
        pub tenant: Option<String>,
        pub identity: Option<(String, Role)>,
        pub snapshot: Snapshot,
        pub visible: Option<HashSet<String>>,
    }

    impl FakeReadFrame {
        /// No tenant, no identity, no visible-collection set.
        pub(crate) fn without_scope() -> Self {
            Self {
                tenant: None,
                identity: None,
                snapshot: zero_snapshot(),
                visible: None,
            }
        }

        /// Tenant `acme`, identity `alice` with `Role::Read`, and the
        /// supplied visible-collection set.
        pub(crate) fn with_visible(visible: HashSet<String>) -> Self {
            let tenant = Some(String::from("acme"));
            let identity = Some((String::from("alice"), Role::Read));
            Self {
                tenant,
                identity,
                snapshot: zero_snapshot(),
                visible: Some(visible),
            }
        }
    }

    impl ReadFrame for FakeReadFrame {
        fn effective_scope(&self) -> Option<&str> {
            self.tenant.as_ref().map(String::as_str)
        }

        fn identity(&self) -> Option<(&str, Role)> {
            let (user, role) = self.identity.as_ref()?;
            Some((user.as_str(), *role))
        }

        fn snapshot(&self) -> &Snapshot {
            &self.snapshot
        }

        fn as_of_floor(&self) -> Option<Xid> {
            None
        }

        fn cache_key(&self) -> &str {
            ""
        }

        fn should_cache_result(&self) -> bool {
            false
        }

        fn required_privilege(&self) -> Privilege {
            Privilege::Read
        }

        fn lock_intent(&self) -> LockIntent {
            LockIntent::Shared
        }

        fn visible_collections(&self) -> Option<&HashSet<String>> {
            Option::as_ref(&self.visible)
        }
    }
}
898
899impl RedDBRuntime {
900 fn own_transaction_xids(&self, conn_id: u64) -> HashSet<Xid> {
901 let mut set = HashSet::new();
902 if let Some(ctx) = self.inner.tx_contexts.read().get(&conn_id) {
903 set.insert(ctx.xid);
904 for (_, sub) in &ctx.savepoints {
905 set.insert(*sub);
906 }
907 for sub in &ctx.released_sub_xids {
908 set.insert(*sub);
909 }
910 }
911 set
912 }
913
914 /// Resolve the snapshot for the current statement, returning
915 /// the snapshot itself and (when AS OF is in effect) the
916 /// resolved xid floor. The floor is the same xid carried inside
917 /// `Snapshot.xid` for AS OF reads — exposing it separately lets
918 /// the `ReadFrame` Interface tell "live read" from "historical
919 /// read" without inferring from `in_progress.is_empty()`.
920 fn statement_snapshot(&self, query: &str) -> RedDBResult<(Snapshot, Option<Xid>)> {
921 match peek_top_level_as_of_with_table(query) {
922 Some((spec, Some(table))) => {
923 if !table.starts_with("red_") && !self.vcs_is_versioned(&table)? {
924 return Err(RedDBError::InvalidConfig(format!(
925 "AS OF requires a versioned collection — \
926 `{table}` has not opted in. \
927 Call vcs.set_versioned(\"{table}\", true) first."
928 )));
929 }
930 let xid = self.vcs_resolve_as_of(spec)?;
931 Ok((
932 Snapshot {
933 xid,
934 in_progress: HashSet::new(),
935 },
936 Some(xid),
937 ))
938 }
939 Some((spec, None)) => {
940 let xid = self.vcs_resolve_as_of(spec)?;
941 Ok((
942 Snapshot {
943 xid,
944 in_progress: HashSet::new(),
945 },
946 Some(xid),
947 ))
948 }
949 None => Ok((self.current_snapshot(), None)),
950 }
951 }
952
953 fn result_cache_safe(&self, conn_id: u64) -> bool {
954 let has_active_xids = self.inner.snapshot_manager.oldest_active_xid().is_some();
955 let in_own_tx = self.inner.tx_contexts.read().contains_key(&conn_id);
956 !has_active_xids && !in_own_tx
957 }
958}
959
/// True when a result's `engine` tag names one of the graph-analytics
/// TVF executors (issue #802). This covers both the graph-collection
/// form (`louvain(g)`) and the inline form
/// (`louvain(nodes => …, edges => …)`). Both produce deterministic
/// algorithm output that is cached regardless of row count.
fn is_graph_tvf_engine(engine: &str) -> bool {
    const GRAPH_TVF_ENGINES: [&str; 2] = ["runtime-graph-tvf", "runtime-graph-tvf-inline"];
    GRAPH_TVF_ENGINES.contains(&engine)
}
967
968fn result_cache_key(query: &str) -> String {
969 let tenant = current_tenant().unwrap_or_default();
970 let auth = current_auth_identity()
971 .map(|(user, role)| format!("{}|{:?}", user, role))
972 .unwrap_or_default();
973 if tenant.is_empty() && auth.is_empty() {
974 query.to_string()
975 } else {
976 format!("{query}\u{001e}{tenant}\u{001e}{auth}")
977 }
978}
979
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::runtime::impl_core::{
        clear_current_auth_identity, clear_current_tenant, set_current_auth_identity,
        set_current_tenant,
    };
    use crate::runtime::RedDBRuntime;

    fn fresh_runtime() -> RedDBRuntime {
        RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("in-memory runtime")
    }

    /// Clear the ambient tenant / auth-identity thread-locals so state
    /// left behind by an earlier test that ran on the same thread cannot
    /// leak into the next one.
    fn reset_thread_locals() {
        clear_current_tenant();
        clear_current_auth_identity();
    }

    #[test]
    fn autocommit_select_takes_live_snapshot() {
        reset_thread_locals();
        let rt = fresh_runtime();
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds for SELECT 1");

        // Live reads: no AS OF floor, snapshot bounded by the
        // manager's `peek_next_xid` so committed tuples are visible.
        let f: &dyn ReadFrame = &frame;
        assert!(f.as_of_floor().is_none(), "live read has no AS OF floor");
        assert!(
            f.snapshot().xid >= 1,
            "autocommit snapshot xid is bounded by peek_next_xid"
        );
    }

    #[test]
    fn frame_captures_identity_and_scope() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Write);

        let rt = fresh_runtime();
        let frame = StatementExecutionFrame::build(&rt, "SELECT 1").expect("frame builds");
        let f: &dyn ReadFrame = &frame;

        assert_eq!(f.effective_scope(), Some("acme"));
        let id = f.identity().expect("identity captured");
        assert_eq!(id.0, "alice");
        assert!(matches!(id.1, Role::Write));

        // Cache key mixes scope + identity so two callers under
        // different tenants never share a cache slot.
        assert!(
            f.cache_key().contains("acme") && f.cache_key().contains("alice"),
            "cache key folds in scope + identity, got {:?}",
            f.cache_key()
        );

        reset_thread_locals();
    }

    #[test]
    fn as_of_rejects_non_versioned_user_collection() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `not_versioned` is a plain user collection — the frame
        // builder must reject AS OF until the caller opts in via
        // `vcs.set_versioned`.
        let err = match StatementExecutionFrame::build(
            &rt,
            "SELECT * FROM not_versioned AS OF COMMIT 'deadbeef'",
        ) {
            Err(e) => e,
            Ok(_) => panic!(
                "AS OF on non-versioned user collection must be rejected, but the frame built"
            ),
        };

        let msg = format!("{err}");
        assert!(
            msg.contains("AS OF requires a versioned collection"),
            "expected AS OF rejection, got: {msg}"
        );
    }

    /// End-to-end proof that the SELECT path consumes a `ReadFrame`.
    ///
    /// Sets a tenant + identity via the public thread-local API the
    /// runtime uses for ambient scope, drives a real `SELECT` through
    /// `execute_query`, then inspects the result cache that the SELECT
    /// path populates via `frame.cache_key()`. The key only carries
    /// the tenant + identity *because* it was built through the frame —
    /// reverting the wiring to inline `current_tenant()` /
    /// `current_auth_identity()` reads would still pass this test, but
    /// dropping the frame entirely (so the SELECT path stopped touching
    /// `cache_key`) would break it.
    #[test]
    fn select_path_routes_through_frame_cache_key() {
        reset_thread_locals();
        set_current_tenant("acme".to_string());
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        let result = rt
            .execute_query("SELECT 1")
            .expect("SELECT 1 executes under tenant=acme/identity=alice");
        assert_eq!(result.statement_type, "select");

        // The textual SELECT path builds a frame and
        // writes its result through `frame.cache_key()`. That key folds
        // tenant + identity in via `result_cache_key`, so finding "acme"
        // and "alice" inside any cached key proves the frame was the
        // seam used.
        let cache = rt.inner.result_cache.read();
        let any_keyed_with_scope = cache
            .0
            .keys()
            .any(|k| k.contains("acme") && k.contains("alice"));
        assert!(
            any_keyed_with_scope,
            "expected at least one result-cache key carrying tenant+identity, \
             got keys: {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    /// A SELECT that calls a volatile builtin (here:
    /// `pg_advisory_unlock`, the volatile token the runtime currently
    /// recognises in `query_has_volatile_builtin`) must NOT populate
    /// the result cache. Any caller hitting the cache after this would
    /// see a stale answer for an inherently-volatile query, so the
    /// SELECT path gates writes through `frame.should_cache_result()`.
    ///
    /// Deletion test: removing `ReadFrame::should_cache_result`, or
    /// reverting the SELECT path to skip its safety gate, would let
    /// the result cache silently absorb this statement and break the
    /// assertion below.
    #[test]
    fn volatile_select_does_not_populate_result_cache() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // Frame-level invariant: the volatile-builtin signal collapses
        // `should_cache_result` to false even for an autocommit /
        // out-of-tx connection.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT pg_advisory_unlock(1)").expect("frame");
        let f: &dyn ReadFrame = &frame;
        assert!(
            !f.should_cache_result(),
            "volatile builtin must disable result-cache safety"
        );

        // End-to-end: drive the volatile SELECT through `execute_query`
        // and confirm no entry was stamped under its cache key. We
        // assert on this query's key specifically rather than on an
        // empty cache, so unrelated entries cannot make the check
        // brittle.
        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let cache = rt.inner.result_cache.read();
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            !cache.0.contains_key(&key),
            "volatile SELECT must not populate result cache, found key {key:?} in {:?}",
            cache.0.keys().collect::<Vec<_>>()
        );

        reset_thread_locals();
    }

    #[test]
    fn blob_cache_backend_populates_blob_path_without_legacy_write() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let result = rt.execute_query("SELECT 1").expect("SELECT 1 executes");
        assert_eq!(result.statement_type, "select");

        let key = result_cache_key("SELECT 1");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_some(),
            "blob backend should stamp the Blob Cache path"
        );
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert!(
            !rt.inner.result_cache.read().0.contains_key(&key),
            "blob backend should not write the legacy map"
        );
    }

    #[test]
    fn blob_cache_backend_keeps_volatile_select_out_of_blob_path() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("blob_cache"));

        let _ = rt
            .execute_query("SELECT pg_advisory_unlock(1)")
            .expect("volatile SELECT executes");
        let key = result_cache_key("SELECT pg_advisory_unlock(1)");
        assert!(
            rt.inner
                .result_blob_cache
                .get("runtime.result_cache", &key)
                .is_none(),
            "volatile SELECT must not populate blob result cache"
        );
        assert!(!rt.inner.result_blob_entries.read().0.contains_key(&key));
    }

    #[test]
    fn shadow_backend_dual_writes_and_reports_no_divergence_on_equal_results() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.inner
            .db
            .store()
            .set_config_tree("runtime.result_cache.backend", &crate::json!("shadow"));

        let first = rt.execute_query("SELECT 1").expect("first SELECT");
        let second = rt.execute_query("SELECT 1").expect("cached SELECT");
        assert_eq!(first.result.len(), second.result.len());

        let key = result_cache_key("SELECT 1");
        assert!(rt.inner.result_cache.read().0.contains_key(&key));
        assert!(rt.inner.result_blob_entries.read().0.contains_key(&key));
        assert_eq!(rt.result_cache_shadow_divergences(), 0);
        assert_eq!(
            crate::runtime::METRIC_CACHE_SHADOW_DIVERGENCE_TOTAL,
            "cache_shadow_divergence_total"
        );
    }

    #[test]
    fn as_of_on_red_collection_records_floor() {
        reset_thread_locals();
        let rt = fresh_runtime();

        // `red_*` collections always allow AS OF. The frame should
        // resolve to a concrete xid and surface it via the Interface.
        let frame =
            StatementExecutionFrame::build(&rt, "SELECT * FROM red_commits AS OF SNAPSHOT 1")
                .expect("AS OF SNAPSHOT 1 on red_commits resolves");

        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.as_of_floor(),
            Some(1),
            "AS OF SNAPSHOT 1 records xid=1 as the floor"
        );
        assert_eq!(f.snapshot().xid, 1);
        assert!(
            f.snapshot().in_progress.is_empty(),
            "historical reads have no in-progress set"
        );
    }

    /// The frame classifies common SQL prefixes into the coarse
    /// `Privilege` / `LockIntent` buckets at build time. This test
    /// pins the mapping so a regression that silently re-routes
    /// (e.g. INSERT classified as Read) surfaces here, not at a
    /// downstream privilege gate.
    #[test]
    fn frame_classifies_privilege_and_lock_intent_from_prefix() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let cases = [
            ("SELECT 1", Privilege::Read, LockIntent::Shared),
            ("LIST KV settings", Privilege::Read, LockIntent::Shared),
            (
                "KV GET settings.feature",
                Privilege::Read,
                LockIntent::Shared,
            ),
            ("VAULT LIST secrets", Privilege::Read, LockIntent::Shared),
            (
                "INSERT INTO t (id) VALUES (1)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "KV PUT settings.feature = 'on'",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "VAULT PUT secrets.api = 'x'",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "UPDATE t SET x = 1 WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "DELETE FROM t WHERE id = 1",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            (
                "CREATE TABLE foo (id INT)",
                Privilege::Write,
                LockIntent::Exclusive,
            ),
            ("BEGIN", Privilege::None, LockIntent::None),
            ("COMMIT", Privilege::None, LockIntent::None),
            ("SET timezone = 'UTC'", Privilege::None, LockIntent::None),
        ];

        for (q, want_priv, want_lock) in cases {
            let frame = StatementExecutionFrame::build(&rt, q)
                .unwrap_or_else(|e| panic!("frame builds for {q:?}: {e}"));
            let f: &dyn ReadFrame = &frame;
            assert_eq!(f.required_privilege(), want_priv, "privilege for {q:?}");
            assert_eq!(f.lock_intent(), want_lock, "lock intent for {q:?}");
        }
    }

    /// Deletion-test for `ReadFrame::required_privilege`: a statement
    /// whose frame-level privilege is not satisfied by the caller's role
    /// is denied by the frame itself.
    ///
    /// We build a frame for an INSERT (which the frame reports as
    /// `Privilege::Write`) under `Role::Read`, then call
    /// `frame.check_query_privilege` directly, so the rejection comes
    /// from the frame's coarse gate BEFORE the parsed-expression walker
    /// runs. Removing `required_privilege` (or the `is_satisfied_by`
    /// consult inside `check_query_privilege`) would force the deny path
    /// back to the inline `RedDBRuntime::check_query_privilege` walker —
    /// but the auth_store gate up there is bypassed when no auth_store is
    /// wired (embedded test mode), so this test would FLIP from
    /// denied to permitted and break the assertion below.
    #[test]
    fn insert_under_read_role_denied_via_frame_privilege() {
        reset_thread_locals();
        set_current_auth_identity("alice".to_string(), Role::Read);

        let rt = fresh_runtime();
        // Bypass parser by reaching into the frame directly: the
        // frame derives privilege from the SQL prefix without
        // needing an auth_store wired up. Driving end-to-end via
        // `execute_query` would also reject (no table `t`), but for
        // a different reason — we want to pin the privilege seam.
        let frame = StatementExecutionFrame::build(&rt, "INSERT INTO t (id) VALUES (1)")
            .expect("frame builds for INSERT");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(
            f.required_privilege(),
            Privilege::Write,
            "INSERT classified as Write"
        );
        let id = f.identity().expect("identity captured");
        assert!(
            !f.required_privilege().is_satisfied_by(id.1),
            "Role::Read does not satisfy Privilege::Write — frame must deny"
        );

        // End-to-end: the frame's `check_query_privilege` sees the
        // (Read role, Write privilege) mismatch and denies before
        // dispatch. We drive a synthetic `QueryExpr::Table` because
        // the SELECT/INSERT parser would happen to also fail, and we
        // want the failure to come from the privilege seam.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let err = frame
            .check_query_privilege(&rt, &expr)
            .expect_err("denied via frame's coarse privilege gate");
        let msg = format!("{err}");
        assert!(
            msg.contains("permission denied") && msg.contains("Write"),
            "expected frame-level Write deny, got: {msg}"
        );

        reset_thread_locals();
    }

    /// End-to-end proof that the frame-owned row-buffer arena (#885) is
    /// wired into the SELECT path and produces observable results
    /// byte-identical to the per-request-allocation baseline.
    ///
    /// A table with more rows than the streaming high-water mark
    /// (`DEFAULT_HIGH_WATER_MARK`) forces the `execute_runtime_table_query_in`
    /// path to assemble many chunks, each leasing/recycling the frame
    /// arena's single chunk buffer. Driving it through `execute_query`
    /// (which builds a `StatementExecutionFrame` and lends its arena)
    /// must return every inserted row, in order — exactly what the
    /// allocate-per-chunk path returned. A bug in the arena wiring
    /// (dropped rows, bled rows, mis-ordering) would surface here.
    #[test]
    fn large_select_through_frame_arena_returns_all_rows_in_order() {
        reset_thread_locals();
        let rt = fresh_runtime();
        rt.execute_query("CREATE TABLE big (id INT)")
            .expect("create table");

        // > DEFAULT_HIGH_WATER_MARK (1024) rows so the streaming channel
        // spans multiple chunks and the arena buffer is reused.
        const N: usize = 2_500;
        for start in (0..N).step_by(250) {
            let end = (start + 250).min(N);
            let values = (start..end)
                .map(|i| format!("({i})"))
                .collect::<Vec<_>>()
                .join(", ");
            rt.execute_query(&format!("INSERT INTO big (id) VALUES {values}"))
                .unwrap_or_else(|err| panic!("insert rows {start}..{end}: {err:?}"));
        }

        let result = rt
            .execute_query("SELECT id FROM big ORDER BY id")
            .expect("large SELECT executes through the frame arena path");
        assert_eq!(result.statement_type, "select");
        assert_eq!(
            result.result.records.len(),
            N,
            "every inserted row streams back through the arena-backed channel"
        );
        for (i, record) in result.result.records.iter().enumerate() {
            assert_eq!(
                record.get("id"),
                Some(&crate::storage::schema::Value::Integer(i as i64)),
                "row {i} is byte-identical to the per-request-allocation baseline"
            );
        }

        reset_thread_locals();
    }

    /// Transport adapters may decode their wire-specific parameter value
    /// shapes, but SQL parsing/binding must stay behind the runtime's
    /// statement entrypoint. This pins the deeper seam introduced for
    /// parameterized query execution: HTTP, JSON-RPC, RedWire, PG wire,
    /// and gRPC all call `RedDBRuntime::execute_query_with_params`, which
    /// installs a real `StatementExecutionFrame` before dispatch.
    #[test]
    fn parameterized_transport_adapters_delegate_binding_to_runtime() {
        let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
        let adapters = [
            "src/server/handlers_query.rs",
            "src/rpc_stdio.rs",
            "src/wire/redwire/session.rs",
            "src/wire/postgres/server.rs",
            "src/grpc.rs",
        ];

        for relative in adapters {
            let path = manifest_dir.join(relative);
            let text = std::fs::read_to_string(&path)
                .unwrap_or_else(|err| panic!("read {}: {err}", path.display()));
            assert!(
                text.contains("execute_query_with_params"),
                "{relative} should delegate parameterized query execution to the runtime"
            );
            assert!(
                !text.contains("user_params::bind"),
                "{relative} must not bind SQL params in the transport adapter"
            );
        }
    }

    /// Deletion-test for `ReadFrame::lock_intent`: a transaction
    /// control statement carries `LockIntent::None` and the
    /// `acquire_intent_locks` path returns `None` without consulting
    /// `intent_lock_modes_for`. Removing the method (or its consult
    /// site in `acquire_intent_locks`) would force the lock-mode
    /// helper to walk a fabricated parsed expression to reach the
    /// same conclusion — but the assertion that no guard is allocated
    /// for a `BEGIN` frame would still hold, so we additionally pin
    /// the classifier mapping above to make the deletion observable.
    #[test]
    fn control_statement_skips_intent_locks_via_frame() {
        reset_thread_locals();
        let rt = fresh_runtime();

        let frame = StatementExecutionFrame::build(&rt, "BEGIN").expect("frame builds for BEGIN");
        let f: &dyn ReadFrame = &frame;
        assert_eq!(f.lock_intent(), LockIntent::None);

        // Drive `acquire_intent_locks` against a fabricated SELECT
        // expression that WOULD normally yield `(IS, IS)`; the frame's
        // `lock_intent() == None` short-circuit must still suppress
        // the guard.
        use crate::storage::query::ast::{QueryExpr, TableQuery};
        let expr = QueryExpr::Table(TableQuery::new("t"));
        let guard = frame.acquire_intent_locks(&rt, &expr);
        assert!(
            guard.is_none(),
            "BEGIN frame's lock_intent=None must short-circuit lock acquisition"
        );
    }
}
1489}